diff --git a/connection.go b/connection.go index 09fd68b..a6b2291 100644 --- a/connection.go +++ b/connection.go @@ -1,12 +1,13 @@ package nostr import ( + "bytes" "context" "crypto/tls" "errors" - "fmt" "io" "net/http" + "sync/atomic" "time" ws "github.com/coder/websocket" @@ -14,52 +15,112 @@ import ( // Connection represents a websocket connection to a Nostr relay. type Connection struct { - conn *ws.Conn + conn *ws.Conn + writeQueue chan writeRequest + closed *atomic.Bool + closedNotify chan struct{} +} + +type writeRequest struct { + msg []byte + answer chan error } // NewConnection creates a new websocket connection to a Nostr relay. -func NewConnection(ctx context.Context, url string, requestHeader http.Header, tlsConfig *tls.Config) (*Connection, error) { +func NewConnection( + ctx context.Context, + url string, + handleMessage func(string), + requestHeader http.Header, + tlsConfig *tls.Config, +) (*Connection, error) { c, _, err := ws.Dial(ctx, url, getConnectionOptions(requestHeader, tlsConfig)) if err != nil { return nil, err } - c.SetReadLimit(2 << 24) // 33MB - return &Connection{ - conn: c, - }, nil -} + // this will tell if the connection is closed -// WriteMessage writes arbitrary bytes to the websocket connection. -func (c *Connection) WriteMessage(ctx context.Context, data []byte) error { - if err := c.conn.Write(ctx, ws.MessageText, data); err != nil { - return fmt.Errorf("failed to write message: %w", err) + // ping every 29 seconds + ticker := time.NewTicker(29 * time.Second) + + // main websocket loop + writeQueue := make(chan writeRequest) + readQueue := make(chan string) + + conn := &Connection{ + conn: c, + writeQueue: writeQueue, + closed: &atomic.Bool{}, + closedNotify: make(chan struct{}), } - return nil + go func() { + for { + select { + case <-ctx.Done(): + conn.doClose(ws.StatusNormalClosure, "") + return + case <-ticker.C: + ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*800, errors.New("ping took too long")) + err := c.Ping(ctx) + cancel() + if err != nil { + conn.doClose(ws.StatusAbnormalClosure, "ping took too long") + return + } + case wr := <-writeQueue: + debugLogf("{%s} sending %v\n", url, string(wr.msg)) + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("write took too long")) + err := c.Write(ctx, ws.MessageText, wr.msg) + cancel() + if err != nil { + conn.doClose(ws.StatusAbnormalClosure, "write took too long") + if wr.answer != nil { + wr.answer <- err + } + return + } + if wr.answer != nil { + close(wr.answer) + } + case msg := <-readQueue: + debugLogf("{%s} received %v\n", url, msg) + handleMessage(msg) + } + } + }() + + // read loop -- loops back to the main loop + go func() { + buf := new(bytes.Buffer) + + for { + buf.Reset() + + _, reader, err := c.Reader(ctx) + if err != nil { + conn.doClose(ws.StatusAbnormalClosure, "failed to get reader") + return + } + if _, err := io.Copy(buf, reader); err != nil { + conn.doClose(ws.StatusAbnormalClosure, "failed to read") + return + } + + readQueue <- string(buf.Bytes()) + } + }() + + return conn, nil } -// ReadMessage reads arbitrary bytes from the websocket connection into the provided buffer. -func (c *Connection) ReadMessage(ctx context.Context, buf io.Writer) error { - _, reader, err := c.conn.Reader(ctx) - if err != nil { - return fmt.Errorf("conn reader: %w", err) +func (c *Connection) doClose(code ws.StatusCode, reason string) { + wasClosed := c.closed.Swap(true) + if !wasClosed { + c.conn.Close(code, reason) + close(c.closedNotify) + close(c.writeQueue) } - if _, err := io.Copy(buf, reader); err != nil { - return fmt.Errorf("failed to read message: %w", err) - } - return nil -} - -// Close closes the websocket connection. -func (c *Connection) Close() error { - return c.conn.Close(ws.StatusNormalClosure, "") -} - -// Ping sends a ping message to the websocket connection. -func (c *Connection) Ping(ctx context.Context) error { - ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*800, errors.New("ping took too long")) - defer cancel() - return c.conn.Ping(ctx) } diff --git a/count_test.go b/count_test.go index aeda7da..9e27543 100644 --- a/count_test.go +++ b/count_test.go @@ -8,7 +8,7 @@ import ( ) func TestCount(t *testing.T) { - const RELAY = "wss://chorus.mikedilger.com:444" + const RELAY = "wss://chorus.pjv.me" rl := mustRelayConnect(t, RELAY) defer rl.Close() diff --git a/envelopes.go b/envelopes.go index 28dab48..2284e50 100644 --- a/envelopes.go +++ b/envelopes.go @@ -13,22 +13,19 @@ import ( "github.com/tidwall/gjson" ) -var UnknownLabel = errors.New("unknown envelope label") +var ( + UnknownLabel = errors.New("unknown envelope label") + InvalidJsonEnvelope = errors.New("invalid json envelope") +) -type MessageParser interface { - // ParseMessage parses a message into an Envelope. - ParseMessage(string) (Envelope, error) -} - -// Deprecated: use NewMessageParser instead -func ParseMessage(message string) Envelope { +func ParseMessage(message string) (Envelope, error) { firstQuote := strings.IndexByte(message, '"') if firstQuote == -1 { - return nil + return nil, InvalidJsonEnvelope } secondQuote := strings.IndexByte(message[firstQuote+1:], '"') if secondQuote == -1 { - return nil + return nil, InvalidJsonEnvelope } label := message[firstQuote+1 : firstQuote+1+secondQuote] @@ -56,14 +53,14 @@ func ParseMessage(message string) Envelope { x := CloseEnvelope("") v = &x default: - return nil + return nil, UnknownLabel } if err := v.FromJSON(message); err != nil { - return nil + return nil, err } - return v + return v, nil } // Envelope is the interface for all nostr message envelopes. @@ -124,10 +121,14 @@ func (v EventEnvelope) MarshalJSON() ([]byte, error) { // ReqEnvelope represents a REQ message. type ReqEnvelope struct { SubscriptionID string - Filter + Filters []Filter } func (_ ReqEnvelope) Label() string { return "REQ" } +func (c ReqEnvelope) String() string { + v, _ := json.Marshal(c) + return string(v) +} func (v *ReqEnvelope) FromJSON(data string) error { r := gjson.Parse(data) @@ -136,8 +137,12 @@ func (v *ReqEnvelope) FromJSON(data string) error { return fmt.Errorf("failed to decode REQ envelope: missing filters") } v.SubscriptionID = string(unsafe.Slice(unsafe.StringData(arr[1].Str), len(arr[1].Str))) - if err := easyjson.Unmarshal(unsafe.Slice(unsafe.StringData(arr[2].Raw), len(arr[2].Raw)), &v.Filter); err != nil { - return fmt.Errorf("on filter: %w", err) + + v.Filters = make([]Filter, len(arr)-2) + for i, filterj := range arr[2:] { + if err := easyjson.Unmarshal(unsafe.Slice(unsafe.StringData(filterj.Raw), len(filterj.Raw)), &v.Filters[i]); err != nil { + return fmt.Errorf("on filter: %w", err) + } } return nil @@ -148,7 +153,7 @@ func (v ReqEnvelope) MarshalJSON() ([]byte, error) { w.RawString(`["REQ","`) w.RawString(v.SubscriptionID) w.RawString(`",`) - v.Filter.MarshalEasyJSON(&w) + v.Filters[0].MarshalEasyJSON(&w) w.RawString(`]`) return w.BuildBytes() } diff --git a/envelopes_benchmark_test.go b/envelopes_benchmark_test.go index 06f861f..ce26d88 100644 --- a/envelopes_benchmark_test.go +++ b/envelopes_benchmark_test.go @@ -1,5 +1,3 @@ -//go:build sonic - package nostr import ( @@ -28,16 +26,7 @@ func BenchmarkParseMessage(b *testing.B) { b.Run("easyjson", func(b *testing.B) { for b.Loop() { for _, msg := range messages { - _ = ParseMessage(msg) - } - } - }) - - b.Run("sonic", func(b *testing.B) { - smp := NewSonicMessageParser() - for b.Loop() { - for _, msg := range messages { - _, _ = smp.ParseMessage(msg) + _, _ = ParseMessage(msg) } } }) @@ -103,14 +92,13 @@ func generateRandomEvent() Event { } event := Event{ - ID: generateRandomHex(64), - PubKey: generateRandomHex(64), CreatedAt: Timestamp(time.Now().Unix() - int64(rand.IntN(10000000))), - Kind: rand.IntN(10000), + Kind: Kind(rand.IntN(10000)), Tags: tags, Content: string(content), - Sig: generateRandomHex(128), } + event.ID, _ = IDFromHex(generateRandomHex(64)) + event.PubKey, _ = PubKeyFromHexCheap(generateRandomHex(64)) return event } @@ -198,25 +186,25 @@ func generateRandomFilter() Filter { if rand.IntN(2) == 0 { count := rand.IntN(5) + 1 - filter.IDs = make([]string, count) + filter.IDs = make([]ID, count) for i := range filter.IDs { - filter.IDs[i] = generateRandomHex(64) + filter.IDs[i], _ = IDFromHex(generateRandomHex(64)) } } if rand.IntN(2) == 0 { count := rand.IntN(5) + 1 - filter.Kinds = make([]int, count) + filter.Kinds = make([]Kind, count) for i := range filter.Kinds { - filter.Kinds[i] = rand.IntN(10000) + filter.Kinds[i] = Kind(rand.IntN(10000)) } } if rand.IntN(2) == 0 { count := rand.IntN(5) + 1 - filter.Authors = make([]string, count) + filter.Authors = make([]PubKey, count) for i := range filter.Authors { - filter.Authors[i] = generateRandomHex(64) + filter.Authors[i], _ = PubKeyFromHexCheap(generateRandomHex(64)) } } @@ -238,13 +226,11 @@ func generateRandomFilter() Filter { } if rand.IntN(2) == 0 { - ts := Timestamp(time.Now().Unix() - int64(rand.IntN(10000000))) - filter.Since = &ts + filter.Since = Timestamp(time.Now().Unix() - int64(rand.IntN(10000000))) } if rand.IntN(2) == 0 { - ts := Timestamp(time.Now().Unix() - int64(rand.IntN(1000000))) - filter.Until = &ts + filter.Until = Timestamp(time.Now().Unix() - int64(rand.IntN(1000000))) } if rand.IntN(2) == 0 { diff --git a/envelopes_default.go b/envelopes_default.go deleted file mode 100644 index 2119427..0000000 --- a/envelopes_default.go +++ /dev/null @@ -1,58 +0,0 @@ -//go:build !sonic - -package nostr - -import ( - "errors" - "strings" -) - -func NewMessageParser() MessageParser { - return messageParser{} -} - -type messageParser struct{} - -func (messageParser) ParseMessage(message string) (Envelope, error) { - firstQuote := strings.IndexByte(message, '"') - if firstQuote == -1 { - return nil, errors.New("malformed json") - } - secondQuote := strings.IndexByte(message[firstQuote+1:], '"') - if secondQuote == -1 { - return nil, errors.New("malformed json") - } - label := message[firstQuote+1 : firstQuote+1+secondQuote] - - var v Envelope - switch label { - case "EVENT": - v = &EventEnvelope{} - case "REQ": - v = &ReqEnvelope{} - case "COUNT": - v = &CountEnvelope{} - case "NOTICE": - x := NoticeEnvelope("") - v = &x - case "EOSE": - x := EOSEEnvelope("") - v = &x - case "OK": - v = &OKEnvelope{} - case "AUTH": - v = &AuthEnvelope{} - case "CLOSED": - v = &ClosedEnvelope{} - case "CLOSE": - x := CloseEnvelope("") - v = &x - default: - return nil, UnknownLabel - } - - if err := v.FromJSON(message); err != nil { - return nil, err - } - return v, nil -} diff --git a/envelopes_sonic.go b/envelopes_sonic.go deleted file mode 100644 index e8d126b..0000000 --- a/envelopes_sonic.go +++ /dev/null @@ -1,566 +0,0 @@ -//go:build sonic - -package nostr - -import ( - "encoding/hex" - stdlibjson "encoding/json" - "fmt" - "unsafe" - - "github.com/bytedance/sonic/ast" -) - -type sonicVisitorPosition int - -const ( - inEnvelope sonicVisitorPosition = iota - - inEvent - inReq - inOk - inEose - inCount - inAuth - inClose - inClosed - inNotice - - inFilterObject - inEventObject - inCountObject - - inSince - inLimit - inUntil - inIds - inAuthors - inKinds - inSearch - inAFilterTag - - inId - inCreatedAt - inKind - inContent - inPubkey - inSig - inTags // we just saw the "tags" object key - inTagsList // we have just seen the first `[` of the tags - inAnEventTag // we are inside an actual tag, i.e we have just seen `[[`, or `].[` -) - -func (spp sonicVisitorPosition) String() string { - switch spp { - case inEnvelope: - return "inEnvelope" - case inEvent: - return "inEvent" - case inReq: - return "inReq" - case inOk: - return "inOk" - case inEose: - return "inEose" - case inCount: - return "inCount" - case inAuth: - return "inAuth" - case inClose: - return "inClose" - case inClosed: - return "inClosed" - case inNotice: - return "inNotice" - case inFilterObject: - return "inFilterObject" - case inEventObject: - return "inEventObject" - case inCountObject: - return "inCountObject" - case inSince: - return "inSince" - case inLimit: - return "inLimit" - case inUntil: - return "inUntil" - case inIds: - return "inIds" - case inAuthors: - return "inAuthors" - case inKinds: - return "inKinds" - case inAFilterTag: - return "inAFilterTag" - case inId: - return "inId" - case inCreatedAt: - return "inCreatedAt" - case inKind: - return "inKind" - case inContent: - return "inContent" - case inPubkey: - return "inPubkey" - case inSig: - return "inSig" - case inTags: - return "inTags" - case inTagsList: - return "inTagsList" - case inAnEventTag: - return "inAnEventTag" - default: - return "" - } -} - -type sonicVisitor struct { - event *EventEnvelope - req *ReqEnvelope - ok *OKEnvelope - eose *EOSEEnvelope - count *CountEnvelope - auth *AuthEnvelope - close *CloseEnvelope - closed *ClosedEnvelope - notice *NoticeEnvelope - - whereWeAre sonicVisitorPosition - - currentEvent *Event - currentEventTag Tag - - currentFilter *Filter - currentFilterTagList []string - currentFilterTagName string - - smp *sonicMessageParser - mainEnvelope Envelope -} - -func (sv *sonicVisitor) OnArrayBegin(capacity int) error { - // fmt.Println("***", "OnArrayBegin", "==", sv.whereWeAre) - - switch sv.whereWeAre { - case inTags: - sv.whereWeAre = inTagsList - sv.currentEvent.Tags = sv.smp.reusableTagArray - case inTagsList: - sv.whereWeAre = inAnEventTag - sv.currentEventTag = sv.smp.reusableStringArray - case inAFilterTag: - // we have already created this - } - - return nil -} - -func (sv *sonicVisitor) OnArrayEnd() error { - // fmt.Println("***", "OnArrayEnd", "==", sv.whereWeAre) - - switch sv.whereWeAre { - // envelopes - case inEvent: - sv.mainEnvelope = sv.event - case inReq: - sv.mainEnvelope = sv.req - case inOk: - sv.mainEnvelope = sv.ok - case inEose: - sv.mainEnvelope = sv.eose - case inCount: - sv.mainEnvelope = sv.count - case inAuth: - sv.mainEnvelope = sv.auth - case inClose: - sv.mainEnvelope = sv.close - case inClosed: - sv.mainEnvelope = sv.closed - case inNotice: - sv.mainEnvelope = sv.notice - - // filter object properties - case inIds: - sv.whereWeAre = inFilterObject - sv.smp.doneWithIDSlice(sv.currentFilter.IDs) - case inAuthors: - sv.whereWeAre = inFilterObject - sv.smp.doneWithPubKeySlice(sv.currentFilter.Authors) - case inKinds: - sv.whereWeAre = inFilterObject - sv.smp.doneWithUint16Slice(sv.currentFilter.Kinds) - case inAFilterTag: - sv.currentFilter.Tags[sv.currentFilterTagName] = sv.currentFilterTagList - sv.whereWeAre = inFilterObject - sv.smp.doneWithStringSlice(sv.currentFilterTagList) - - // event object properties - case inAnEventTag: - sv.currentEvent.Tags = append(sv.currentEvent.Tags, sv.currentEventTag) - sv.whereWeAre = inTagsList - sv.smp.doneWithStringSlice(sv.currentEventTag) - case inTags, inTagsList: - sv.whereWeAre = inEventObject - sv.smp.doneWithTagSlice(sv.currentEvent.Tags) - - default: - return fmt.Errorf("unexpected array end at %v", sv.whereWeAre) - } - return nil -} - -func (sv *sonicVisitor) OnObjectBegin(capacity int) error { - // fmt.Println("***", "OnObjectBegin", "==", sv.whereWeAre) - - switch sv.whereWeAre { - case inEvent: - sv.whereWeAre = inEventObject - sv.currentEvent = &Event{} - case inAuth: - sv.whereWeAre = inEventObject - sv.currentEvent = &Event{} - case inReq: - sv.whereWeAre = inFilterObject - sv.currentFilter = &Filter{} - case inCount: - // set this temporarily, we will switch to a filterObject if we see "count" or "hll" - sv.whereWeAre = inFilterObject - sv.currentFilter = &Filter{} - default: - return fmt.Errorf("unexpected object begin at %v", sv.whereWeAre) - } - - return nil -} - -func (sv *sonicVisitor) OnObjectKey(key string) error { - // fmt.Println("***", "OnObjectKey", key, "==", sv.whereWeAre) - - switch sv.whereWeAre { - case inEventObject: - switch key { - case "id": - sv.whereWeAre = inId - case "sig": - sv.whereWeAre = inSig - case "pubkey": - sv.whereWeAre = inPubkey - case "content": - sv.whereWeAre = inContent - case "created_at": - sv.whereWeAre = inCreatedAt - case "kind": - sv.whereWeAre = inKind - case "tags": - sv.whereWeAre = inTags - default: - return fmt.Errorf("unexpected event attr %s", key) - } - case inFilterObject: - switch key { - case "limit": - sv.whereWeAre = inLimit - case "since": - sv.whereWeAre = inSince - case "until": - sv.whereWeAre = inUntil - case "ids": - sv.whereWeAre = inIds - sv.currentFilter.IDs = sv.smp.reusableIDArray - case "authors": - sv.whereWeAre = inAuthors - sv.currentFilter.Authors = sv.smp.reusablePubKeyArray - case "kinds": - sv.whereWeAre = inKinds - sv.currentFilter.Kinds = sv.smp.reusableUint16Array - case "search": - sv.whereWeAre = inSearch - case "count", "hll": - // oops, switch to a countObject - sv.whereWeAre = inCountObject - default: - if len(key) > 1 && key[0] == '#' { - if sv.currentFilter.Tags == nil { - sv.currentFilter.Tags = make(TagMap, 1) - } - sv.currentFilterTagList = sv.smp.reusableStringArray - sv.currentFilterTagName = key[1:] - sv.whereWeAre = inAFilterTag - } else { - return fmt.Errorf("unexpected filter attr %s", key) - } - } - case inCountObject: - // we'll judge by the shape of the value so ignore this - default: - return fmt.Errorf("unexpected object key %s at %s", key, sv.whereWeAre) - } - - return nil -} - -func (sv *sonicVisitor) OnObjectEnd() error { - // fmt.Println("***", "OnObjectEnd", "==", sv.whereWeAre) - - switch sv.whereWeAre { - case inEventObject: - if sv.event != nil { - sv.event.Event = *sv.currentEvent - sv.whereWeAre = inEvent - } else { - sv.auth.Event = *sv.currentEvent - sv.whereWeAre = inAuth - } - sv.currentEvent = nil - case inFilterObject: - if sv.req != nil { - sv.req.Filter = *sv.currentFilter - sv.whereWeAre = inReq - } else { - sv.count.Filter = *sv.currentFilter - sv.whereWeAre = inCount - } - sv.currentFilter = nil - case inCountObject: - sv.whereWeAre = inCount - default: - return fmt.Errorf("unexpected object end at %s", sv.whereWeAre) - } - - return nil -} - -func (sv *sonicVisitor) OnString(v string) error { - // fmt.Println("***", "OnString", v, "==", sv.whereWeAre) - - var err error - switch sv.whereWeAre { - case inEnvelope: - switch v { - case "EVENT": - sv.event = &EventEnvelope{} - sv.whereWeAre = inEvent - case "REQ": - sv.req = &ReqEnvelope{} - sv.whereWeAre = inReq - case "OK": - sv.ok = &OKEnvelope{} - sv.whereWeAre = inOk - case "EOSE": - sv.whereWeAre = inEose - case "COUNT": - sv.count = &CountEnvelope{} - sv.whereWeAre = inCount - case "AUTH": - sv.auth = &AuthEnvelope{} - sv.whereWeAre = inAuth - case "CLOSE": - sv.whereWeAre = inClose - case "CLOSED": - sv.closed = &ClosedEnvelope{} - sv.whereWeAre = inClosed - case "NOTICE": - sv.whereWeAre = inNotice - default: - return UnknownLabel - } - - // in an envelope - case inEvent: - sv.event.SubscriptionID = &v - case inReq: - sv.req.SubscriptionID = v - case inOk: - if sv.ok.EventID == [32]byte{} { - sv.ok.EventID, err = IDFromHex(v) - if err != nil { - return err - } - } else { - sv.ok.Reason = v - } - case inEose: - sv.eose = (*EOSEEnvelope)(&v) - case inCount: - sv.count.SubscriptionID = v - case inAuth: - sv.auth.Challenge = &v - case inClose: - sv.close = (*CloseEnvelope)(&v) - case inClosed: - if sv.closed.SubscriptionID == "" { - sv.closed.SubscriptionID = v - } else { - sv.closed.Reason = v - } - case inNotice: - sv.notice = (*NoticeEnvelope)(&v) - - // filter object properties - case inIds: - id, err := IDFromHex(v) - if err != nil { - return err - } - sv.currentFilter.IDs = append(sv.currentFilter.IDs, id) - case inAuthors: - pk, err := PubKeyFromHex(v) - if err != nil { - return err - } - sv.currentFilter.Authors = append(sv.currentFilter.Authors, pk) - case inSearch: - sv.currentFilter.Search = v - sv.whereWeAre = inFilterObject - case inAFilterTag: - sv.currentFilterTagList = append(sv.currentFilterTagList, v) - - // id object properties - case inId: - sv.currentEvent.ID = v - sv.whereWeAre = inEventObject - case inContent: - sv.currentEvent.Content = v - sv.whereWeAre = inEventObject - case inPubkey: - sv.currentEvent.PubKey = v - sv.whereWeAre = inEventObject - case inSig: - sv.currentEvent.Sig = v - sv.whereWeAre = inEventObject - case inAnEventTag: - sv.currentEventTag = append(sv.currentEventTag, v) - - // count object properties - case inCountObject: - sv.count.HyperLogLog, _ = hex.DecodeString(v) - - default: - return fmt.Errorf("unexpected string %s at %v", v, sv.whereWeAre) - } - return nil -} - -func (sv *sonicVisitor) OnInt64(v int64, _ stdlibjson.Number) error { - // fmt.Println("***", "OnInt64", v, "==", sv.whereWeAre) - - switch sv.whereWeAre { - // event object - case inCreatedAt: - sv.currentEvent.CreatedAt = Timestamp(v) - sv.whereWeAre = inEventObject - case inKind: - sv.currentEvent.Kind = int(v) - sv.whereWeAre = inEventObject - - // filter object - case inLimit: - sv.currentFilter.Limit = int(v) - sv.currentFilter.LimitZero = v == 0 - sv.whereWeAre = inFilterObject - case inSince: - sv.currentFilter.Since = (*Timestamp)(&v) - sv.whereWeAre = inFilterObject - case inUntil: - sv.currentFilter.Until = (*Timestamp)(&v) - sv.whereWeAre = inFilterObject - case inKinds: - sv.currentFilter.Kinds = append(sv.currentFilter.Kinds, int(v)) - - // count object - case inCountObject: - sv.count.Count = &v - } - return nil -} - -func (sv *sonicVisitor) OnBool(v bool) error { - // fmt.Println("***", "OnBool", v, "==", sv.whereWeAre) - - if sv.whereWeAre == inOk { - sv.ok.OK = v - return nil - } else { - return fmt.Errorf("unexpected boolean") - } -} - -func (_ sonicVisitor) OnNull() error { - return fmt.Errorf("null shouldn't be anywhere in a message") -} - -func (_ sonicVisitor) OnFloat64(v float64, n stdlibjson.Number) error { - return fmt.Errorf("float shouldn't be anywhere in a message") -} - -type sonicMessageParser struct { - reusableTagArray []Tag - reusableIDArray []ID - reusablePubKeyArray []PubKey - reusableStringArray []string - reusableUint16Array []Kind -} - -// NewMessageParser returns a sonicMessageParser object that is intended to be reused many times. -// It is not goroutine-safe. -func NewMessageParser() sonicMessageParser { - return sonicMessageParser{ - reusableTagArray: make([]Tag, 0, 10000), - reusableStringArray: make([]string, 0, 10000), - reusableIDArray: make([]ID, 0, 10000), - reusablePubKeyArray: make([]PubKey, 0, 10000), - reusableUint16Array: make([]Kind, 0, 10000), - } -} - -var NewSonicMessageParser = NewMessageParser - -func (smp *sonicMessageParser) doneWithTagSlice(slice []Tag) { - if unsafe.SliceData(smp.reusableTagArray) == unsafe.SliceData(slice) { - smp.reusableTagArray = slice[len(slice):] - } - - if cap(smp.reusableTagArray) < 7 { - // create a new one - smp.reusableTagArray = make([]Tag, 0, 10000) - } -} - -func (smp *sonicMessageParser) doneWithIDSlice(slice []string) { - if unsafe.SliceData(smp.reusableStringArray) == unsafe.SliceData(slice) { - smp.reusableStringArray = slice[len(slice):] - } - - if cap(smp.reusableStringArray) < 15 { - // create a new one - smp.reusableStringArray = make([]string, 0, 10000) - } -} - -func (smp *sonicMessageParser) doneWithUint16Slice(slice []Kind) { - if unsafe.SliceData(smp.reusableUint16Array) == unsafe.SliceData(slice) { - smp.reusableUint16Array = slice[len(slice):] - } - - if cap(smp.reusableUint16Array) < 8 { - // create a new one - smp.reusableUint16Array = make([]Kind, 0, 10000) - } -} - -// ParseMessage parses a message like ["EVENT", ...] or ["REQ", ...] and returns an Envelope. -// The returned envelopes, filters and events' slices should not be appended to, otherwise stuff -// will break. -// -// When an unexpected message (like ["NEG-OPEN", ...]) is found, the error UnknownLabel will be -// returned. Other errors will be returned if the JSON is malformed or the objects are not exactly -// as they should. -func (smp sonicMessageParser) ParseMessage(message string) (Envelope, error) { - sv := &sonicVisitor{smp: &smp} - sv.whereWeAre = inEnvelope - - err := ast.Preorder(message, sv, nil) - - return sv.mainEnvelope, err -} diff --git a/envelopes_test.go b/envelopes_test.go index 0e0614c..b3fb9ab 100644 --- a/envelopes_test.go +++ b/envelopes_test.go @@ -1,11 +1,9 @@ -//go:build sonic - package nostr import ( - "bufio" - "os" + "encoding/hex" "testing" + "unsafe" "github.com/stretchr/testify/require" ) @@ -34,17 +32,17 @@ func TestParseMessage(t *testing.T) { { Name: "EVENT envelope with subscription id", Message: `["EVENT","_",{"kind":1,"id":"dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962","pubkey":"3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d","created_at":1644271588,"tags":[],"content":"now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?","sig":"230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524"}]`, - ExpectedEnvelope: &EventEnvelope{SubscriptionID: ptr("_"), Event: Event{Kind: 1, ID: "dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962", PubKey: "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d", CreatedAt: 1644271588, Tags: Tags{}, Content: "now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?", Sig: "230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524"}}, + ExpectedEnvelope: &EventEnvelope{SubscriptionID: ptr("_"), Event: Event{Kind: 1, ID: MustIDFromHex("dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962"), PubKey: MustPubKeyFromHex("3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"), CreatedAt: 1644271588, Tags: Tags{}, Content: "now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?", Sig: mustSigFromHex("230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524")}}, }, { Name: "EVENT envelope without subscription id", Message: `["EVENT",{"kind":1,"id":"dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962","pubkey":"3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d","created_at":1644271588,"tags":[],"content":"now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?","sig":"230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524"}]`, - ExpectedEnvelope: &EventEnvelope{Event: Event{Kind: 1, ID: "dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962", PubKey: "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d", CreatedAt: 1644271588, Tags: Tags{}, Content: "now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?", Sig: "230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524"}}, + ExpectedEnvelope: &EventEnvelope{Event: Event{Kind: 1, ID: MustIDFromHex("dc90c95f09947507c1044e8f48bcf6350aa6bff1507dd4acfc755b9239b5c962"), PubKey: MustPubKeyFromHex("3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"), CreatedAt: 1644271588, Tags: Tags{}, Content: "now that https://blueskyweb.org/blog/2-7-2022-overview was announced we can stop working on nostr?", Sig: mustSigFromHex("230e9d8f0ddaf7eb70b5f7741ccfa37e87a455c9a469282e3464e2052d3192cd63a167e196e381ef9d7e69e9ea43af2443b839974dc85d8aaab9efe1d9296524")}}, }, { Name: "EVENT envelope with tags", Message: `["EVENT",{"kind":3,"id":"9e662bdd7d8abc40b5b15ee1ff5e9320efc87e9274d8d440c58e6eed2dddfbe2","pubkey":"373ebe3d45ec91977296a178d9f19f326c70631d2a1b0bbba5c5ecc2eb53b9e7","created_at":1644844224,"tags":[["p","3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"],["e","75fc5ac2487363293bd27fb0d14fb966477d0f1dbc6361d37806a6a740eda91e"],["p","46d0dfd3a724a302ca9175163bdf788f3606b3fd1bb12d5fe055d1e418cb60ea"]],"content":"{\"wss://nostr-pub.wellorder.net\":{\"read\":true,\"write\":true},\"wss://nostr.bitcoiner.social\":{\"read\":false,\"write\":true},\"wss://expensive-relay.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relayer.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relay.bitid.nz\":{\"read\":true,\"write\":true},\"wss://nostr.rocks\":{\"read\":true,\"write\":true}}","sig":"811355d3484d375df47581cb5d66bed05002c2978894098304f20b595e571b7e01b2efd906c5650080ffe49cf1c62b36715698e9d88b9e8be43029a2f3fa66be"}]`, - ExpectedEnvelope: &EventEnvelope{Event: Event{Kind: 3, ID: "9e662bdd7d8abc40b5b15ee1ff5e9320efc87e9274d8d440c58e6eed2dddfbe2", PubKey: "373ebe3d45ec91977296a178d9f19f326c70631d2a1b0bbba5c5ecc2eb53b9e7", CreatedAt: 1644844224, Tags: Tags{Tag{"p", "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"}, Tag{"e", "75fc5ac2487363293bd27fb0d14fb966477d0f1dbc6361d37806a6a740eda91e"}, Tag{"p", "46d0dfd3a724a302ca9175163bdf788f3606b3fd1bb12d5fe055d1e418cb60ea"}}, Content: "{\"wss://nostr-pub.wellorder.net\":{\"read\":true,\"write\":true},\"wss://nostr.bitcoiner.social\":{\"read\":false,\"write\":true},\"wss://expensive-relay.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relayer.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relay.bitid.nz\":{\"read\":true,\"write\":true},\"wss://nostr.rocks\":{\"read\":true,\"write\":true}}", Sig: "811355d3484d375df47581cb5d66bed05002c2978894098304f20b595e571b7e01b2efd906c5650080ffe49cf1c62b36715698e9d88b9e8be43029a2f3fa66be"}}, + ExpectedEnvelope: &EventEnvelope{Event: Event{Kind: 3, ID: MustIDFromHex("9e662bdd7d8abc40b5b15ee1ff5e9320efc87e9274d8d440c58e6eed2dddfbe2"), PubKey: MustPubKeyFromHex("373ebe3d45ec91977296a178d9f19f326c70631d2a1b0bbba5c5ecc2eb53b9e7"), CreatedAt: 1644844224, Tags: Tags{Tag{"p", "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"}, Tag{"e", "75fc5ac2487363293bd27fb0d14fb966477d0f1dbc6361d37806a6a740eda91e"}, Tag{"p", "46d0dfd3a724a302ca9175163bdf788f3606b3fd1bb12d5fe055d1e418cb60ea"}}, Content: "{\"wss://nostr-pub.wellorder.net\":{\"read\":true,\"write\":true},\"wss://nostr.bitcoiner.social\":{\"read\":false,\"write\":true},\"wss://expensive-relay.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relayer.fiatjaf.com\":{\"read\":true,\"write\":true},\"wss://relay.bitid.nz\":{\"read\":true,\"write\":true},\"wss://nostr.rocks\":{\"read\":true,\"write\":true}}", Sig: mustSigFromHex("811355d3484d375df47581cb5d66bed05002c2978894098304f20b595e571b7e01b2efd906c5650080ffe49cf1c62b36715698e9d88b9e8be43029a2f3fa66be")}}, }, { Name: "NOTICE envelope", @@ -59,22 +57,22 @@ func TestParseMessage(t *testing.T) { { Name: "COUNT envelope", Message: `["COUNT","z",{"count":12}]`, - ExpectedEnvelope: &CountEnvelope{SubscriptionID: "z", Count: ptr(int64(12))}, + ExpectedEnvelope: &CountEnvelope{SubscriptionID: "z", Count: ptr(uint32(12))}, }, { Name: "COUNT envelope with HLL", Message: `["COUNT","sub1",{"count":42, "hll": "0100000101000000000000040000000001020000000002000000000200000003000002040000000101020001010000000000000007000004010000000200040000020400000000000102000002000004010000010000000301000102030002000301000300010000070000000001000004000102010000000400010002000000000103000100010001000001040100020001000000000000010000020000000000030100000001000400010000000000000901010100000000040000000b030000010100010000010000010000000003000000000000010003000100020000000000010000010100000100000104000200030001000300000001000101000102"}]`, - ExpectedEnvelope: &CountEnvelope{SubscriptionID: "sub1", Count: ptr(int64(42)), HyperLogLog: []byte{1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 2, 0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 2, 4, 0, 0, 0, 1, 1, 2, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 4, 1, 0, 0, 0, 2, 0, 4, 0, 0, 2, 4, 0, 0, 0, 0, 0, 1, 2, 0, 0, 2, 0, 0, 4, 1, 0, 0, 1, 0, 0, 0, 3, 1, 0, 1, 2, 3, 0, 2, 0, 3, 1, 0, 3, 0, 1, 0, 0, 7, 0, 0, 0, 0, 1, 0, 0, 4, 0, 1, 2, 1, 0, 0, 0, 4, 0, 1, 0, 2, 0, 0, 0, 0, 1, 3, 0, 1, 0, 1, 0, 1, 0, 0, 1, 4, 1, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0, 1, 0, 4, 0, 1, 0, 0, 0, 0, 0, 0, 9, 1, 1, 1, 0, 0, 0, 0, 4, 0, 0, 0, 11, 3, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 1, 0, 3, 0, 1, 0, 2, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 4, 0, 2, 0, 3, 0, 1, 0, 3, 0, 0, 0, 1, 0, 1, 1, 0, 1, 2}}, + ExpectedEnvelope: &CountEnvelope{SubscriptionID: "sub1", Count: ptr(uint32(42)), HyperLogLog: []byte{1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 2, 0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 2, 4, 0, 0, 0, 1, 1, 2, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 4, 1, 0, 0, 0, 2, 0, 4, 0, 0, 2, 4, 0, 0, 0, 0, 0, 1, 2, 0, 0, 2, 0, 0, 4, 1, 0, 0, 1, 0, 0, 0, 3, 1, 0, 1, 2, 3, 0, 2, 0, 3, 1, 0, 3, 0, 1, 0, 0, 7, 0, 0, 0, 0, 1, 0, 0, 4, 0, 1, 2, 1, 0, 0, 0, 4, 0, 1, 0, 2, 0, 0, 0, 0, 1, 3, 0, 1, 0, 1, 0, 1, 0, 0, 1, 4, 1, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0, 1, 0, 4, 0, 1, 0, 0, 0, 0, 0, 0, 9, 1, 1, 1, 0, 0, 0, 0, 4, 0, 0, 0, 11, 3, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 1, 0, 3, 0, 1, 0, 2, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 4, 0, 2, 0, 3, 0, 1, 0, 3, 0, 0, 0, 1, 0, 1, 1, 0, 1, 2}}, }, { Name: "OK envelope success", Message: `["OK","3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa",true,""]`, - ExpectedEnvelope: &OKEnvelope{EventID: "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa", OK: true, Reason: ""}, + ExpectedEnvelope: &OKEnvelope{EventID: MustIDFromHex("3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa"), OK: true, Reason: ""}, }, { Name: "OK envelope failure", Message: `["OK","3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa",false,"error: could not connect to the database"]`, - ExpectedEnvelope: &OKEnvelope{EventID: "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa", OK: false, Reason: "error: could not connect to the database"}, + ExpectedEnvelope: &OKEnvelope{EventID: MustIDFromHex("3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefaaaaa"), OK: false, Reason: "error: could not connect to the database"}, }, { Name: "CLOSED envelope with underscore", @@ -94,12 +92,12 @@ func TestParseMessage(t *testing.T) { { Name: "AUTH envelope with event", Message: `["AUTH",{"kind":1,"id":"ae1fc7154296569d87ca4663f6bdf448c217d1590d28c85d158557b8b43b4d69","pubkey":"79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798","created_at":1683660344,"tags":[],"content":"hello world","sig":"94e10947814b1ebe38af42300ecd90c7642763896c4f69506ae97bfdf54eec3c0c21df96b7d95daa74ff3d414b1d758ee95fc258125deebc31df0c6ba9396a51"}]`, - ExpectedEnvelope: &AuthEnvelope{Event: Event{Kind: 1, ID: "ae1fc7154296569d87ca4663f6bdf448c217d1590d28c85d158557b8b43b4d69", PubKey: "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798", CreatedAt: 1683660344, Tags: Tags{}, Content: "hello world", Sig: "94e10947814b1ebe38af42300ecd90c7642763896c4f69506ae97bfdf54eec3c0c21df96b7d95daa74ff3d414b1d758ee95fc258125deebc31df0c6ba9396a51"}}, + ExpectedEnvelope: &AuthEnvelope{Event: Event{Kind: 1, ID: MustIDFromHex("ae1fc7154296569d87ca4663f6bdf448c217d1590d28c85d158557b8b43b4d69"), PubKey: MustPubKeyFromHex("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"), CreatedAt: 1683660344, Tags: Tags{}, Content: "hello world", Sig: mustSigFromHex("94e10947814b1ebe38af42300ecd90c7642763896c4f69506ae97bfdf54eec3c0c21df96b7d95daa74ff3d414b1d758ee95fc258125deebc31df0c6ba9396a51")}}, }, { Name: "REQ envelope", Message: `["REQ","million", {"kinds": [1]}, {"kinds": [30023 ], "#d": ["buteko", "batuke"]}]`, - ExpectedEnvelope: &ReqEnvelope{SubscriptionID: "million", Filters: Filters{{Kinds: []Kind{1}}, {Kinds: []Kind{30023}, Tags: TagMap{"d": []string{"buteko", "batuke"}}}}}, + ExpectedEnvelope: &ReqEnvelope{SubscriptionID: "million", Filters: []Filter{{Kinds: []Kind{1}}, {Kinds: []Kind{30023}, Tags: TagMap{"d": []string{"buteko", "batuke"}}}}}, }, { Name: "CLOSE envelope", @@ -114,36 +112,14 @@ func TestParseMessage(t *testing.T) { { Name: "REQ from jumble", Message: `["REQ","sub:1",{"kinds":[1,6],"limit":100}]`, - ExpectedEnvelope: &ReqEnvelope{SubscriptionID: "sub:1", Filters: Filters{{Kinds: []int{1, 6}, Limit: 100}}}, + ExpectedEnvelope: &ReqEnvelope{SubscriptionID: "sub:1", Filters: []Filter{{Kinds: []Kind{1, 6}, Limit: 100}}}, }, } t.Run("standard", func(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.Name, func(t *testing.T) { - envelope := ParseMessage(testCase.Message) - if testCase.ExpectedEnvelope == nil && envelope == nil { - return - } - - if testCase.ExpectedEnvelope == nil { - require.Nil(t, envelope, "expected nil but got %v", envelope) - return - } - - require.NotNil(t, envelope, "expected non-nil envelope but got nil") - require.Equal(t, testCase.ExpectedEnvelope.String(), envelope.String()) - }) - } - }) - - t.Run("sonic", func(t *testing.T) { - smp := NewSonicMessageParser() - - for _, testCase := range testCases { - t.Run(testCase.Name, func(t *testing.T) { - envelope, err := smp.ParseMessage(testCase.Message) - + envelope, err := ParseMessage(testCase.Message) if testCase.ExpectedEnvelope == nil && envelope == nil { return } @@ -155,45 +131,14 @@ func TestParseMessage(t *testing.T) { require.NoError(t, err) require.NotNil(t, envelope, "expected non-nil envelope but got nil") - require.Equal(t, testCase.ExpectedEnvelope, envelope) + require.Equal(t, testCase.ExpectedEnvelope.String(), envelope.String()) }) } }) } -func TestParseMessagesFromFile(t *testing.T) { - file, err := os.Open("testdata/messages.jsonl") - if err != nil { - t.Skipf("Skipping test because testdata/messages.jsonl could not be opened: %v", err) - return - } - defer file.Close() - - scanner := bufio.NewScanner(file) - smp := NewSonicMessageParser() - lineNum := 0 - - for scanner.Scan() { - lineNum++ - line := scanner.Bytes() - if len(line) == 0 { - continue - } - - standardEnvelope := ParseMessage(string(line)) - sonicEnvelope, err := smp.ParseMessage(string(line)) - - if standardEnvelope == nil { - require.Nil(t, sonicEnvelope, "line %d: standard parser returned nil but sonic parser didn't", lineNum) - continue - } - - require.NoError(t, err, "line %d: sonic parser returned error", lineNum) - require.NotNil(t, sonicEnvelope, "line %d: standard parser returned non-nil but sonic parser returned nil", lineNum) - - require.Equal(t, standardEnvelope, sonicEnvelope, - "line %d: parsers returned different results", lineNum) - } - - require.NoError(t, scanner.Err(), "error reading file") +func mustSigFromHex(sigStr string) [64]byte { + var sig [64]byte + hex.Decode(sig[:], unsafe.Slice(unsafe.StringData(sigStr), 128)) + return sig } diff --git a/eventstore/cmd/eventstore/query-or-save.go b/eventstore/cmd/eventstore/query-or-save.go index 920b6e7..f83084c 100644 --- a/eventstore/cmd/eventstore/query-or-save.go +++ b/eventstore/cmd/eventstore/query-or-save.go @@ -28,7 +28,7 @@ var queryOrSave = &cli.Command{ return doSave(ctx, line, *e) } if json.Unmarshal([]byte(line), re) == nil { - return doQuery(ctx, &re.Filter) + return doQuery(ctx, &re.Filters[0]) } if json.Unmarshal([]byte(line), f) == nil && len(f.String()) > 2 { return doQuery(ctx, f) diff --git a/go.mod b/go.mod index d3deb6d..eea683c 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/blugelabs/bluge v0.2.2 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.5 - github.com/bytedance/sonic v1.13.1 github.com/coder/websocket v1.8.13 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 github.com/dgraph-io/badger/v4 v4.5.0 @@ -59,10 +58,8 @@ require ( github.com/blugelabs/ice/v2 v2.0.1 // indirect github.com/btcsuite/btcd v0.24.2 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect - github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/caio/go-tdigest v3.1.0+incompatible // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect @@ -75,7 +72,6 @@ require ( github.com/gorilla/css v1.0.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -88,12 +84,12 @@ require ( github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect - github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/arch v0.15.0 // indirect golang.org/x/sys v0.31.0 // indirect google.golang.org/protobuf v1.36.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/coder/websocket => /tmp/websocket diff --git a/go.sum b/go.sum index 9cece8d..58e7c15 100644 --- a/go.sum +++ b/go.sum @@ -86,11 +86,6 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= -github.com/bytedance/sonic v1.13.1 h1:Jyd5CIvdFnkOWuKXr+wm4Nyk2h0yAFsr8ucJgEasO3g= -github.com/bytedance/sonic v1.13.1/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= -github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= -github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds= github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -99,14 +94,9 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= -github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= -github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e h1:0XBUw73chJ1VYSsfvcPvVT7auykAJce9FpRr10L6Qhw= github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:P13beTBKr5Q18lJe1rIoLUqjM+CB1zYrRg44ZqGuQSA= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= -github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -204,10 +194,6 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6 github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= -github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= -github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -294,8 +280,6 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= -github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/tyler-smith/go-bip32 v1.0.0 h1:sDR9juArbUgX+bO/iblgZnMPeWY1KZMUC2AFUJdv5KE= github.com/tyler-smith/go-bip32 v1.0.0/go.mod h1:onot+eHknzV4BVPwrzqY5OoVpyCvnwD7lMawL5aQupE= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= @@ -314,8 +298,6 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw= -golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= golang.org/x/crypto v0.0.0-20170613210332-850760c427c5/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -429,5 +411,4 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM= -nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/khatru/handlers.go b/khatru/handlers.go index f59e1e9..d0d92ba 100644 --- a/khatru/handlers.go +++ b/khatru/handlers.go @@ -118,8 +118,6 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { rl.OnConnect(ctx) } - smp := nostr.NewMessageParser() - for { typ, msgb, err := ws.conn.ReadMessage() if err != nil { @@ -145,11 +143,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // this is safe because ReadMessage() will always create a new slice message := unsafe.String(unsafe.SliceData(msgb), len(msgb)) - // parse messages sequentially otherwise sonic breaks - envelope, err := smp.ParseMessage(message) - - // then delegate to the goroutine go func(message string) { + envelope, err := nostr.ParseMessage(message) if err != nil { if err == nostr.UnknownLabel && rl.Negentropy { envelope = nip77.ParseNegMessage(message) @@ -287,22 +282,24 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID) // handle each filter separately -- dispatching events as they're loaded from databases - srl := rl - if rl.getSubRelayFromFilter != nil { - srl = rl.getSubRelayFromFilter(env.Filter) - } - err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, env.Filter) - if err != nil { - // fail everything if any filter is rejected - reason := err.Error() - if strings.HasPrefix(reason, "auth-required:") { - RequestAuth(ctx) + for _, filter := range env.Filters { + srl := rl + if rl.getSubRelayFromFilter != nil { + srl = rl.getSubRelayFromFilter(filter) + } + err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) + if err != nil { + // fail everything if any filter is rejected + reason := err.Error() + if strings.HasPrefix(reason, "auth-required:") { + RequestAuth(ctx) + } + ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) + cancelReqCtx(errors.New("filter rejected")) + return + } else { + rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx) } - ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) - cancelReqCtx(errors.New("filter rejected")) - return - } else { - rl.addListener(ws, env.SubscriptionID, srl, env.Filter, cancelReqCtx) } go func() { diff --git a/nip77/idsonly.go b/nip77/idsonly.go index 3ac87ee..fb247c0 100644 --- a/nip77/idsonly.go +++ b/nip77/idsonly.go @@ -51,7 +51,7 @@ func FetchIDsOnly( msg := neg.Start() open, _ := OpenEnvelope{id, filter, msg}.MarshalJSON() - err = <-r.Write(open) + err = r.WriteWithError(open) if err != nil { return nil, fmt.Errorf("failed to write to relay: %w", err) } diff --git a/nip77/nip77.go b/nip77/nip77.go index 4315be5..4184ab4 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -79,7 +79,7 @@ func NegentropySync( msg := neg.Start() open, _ := OpenEnvelope{id, filter, msg}.MarshalJSON() - err = <-r.Write(open) + err = r.WriteWithError(open) if err != nil { return fmt.Errorf("failed to write to relay: %w", err) } diff --git a/relay.go b/relay.go index 2beece2..34b232a 100644 --- a/relay.go +++ b/relay.go @@ -1,7 +1,6 @@ package nostr import ( - "bytes" "context" "crypto/tls" "errors" @@ -10,7 +9,6 @@ import ( "log" "net/http" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -38,7 +36,6 @@ type Relay struct { noticeHandler func(string) // NIP-01 NOTICEs customHandler func(string) // nonstandard unparseable messages okCallbacks *xsync.MapOf[ID, func(bool, string)] - writeQueue chan writeRequest subscriptionChannelCloseQueue chan *Subscription // custom things that aren't often used @@ -46,11 +43,6 @@ type Relay struct { AssumeValid bool // this will skip verifying signatures for events received from this relay } -type writeRequest struct { - msg []byte - answer chan error -} - // NewRelay returns a new relay. It takes a context that, when canceled, will close the relay connection. func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay { ctx, cancel := context.WithCancelCause(ctx) @@ -60,7 +52,6 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay { connectionContextCancel: cancel, Subscriptions: xsync.NewMapOf[int64, *Subscription](), okCallbacks: xsync.NewMapOf[ID, func(bool, string)](), - writeQueue: make(chan writeRequest), subscriptionChannelCloseQueue: make(chan *Subscription), requestHeader: opts.RequestHeader, } @@ -103,7 +94,7 @@ func (r *Relay) String() string { func (r *Relay) Context() context.Context { return r.connectionContext } // IsConnected returns true if the connection to this relay seems to be active. -func (r *Relay) IsConnected() bool { return r.connectionContext.Err() == nil } +func (r *Relay) IsConnected() bool { return !r.Connection.closed.Load() } // Connect tries to establish a websocket connection to r.URL. // If the context expires before the connection is complete, an error is returned. @@ -128,164 +119,123 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("connection took too long")) - defer cancel() + ctx, _ = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("connection took too long")) } - conn, err := NewConnection(ctx, r.URL, r.requestHeader, tlsConfig) + conn, err := NewConnection(ctx, r.URL, r.handleMessage, r.requestHeader, tlsConfig) if err != nil { return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err) } r.Connection = conn - // ping every 29 seconds - ticker := time.NewTicker(29 * time.Second) - - // queue all write operations here so we don't do mutex spaghetti - go func() { - for { - select { - case <-r.connectionContext.Done(): - ticker.Stop() - r.Connection = nil - - for _, sub := range r.Subscriptions.Range { - sub.unsub(fmt.Errorf("relay connection closed: %w / %w", context.Cause(r.connectionContext), r.ConnectionError)) - } - return - case <-ticker.C: - err := r.Connection.Ping(r.connectionContext) - if err != nil && !strings.Contains(err.Error(), "failed to wait for pong") { - InfoLogger.Printf("{%s} error writing ping: %v; closing websocket", r.URL, err) - r.Close() // this should trigger a context cancelation - return - } - case writeRequest := <-r.writeQueue: - // all write requests will go through this to prevent races - debugLogf("{%s} sending %v\n", r.URL, string(writeRequest.msg)) - if err := r.Connection.WriteMessage(r.connectionContext, writeRequest.msg); err != nil { - writeRequest.answer <- err - } - close(writeRequest.answer) - } - } - }() - - // general message reader loop - go func() { - buf := new(bytes.Buffer) - mp := NewMessageParser() - - for { - buf.Reset() - - if err := conn.ReadMessage(r.connectionContext, buf); err != nil { - r.ConnectionError = err - r.close(err) - break - } - - message := string(buf.Bytes()) - debugLogf("{%s} received %v\n", r.URL, message) - - // if this is an "EVENT" we will have this preparser logic that should speed things up a little - // as we skip handling duplicate events - subid := extractSubID(message) - sub, ok := r.Subscriptions.Load(subIdToSerial(subid)) - if ok { - if sub.checkDuplicate != nil { - if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { - continue - } - } else if sub.checkDuplicateReplaceable != nil { - if sub.checkDuplicateReplaceable( - ReplaceableKey{extractEventPubKey(message), extractDTag(message)}, - extractTimestamp(message), - ) { - continue - } - } - } - - envelope, err := mp.ParseMessage(message) - if envelope == nil { - if r.customHandler != nil && err == UnknownLabel { - r.customHandler(message) - } - continue - } - - switch env := envelope.(type) { - case *NoticeEnvelope: - // see WithNoticeHandler - if r.noticeHandler != nil { - r.noticeHandler(string(*env)) - } else { - log.Printf("NOTICE from %s: '%s'\n", r.URL, string(*env)) - } - case *AuthEnvelope: - if env.Challenge == nil { - continue - } - r.challenge = *env.Challenge - case *EventEnvelope: - // we already have the subscription from the pre-check above, so we can just reuse it - if sub == nil { - // InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) - continue - } else { - // check if the event matches the desired filter, ignore otherwise - if !sub.match(env.Event) { - InfoLogger.Printf("{%s} filter does not match: %v ~ %v\n", r.URL, sub.Filter, env.Event) - continue - } - - // check signature, ignore invalid, except from trusted (AssumeValid) relays - if !r.AssumeValid { - if !env.Event.VerifySignature() { - InfoLogger.Printf("{%s} bad signature on %s\n", r.URL, env.Event.ID) - continue - } - } - - // dispatch this to the internal .events channel of the subscription - sub.dispatchEvent(env.Event) - } - case *EOSEEnvelope: - if subscription, ok := r.Subscriptions.Load(subIdToSerial(string(*env))); ok { - subscription.dispatchEose() - } - case *ClosedEnvelope: - if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok { - subscription.handleClosed(env.Reason) - } - case *CountEnvelope: - if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { - subscription.countResult <- *env - } - case *OKEnvelope: - if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { - okCallback(env.OK, env.Reason) - } else { - InfoLogger.Printf("{%s} got an unexpected OK message for event %s", r.URL, env.EventID) - } - } - } - }() - return nil } +func (r *Relay) handleMessage(message string) { + // if this is an "EVENT" we will have this preparser logic that should speed things up a little + // as we skip handling duplicate events + subid := extractSubID(message) + sub, ok := r.Subscriptions.Load(subIdToSerial(subid)) + if ok { + if sub.checkDuplicate != nil { + if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { + return + } + } else if sub.checkDuplicateReplaceable != nil { + if sub.checkDuplicateReplaceable( + ReplaceableKey{extractEventPubKey(message), extractDTag(message)}, + extractTimestamp(message), + ) { + return + } + } + } + + envelope, err := ParseMessage(message) + if envelope == nil { + if r.customHandler != nil && err == UnknownLabel { + r.customHandler(message) + } + return + } + + switch env := envelope.(type) { + case *NoticeEnvelope: + // see WithNoticeHandler + if r.noticeHandler != nil { + r.noticeHandler(string(*env)) + } else { + log.Printf("NOTICE from %s: '%s'\n", r.URL, string(*env)) + } + case *AuthEnvelope: + if env.Challenge == nil { + return + } + r.challenge = *env.Challenge + case *EventEnvelope: + // we already have the subscription from the pre-check above, so we can just reuse it + if sub == nil { + // InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) + return + } else { + // check if the event matches the desired filter, ignore otherwise + if !sub.match(env.Event) { + InfoLogger.Printf("{%s} filter does not match: %v ~ %v\n", r.URL, sub.Filter, env.Event) + return + } + + // check signature, ignore invalid, except from trusted (AssumeValid) relays + if !r.AssumeValid { + if !env.Event.VerifySignature() { + InfoLogger.Printf("{%s} bad signature on %s\n", r.URL, env.Event.ID) + return + } + } + + // dispatch this to the internal .events channel of the subscription + sub.dispatchEvent(env.Event) + } + case *EOSEEnvelope: + if subscription, ok := r.Subscriptions.Load(subIdToSerial(string(*env))); ok { + subscription.dispatchEose() + } + case *ClosedEnvelope: + if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok { + subscription.handleClosed(env.Reason) + } + case *CountEnvelope: + if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { + subscription.countResult <- *env + } + case *OKEnvelope: + if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { + okCallback(env.OK, env.Reason) + } else { + InfoLogger.Printf("{%s} got an unexpected OK message for event %s", r.URL, env.EventID) + } + } +} + // Write queues an arbitrary message to be sent to the relay. -func (r *Relay) Write(msg []byte) <-chan error { +func (r *Relay) Write(msg []byte) { + select { + case r.Connection.writeQueue <- writeRequest{msg: msg, answer: nil}: + case <-r.Connection.closedNotify: + case <-r.connectionContext.Done(): + } +} + +// WriteWithError is like Write, but returns an error if the write fails (and the connection gets closed). +func (r *Relay) WriteWithError(msg []byte) error { ch := make(chan error) select { - case r.writeQueue <- writeRequest{msg: msg, answer: ch}: + case r.Connection.writeQueue <- writeRequest{msg: msg, answer: ch}: + case <-r.Connection.closedNotify: + return fmt.Errorf("failed to write to %s: ", r.URL) case <-r.connectionContext.Done(): - go func() { ch <- fmt.Errorf("connection closed") }() + return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext)) } - return ch + return <-ch } // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response. @@ -342,7 +292,7 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error { // publish event envb, _ := env.MarshalJSON() - if err := <-r.Write(envb); err != nil { + if err := r.WriteWithError(envb); err != nil { return err } @@ -508,11 +458,6 @@ func (r *Relay) close(reason error) error { return fmt.Errorf("relay not connected") } - err := r.Connection.Close() - if err != nil { - return err - } - return nil } diff --git a/subscription.go b/subscription.go index 930aa1d..7740829 100644 --- a/subscription.go +++ b/subscription.go @@ -150,7 +150,7 @@ func (sub *Subscription) Close() { if sub.Relay.IsConnected() { closeMsg := CloseEnvelope(sub.id) closeb, _ := (&closeMsg).MarshalJSON() - <-sub.Relay.Write(closeb) + sub.Relay.Write(closeb) } } @@ -165,13 +165,13 @@ func (sub *Subscription) Sub(_ context.Context, filter Filter) { func (sub *Subscription) Fire() error { var reqb []byte if sub.countResult == nil { - reqb, _ = ReqEnvelope{sub.id, sub.Filter}.MarshalJSON() + reqb, _ = ReqEnvelope{sub.id, []Filter{sub.Filter}}.MarshalJSON() } else { reqb, _ = CountEnvelope{sub.id, sub.Filter, nil, nil}.MarshalJSON() } sub.live.Store(true) - if err := <-sub.Relay.Write(reqb); err != nil { + if err := sub.Relay.WriteWithError(reqb); err != nil { err := fmt.Errorf("failed to write: %w", err) sub.cancel(err) return err