diff --git a/eventstore/README.md b/eventstore/README.md index 842f48e..67647e6 100644 --- a/eventstore/README.md +++ b/eventstore/README.md @@ -24,7 +24,7 @@ type Store interface { } ``` -[![Go Reference](https://pkg.go.dev/badge/github.com/fiatjaf/eventstore.svg)](https://pkg.go.dev/github.com/fiatjaf/eventstore) [![Run Tests](https://github.com/fiatjaf/eventstore/actions/workflows/test.yml/badge.svg)](https://github.com/fiatjaf/eventstore/actions/workflows/test.yml) +[![Go Reference](https://pkg.go.dev/badge/fiatjaf.com/nostr/eventstore.svg)](https://pkg.go.dev/fiatjaf.com/nostr/eventstore) [![Run Tests](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml/badge.svg)](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml) ## command-line tool diff --git a/eventstore/cmd/eventstore/README.md b/eventstore/cmd/eventstore/README.md index aa7ec9f..2b31299 100644 --- a/eventstore/cmd/eventstore/README.md +++ b/eventstore/cmd/eventstore/README.md @@ -1,7 +1,7 @@ # eventstore command-line tool ``` -go install github.com/fiatjaf/eventstore/cmd/eventstore@latest +go install fiatjaf.com/nostr/eventstore/cmd/eventstore@latest ``` ## Usage diff --git a/eventstore/cmd/eventstore/main_mmm.go b/eventstore/cmd/eventstore/main_mmm.go index 4ad499e..285ec7b 100644 --- a/eventstore/cmd/eventstore/main_mmm.go +++ b/eventstore/cmd/eventstore/main_mmm.go @@ -3,13 +3,11 @@ package main import ( - "context" "os" "path/filepath" "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr/eventstore/mmm" - "fiatjaf.com/nostr" "github.com/rs/zerolog" ) @@ -24,9 +22,7 @@ func doMmmInit(path string) (eventstore.Store, error) { if err := mmmm.Init(); err != nil { return nil, err } - il := &mmm.IndexingLayer{ - ShouldIndex: func(ctx context.Context, e *nostr.Event) bool { return false }, - } + il := &mmm.IndexingLayer{} if err := mmmm.EnsureLayer(filepath.Base(path), il); err != nil { return nil, err } diff --git a/eventstore/cmd/eventstore/neg.go b/eventstore/cmd/eventstore/neg.go index 504ff43..b90cc71 100644 --- a/eventstore/cmd/eventstore/neg.go +++ b/eventstore/cmd/eventstore/neg.go @@ -8,11 +8,11 @@ import ( "os" "sync" - "github.com/urfave/cli/v3" - "github.com/mailru/easyjson" "fiatjaf.com/nostr" "fiatjaf.com/nostr/nip77/negentropy" "fiatjaf.com/nostr/nip77/negentropy/storage/vector" + "github.com/mailru/easyjson" + "github.com/urfave/cli/v3" ) var neg = &cli.Command{ @@ -44,11 +44,7 @@ var neg = &cli.Command{ // create negentropy object and initialize it with events vec := vector.New() neg := negentropy.New(vec, frameSizeLimit) - ch, err := db.QueryEvents(ctx, filter) - if err != nil { - return fmt.Errorf("error querying: %s\n", err) - } - for evt := range ch { + for evt := range db.QueryEvents(filter) { vec.Insert(evt.CreatedAt, evt.ID) } diff --git a/eventstore/cmd/eventstore/query.go b/eventstore/cmd/eventstore/query.go index e364b2f..a9a6017 100644 --- a/eventstore/cmd/eventstore/query.go +++ b/eventstore/cmd/eventstore/query.go @@ -5,8 +5,8 @@ import ( "fmt" "os" - "github.com/mailru/easyjson" "fiatjaf.com/nostr" + "github.com/mailru/easyjson" "github.com/urfave/cli/v3" ) @@ -25,14 +25,7 @@ var query = &cli.Command{ continue } - ch, err := db.QueryEvents(ctx, filter) - if err != nil { - fmt.Fprintf(os.Stderr, "error querying: %s\n", err) - hasError = true - continue - } - - for evt := range ch { + for evt := range db.QueryEvents(filter) { fmt.Println(evt) } } diff --git a/eventstore/cmd/eventstore/save.go b/eventstore/cmd/eventstore/save.go index 5096859..c454783 100644 --- a/eventstore/cmd/eventstore/save.go +++ b/eventstore/cmd/eventstore/save.go @@ -5,9 +5,9 @@ import ( "fmt" "os" - "github.com/urfave/cli/v3" - "github.com/mailru/easyjson" "fiatjaf.com/nostr" + "github.com/mailru/easyjson" + "github.com/urfave/cli/v3" ) var save = &cli.Command{ @@ -25,7 +25,7 @@ var save = &cli.Command{ continue } - if err := db.SaveEvent(ctx, &event); err != nil { + if err := db.SaveEvent(event); err != nil { fmt.Fprintf(os.Stderr, "failed to save event '%s': %s\n", line, err) hasError = true continue diff --git a/go.mod b/go.mod index a8abb14..ba6cea0 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/dgraph-io/ristretto v1.0.0 github.com/elnosh/gonuts v0.3.1-0.20250123162555-7c0381a585e3 github.com/fasthttp/websocket v1.5.12 - github.com/fiatjaf/eventstore v0.16.2 github.com/fiatjaf/khatru v0.17.4 github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 github.com/json-iterator/go v1.1.12 @@ -70,6 +69,7 @@ require ( github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/fiatjaf/eventstore v0.16.2 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/khatru/README.md b/khatru/README.md index 8549e5b..abd7351 100644 --- a/khatru/README.md +++ b/khatru/README.md @@ -115,7 +115,7 @@ func main() { ### But I don't want to write my own database! -Fear no more. Using the https://github.com/fiatjaf/eventstore module you get a bunch of compatible databases out of the box and you can just plug them into your relay. For example, [sqlite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3): +Fear no more. Using the https://fiatjaf.com/nostr/eventstore module you get a bunch of compatible databases out of the box and you can just plug them into your relay. For example, [sqlite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3): ```go db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"} diff --git a/khatru/adding.go b/khatru/adding.go index 815c6e2..74d2b7a 100644 --- a/khatru/adding.go +++ b/khatru/adding.go @@ -5,16 +5,12 @@ import ( "errors" "fmt" - "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore" ) // AddEvent sends an event through then normal add pipeline, as if it was received from a websocket. -func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) { - if evt == nil { - return false, errors.New("error: event is nil") - } - +func (rl *Relay) AddEvent(ctx context.Context, evt nostr.Event) (skipBroadcast bool, writeError error) { if nostr.IsEphemeralKind(evt.Kind) { return false, rl.handleEphemeral(ctx, evt) } else { @@ -22,9 +18,9 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast } } -func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) { - for _, reject := range rl.RejectEvent { - if reject, msg := reject(ctx, evt); reject { +func (rl *Relay) handleNormal(ctx context.Context, evt nostr.Event) (skipBroadcast bool, writeError error) { + if nil != rl.OnEvent { + if reject, msg := rl.OnEvent(ctx, evt); reject { if msg == "" { return true, errors.New("blocked: no reason") } else { @@ -36,8 +32,8 @@ func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadc // will store // regular kinds are just saved directly if nostr.IsRegularKind(evt.Kind) { - for _, store := range rl.StoreEvent { - if err := store(ctx, evt); err != nil { + if nil != rl.StoreEvent { + if err := rl.StoreEvent(ctx, evt); err != nil { switch err { case eventstore.ErrDupEvent: return true, nil @@ -47,63 +43,21 @@ func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadc } } } else { - // otherwise it's a replaceable -- so we'll use the replacer functions if we have any - if len(rl.ReplaceEvent) > 0 { - for _, repl := range rl.ReplaceEvent { - if err := repl(ctx, evt); err != nil { - switch err { - case eventstore.ErrDupEvent: - return true, nil - default: - return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error")) - } - } - } - } else { - // otherwise do it the manual way - filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}} - if nostr.IsAddressableKind(evt.Kind) { - // when addressable, add the "d" tag to the filter - filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}} - } - - // now we fetch old events and delete them - shouldStore := true - for _, query := range rl.QueryEvents { - ch, err := query(ctx, filter) - if err != nil { - continue - } - for previous := range ch { - if isOlder(previous, evt) { - for _, del := range rl.DeleteEvent { - del(ctx, previous) - } - } else { - // we found a more recent event, so we won't delete it and also will not store this new one - shouldStore = false - } - } - } - - // store - if shouldStore { - for _, store := range rl.StoreEvent { - if saveErr := store(ctx, evt); saveErr != nil { - switch saveErr { - case eventstore.ErrDupEvent: - return true, nil - default: - return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error")) - } - } + // otherwise it's a replaceable + if nil != rl.ReplaceEvent { + if err := rl.ReplaceEvent(ctx, evt); err != nil { + switch err { + case eventstore.ErrDupEvent: + return true, nil + default: + return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error")) } } } } - for _, ons := range rl.OnEventSaved { - ons(ctx, evt) + if nil != rl.OnEventSaved { + rl.OnEventSaved(ctx, evt) } // track event expiration if applicable diff --git a/khatru/blossom/authorization.go b/khatru/blossom/authorization.go index 347f19e..ba888d7 100644 --- a/khatru/blossom/authorization.go +++ b/khatru/blossom/authorization.go @@ -7,8 +7,8 @@ import ( "strconv" "strings" - "github.com/mailru/easyjson" "fiatjaf.com/nostr" + "github.com/mailru/easyjson" ) func readAuthorization(r *http.Request) (*nostr.Event, error) { @@ -28,7 +28,7 @@ func readAuthorization(r *http.Request) (*nostr.Event, error) { if evt.Kind != 24242 || !evt.CheckID() { return nil, fmt.Errorf("invalid event") } - if ok, _ := evt.CheckSignature(); !ok { + if !evt.VerifySignature() { return nil, fmt.Errorf("invalid signature") } diff --git a/khatru/blossom/blob.go b/khatru/blossom/blob.go index 7753832..24dfd10 100644 --- a/khatru/blossom/blob.go +++ b/khatru/blossom/blob.go @@ -2,6 +2,7 @@ package blossom import ( "context" + "iter" "fiatjaf.com/nostr" ) @@ -13,14 +14,14 @@ type BlobDescriptor struct { Type string `json:"type"` Uploaded nostr.Timestamp `json:"uploaded"` - Owner string `json:"-"` + Owner nostr.PubKey `json:"-"` } type BlobIndex interface { - Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error - List(ctx context.Context, pubkey string) (chan BlobDescriptor, error) + Keep(ctx context.Context, blob BlobDescriptor, pubkey nostr.PubKey) error + List(ctx context.Context, pubkey nostr.PubKey) iter.Seq[BlobDescriptor] Get(ctx context.Context, sha256 string) (*BlobDescriptor, error) - Delete(ctx context.Context, sha256 string, pubkey string) error + Delete(ctx context.Context, sha256 string, pubkey nostr.PubKey) error } var _ BlobIndex = (*EventStoreBlobIndexWrapper)(nil) diff --git a/khatru/blossom/eventstorewrapper.go b/khatru/blossom/eventstorewrapper.go index 162ab06..4b1f343 100644 --- a/khatru/blossom/eventstorewrapper.go +++ b/khatru/blossom/eventstorewrapper.go @@ -2,10 +2,11 @@ package blossom import ( "context" + "iter" "strconv" - "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore" ) // EventStoreBlobIndexWrapper uses fake events to keep track of what blobs we have stored and who owns them @@ -15,15 +16,15 @@ type EventStoreBlobIndexWrapper struct { ServiceURL string } -func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error { - ch, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}, Tags: nostr.TagMap{"x": []string{blob.SHA256}}}) - if err != nil { - return err - } +func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescriptor, pubkey nostr.PubKey) error { + next, stop := iter.Pull( + es.Store.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pubkey}, Kinds: []uint16{24242}, Tags: nostr.TagMap{"x": []string{blob.SHA256}}}), + ) + defer stop() - if <-ch == nil { + if _, exists := next(); !exists { // doesn't exist, save - evt := &nostr.Event{ + evt := nostr.Event{ PubKey: pubkey, Kind: 24242, Tags: nostr.Tags{ @@ -34,38 +35,31 @@ func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescript CreatedAt: blob.Uploaded, } evt.ID = evt.GetID() - es.Store.SaveEvent(ctx, evt) + es.Store.SaveEvent(evt) } return nil } -func (es EventStoreBlobIndexWrapper) List(ctx context.Context, pubkey string) (chan BlobDescriptor, error) { - ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}}) - if err != nil { - return nil, err - } - - ch := make(chan BlobDescriptor) - - go func() { - for evt := range ech { - ch <- es.parseEvent(evt) +func (es EventStoreBlobIndexWrapper) List(ctx context.Context, pubkey nostr.PubKey) iter.Seq[BlobDescriptor] { + return func(yield func(BlobDescriptor) bool) { + for evt := range es.Store.QueryEvents(nostr.Filter{ + Authors: []nostr.PubKey{pubkey}, + Kinds: []uint16{24242}, + }) { + yield(es.parseEvent(evt)) } - close(ch) - }() - - return ch, nil + } } func (es EventStoreBlobIndexWrapper) Get(ctx context.Context, sha256 string) (*BlobDescriptor, error) { - ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1}) - if err != nil { - return nil, err - } + next, stop := iter.Pull( + es.Store.QueryEvents(nostr.Filter{Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []uint16{24242}, Limit: 1}), + ) - evt := <-ech - if evt != nil { + defer stop() + + if evt, found := next(); found { bd := es.parseEvent(evt) return &bd, nil } @@ -73,21 +67,27 @@ func (es EventStoreBlobIndexWrapper) Get(ctx context.Context, sha256 string) (*B return nil, nil } -func (es EventStoreBlobIndexWrapper) Delete(ctx context.Context, sha256 string, pubkey string) error { - ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1}) - if err != nil { - return err - } +func (es EventStoreBlobIndexWrapper) Delete(ctx context.Context, sha256 string, pubkey nostr.PubKey) error { + next, stop := iter.Pull( + es.Store.QueryEvents(nostr.Filter{ + Authors: []nostr.PubKey{pubkey}, + Tags: nostr.TagMap{"x": []string{sha256}}, + Kinds: []uint16{24242}, + Limit: 1, + }, + ), + ) - evt := <-ech - if evt != nil { - return es.Store.DeleteEvent(ctx, evt) + defer stop() + + if evt, found := next(); found { + return es.Store.DeleteEvent(evt.ID) } return nil } -func (es EventStoreBlobIndexWrapper) parseEvent(evt *nostr.Event) BlobDescriptor { +func (es EventStoreBlobIndexWrapper) parseEvent(evt nostr.Event) BlobDescriptor { hhash := evt.Tags[0][1] mimetype := evt.Tags[1][1] ext := getExtension(mimetype) diff --git a/khatru/blossom/handlers.go b/khatru/blossom/handlers.go index b5bbb64..8bbf3ec 100644 --- a/khatru/blossom/handlers.go +++ b/khatru/blossom/handlers.go @@ -87,8 +87,8 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) { } // run the reject hooks - for _, ru := range bs.RejectUpload { - reject, reason, code := ru(r.Context(), auth, size, ext) + if nil != bs.RejectUpload { + reject, reason, code := bs.RejectUpload(r.Context(), auth, size, ext) if reject { blossomError(w, reason, code) return @@ -134,8 +134,8 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) { } // save actual blob - for _, sb := range bs.StoreBlob { - if err := sb(r.Context(), hhash, b); err != nil { + if nil != bs.StoreBlob { + if err := bs.StoreBlob(r.Context(), hhash, b); err != nil { blossomError(w, "failed to save: "+err.Error(), 500) return } @@ -175,8 +175,8 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) { } } - for _, rg := range bs.RejectGet { - reject, reason, code := rg(r.Context(), auth, hhash) + if nil != bs.RejectGet { + reject, reason, code := bs.RejectGet(r.Context(), auth, hhash) if reject { blossomError(w, reason, code) return @@ -188,8 +188,8 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) { ext = "." + spl[1] } - for _, lb := range bs.LoadBlob { - reader, _ := lb(r.Context(), hhash) + if nil != bs.LoadBlob { + reader, _ := bs.LoadBlob(r.Context(), hhash) if reader != nil { // use unix epoch as the time if we can't find the descriptor // as described in the http.ServeContent documentation @@ -245,26 +245,20 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) { } } - pubkey := r.URL.Path[6:] + pubkey, err := nostr.PubKeyFromHex(r.URL.Path[6:]) - for _, rl := range bs.RejectList { - reject, reason, code := rl(r.Context(), auth, pubkey) + if nil != bs.RejectList { + reject, reason, code := bs.RejectList(r.Context(), auth, pubkey) if reject { blossomError(w, reason, code) return } } - ch, err := bs.Store.List(r.Context(), pubkey) - if err != nil { - blossomError(w, "failed to query: "+err.Error(), 500) - return - } - w.Write([]byte{'['}) enc := json.NewEncoder(w) first := true - for bd := range ch { + for bd := range bs.Store.List(r.Context(), pubkey) { if !first { w.Write([]byte{','}) } else { @@ -303,8 +297,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) { } // should we accept this delete? - for _, rd := range bs.RejectDelete { - reject, reason, code := rd(r.Context(), auth, hhash) + if nil != bs.RejectDelete { + reject, reason, code := bs.RejectDelete(r.Context(), auth, hhash) if reject { blossomError(w, reason, code) return @@ -319,8 +313,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) { // we will actually only delete the file if no one else owns it if bd, err := bs.Store.Get(r.Context(), hhash); err == nil && bd == nil { - for _, del := range bs.DeleteBlob { - if err := del(r.Context(), hhash); err != nil { + if nil != bs.DeleteBlob { + if err := bs.DeleteBlob(r.Context(), hhash); err != nil { blossomError(w, "failed to delete blob: "+err.Error(), 500) return } diff --git a/khatru/blossom/server.go b/khatru/blossom/server.go index 7b5b380..f2b9f0e 100644 --- a/khatru/blossom/server.go +++ b/khatru/blossom/server.go @@ -21,7 +21,7 @@ type BlossomServer struct { RejectUpload func(ctx context.Context, auth *nostr.Event, size int, ext string) (bool, string, int) RejectGet func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string, int) - RejectList func(ctx context.Context, auth *nostr.Event, pubkey string) (bool, string, int) + RejectList func(ctx context.Context, auth *nostr.Event, pubkey nostr.PubKey) (bool, string, int) RejectDelete func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string, int) } diff --git a/khatru/broadcasting.go b/khatru/broadcasting.go index 87d47fb..7a0dd76 100644 --- a/khatru/broadcasting.go +++ b/khatru/broadcasting.go @@ -6,6 +6,6 @@ import ( // BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions // it also doesn't attempt to store the event or trigger any reactions or callbacks -func (rl *Relay) BroadcastEvent(evt *nostr.Event) int { +func (rl *Relay) BroadcastEvent(evt nostr.Event) int { return rl.notifyListeners(evt) } diff --git a/khatru/deleting.go b/khatru/deleting.go index b356447..7187095 100644 --- a/khatru/deleting.go +++ b/khatru/deleting.go @@ -9,7 +9,7 @@ import ( "fiatjaf.com/nostr" ) -func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) error { +func (rl *Relay) handleDeleteRequest(ctx context.Context, evt nostr.Event) error { // event deletion -- nip09 for _, tag := range evt.Tags { if len(tag) >= 2 { @@ -17,7 +17,11 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro switch tag[0] { case "e": - f = nostr.Filter{IDs: []string{tag[1]}} + id, err := nostr.IDFromHex(tag[1]) + if err != nil { + return fmt.Errorf("invalid 'e' tag '%s': %w", tag[1], err) + } + f = nostr.Filter{IDs: []nostr.ID{id}} case "a": spl := strings.Split(tag[1], ":") if len(spl) != 3 { @@ -27,11 +31,15 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro if err != nil { continue } - author := spl[1] + author, err := nostr.PubKeyFromHex(spl[1]) + if err != nil { + continue + } + identifier := spl[2] f = nostr.Filter{ - Kinds: []int{kind}, - Authors: []string{author}, + Kinds: []uint16{uint16(kind)}, + Authors: []nostr.PubKey{author}, Tags: nostr.TagMap{"d": []string{identifier}}, Until: &evt.CreatedAt, } @@ -40,39 +48,30 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro } ctx := context.WithValue(ctx, internalCallKey, struct{}{}) - for _, query := range rl.QueryEvents { - ch, err := query(ctx, f) - if err != nil { - continue - } - target := <-ch - if target == nil { - continue - } - // got the event, now check if the user can delete it - acceptDeletion := target.PubKey == evt.PubKey - var msg string - if !acceptDeletion { - msg = "you are not the author of this event" - } - // but if we have a function to overwrite this outcome, use that instead - for _, odo := range rl.OverwriteDeletionOutcome { - acceptDeletion, msg = odo(ctx, target, evt) - } - if acceptDeletion { - // delete it - for _, del := range rl.DeleteEvent { - if err := del(ctx, target); err != nil { - return err - } + if nil != rl.QueryStored { + for target := range rl.QueryStored(ctx, f) { + // got the event, now check if the user can delete it + acceptDeletion := target.PubKey == evt.PubKey + var msg string + if !acceptDeletion { + msg = "you are not the author of this event" } - // if it was tracked to be expired that is not needed anymore - rl.expirationManager.removeEvent(target.ID) - } else { - // fail and stop here - return fmt.Errorf("blocked: %s", msg) + if acceptDeletion { + // delete it + if nil != rl.DeleteEvent { + if err := rl.DeleteEvent(ctx, target.ID); err != nil { + return err + } + } + + // if it was tracked to be expired that is not needed anymore + rl.expirationManager.removeEvent(target.ID) + } else { + // fail and stop here + return fmt.Errorf("blocked: %s", msg) + } } // don't try to query this same event again diff --git a/khatru/docs/cookbook/search.md b/khatru/docs/cookbook/search.md index 93b8fd5..2dad7a0 100644 --- a/khatru/docs/cookbook/search.md +++ b/khatru/docs/cookbook/search.md @@ -6,7 +6,7 @@ outline: deep The [`nostr.Filter` type](https://pkg.go.dev/github.com/nbd-wtf/go-nostr#Filter) has a `Search` field, so you basically just has to handle that if it's present. -It can be tricky to implement fulltext search properly though, so some [eventstores](../core/eventstore) implement it natively, such as [Bluge](https://pkg.go.dev/github.com/fiatjaf/eventstore/bluge), [OpenSearch](https://pkg.go.dev/github.com/fiatjaf/eventstore/opensearch) and [ElasticSearch](https://pkg.go.dev/github.com/fiatjaf/eventstore/elasticsearch) (although for the last two you'll need an instance of these database servers running, while with Bluge it's embedded). +It can be tricky to implement fulltext search properly though, so some [eventstores](../core/eventstore) implement it natively, such as [Bluge](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/bluge), [OpenSearch](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/opensearch) and [ElasticSearch](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/elasticsearch) (although for the last two you'll need an instance of these database servers running, while with Bluge it's embedded). If you have any of these you can just use them just like any other eventstore: @@ -33,9 +33,9 @@ func main () { } ``` -Note that in this case we're using the [LMDB](https://pkg.go.dev/github.com/fiatjaf/eventstore/lmdb) adapter for normal queries and it explicitly rejects any filter that contains a `Search` field, while [Bluge](https://pkg.go.dev/github.com/fiatjaf/eventstore/bluge) rejects any filter _without_ a `Search` value, which make them pair well together. +Note that in this case we're using the [LMDB](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/lmdb) adapter for normal queries and it explicitly rejects any filter that contains a `Search` field, while [Bluge](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/bluge) rejects any filter _without_ a `Search` value, which make them pair well together. -Other adapters, like [SQLite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3), implement search functionality on their own, so if you don't want to use that you would have to have a middleware between, like: +Other adapters, like [SQLite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3), implement search functionality on their own, so if you don't want to use that you would have to have a middleware between, like: ```go relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent, search.SaveEvent) diff --git a/khatru/docs/core/eventstore.md b/khatru/docs/core/eventstore.md index 1195bf4..04863a9 100644 --- a/khatru/docs/core/eventstore.md +++ b/khatru/docs/core/eventstore.md @@ -6,7 +6,7 @@ outline: deep Khatru doesn't make any assumptions about how you'll want to store events. Any function can be plugged in to the `StoreEvent`, `DeleteEvent`, `ReplaceEvent` and `QueryEvents` hooks. -However the [`eventstore`](https://github.com/fiatjaf/eventstore) library has adapters that you can easily plug into `khatru`'s hooks. +However the [`eventstore`](https://fiatjaf.com/nostr/eventstore) library has adapters that you can easily plug into `khatru`'s hooks. # Using the `eventstore` library @@ -14,7 +14,7 @@ The library includes many different adapters -- often called "backends" --, writ For all of them you start by instantiating a struct containing some basic options and a pointer (a file path for local databases, a connection string for remote databases) to the data. Then you call `.Init()` and if all is well you're ready to start storing, querying and deleting events, so you can pass the respective functions to their `khatru` counterparts. These eventstores also expose a `.Close()` function that must be called if you're going to stop using that store and keep your application open. -Here's an example with the [Badger](https://pkg.go.dev/github.com/fiatjaf/eventstore/badger) adapter, made for the [Badger](https://github.com/dgraph-io/badger) embedded key-value database: +Here's an example with the [Badger](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/badger) adapter, made for the [Badger](https://github.com/dgraph-io/badger) embedded key-value database: ```go package main @@ -23,7 +23,7 @@ import ( "fmt" "net/http" - "github.com/fiatjaf/eventstore/badger" + "fiatjaf.com/nostr/eventstore/badger" "github.com/fiatjaf/khatru" ) @@ -46,11 +46,11 @@ func main() { } ``` -[LMDB](https://pkg.go.dev/github.com/fiatjaf/eventstore/lmdb) works the same way. +[LMDB](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/lmdb) works the same way. -[SQLite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3) also stores things locally so it only needs a `Path`. +[SQLite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3) also stores things locally so it only needs a `Path`. -[PostgreSQL](https://pkg.go.dev/github.com/fiatjaf/eventstore/postgresql) and [MySQL](https://pkg.go.dev/github.com/fiatjaf/eventstore/mysql) use remote connections to database servers, so they take a `DatabaseURL` parameter, but after that it's the same. +[PostgreSQL](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/postgresql) and [MySQL](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/mysql) use remote connections to database servers, so they take a `DatabaseURL` parameter, but after that it's the same. ## Using two at a time diff --git a/khatru/docs/getting-started/index.md b/khatru/docs/getting-started/index.md index ca9ef65..eb1ea94 100644 --- a/khatru/docs/getting-started/index.md +++ b/khatru/docs/getting-started/index.md @@ -31,7 +31,7 @@ relay.Info.Description = "this is my custom relay" relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images" ``` -Now we must set up the basic functions for accepting events and answering queries. We could make our own querying engine from scratch, but we can also use [eventstore](https://github.com/fiatjaf/eventstore). In this example we'll use the SQLite adapter: +Now we must set up the basic functions for accepting events and answering queries. We could make our own querying engine from scratch, but we can also use [eventstore](https://fiatjaf.com/nostr/eventstore). In this example we'll use the SQLite adapter: ```go db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"} diff --git a/khatru/ephemeral.go b/khatru/ephemeral.go index e443870..196f130 100644 --- a/khatru/ephemeral.go +++ b/khatru/ephemeral.go @@ -7,9 +7,9 @@ import ( "fiatjaf.com/nostr" ) -func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error { - for _, reject := range rl.RejectEvent { - if reject, msg := reject(ctx, evt); reject { +func (rl *Relay) handleEphemeral(ctx context.Context, evt nostr.Event) error { + if nil != rl.OnEvent { + if reject, msg := rl.OnEvent(ctx, evt); reject { if msg == "" { return errors.New("blocked: no reason") } else { @@ -18,8 +18,8 @@ func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error { } } - for _, oee := range rl.OnEphemeralEvent { - oee(ctx, evt) + if nil != rl.OnEphemeralEvent { + rl.OnEphemeralEvent(ctx, evt) } return nil diff --git a/khatru/examples/basic-badger/main.go b/khatru/examples/basic-badger/main.go index 2173c46..52a353a 100644 --- a/khatru/examples/basic-badger/main.go +++ b/khatru/examples/basic-badger/main.go @@ -11,16 +11,13 @@ import ( func main() { relay := khatru.NewRelay() - db := badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"} + db := &badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"} if err := db.Init(); err != nil { panic(err) } - relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) - relay.CountEvents = append(relay.CountEvents, db.CountEvents) - relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) - relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent) + relay.UseEventstore(db) + relay.Negentropy = true fmt.Println("running on :3334") diff --git a/khatru/examples/basic-lmdb/main.go b/khatru/examples/basic-lmdb/main.go index 4f74024..4be5073 100644 --- a/khatru/examples/basic-lmdb/main.go +++ b/khatru/examples/basic-lmdb/main.go @@ -12,17 +12,13 @@ import ( func main() { relay := khatru.NewRelay() - db := lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"} + db := &lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"} os.MkdirAll(db.Path, 0o755) if err := db.Init(); err != nil { panic(err) } - relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) - relay.CountEvents = append(relay.CountEvents, db.CountEvents) - relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) - relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent) + relay.UseEventstore(db) fmt.Println("running on :3334") http.ListenAndServe(":3334", relay) diff --git a/khatru/examples/blossom/main.go b/khatru/examples/blossom/main.go index b01b17f..2eac465 100644 --- a/khatru/examples/blossom/main.go +++ b/khatru/examples/blossom/main.go @@ -19,11 +19,8 @@ func main() { if err := db.Init(); err != nil { panic(err) } - relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) - relay.CountEvents = append(relay.CountEvents, db.CountEvents) - relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) - relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent) + + relay.UseEventstore(db) bdb := &badger.BadgerBackend{Path: "/tmp/khatru-badger-blossom-tmp"} if err := bdb.Init(); err != nil { @@ -31,15 +28,15 @@ func main() { } bl := blossom.New(relay, "http://localhost:3334") bl.Store = blossom.EventStoreBlobIndexWrapper{Store: bdb, ServiceURL: bl.ServiceURL} - bl.StoreBlob = append(bl.StoreBlob, func(ctx context.Context, sha256 string, body []byte) error { + bl.StoreBlob = func(ctx context.Context, sha256 string, body []byte) error { fmt.Println("storing", sha256, len(body)) return nil - }) - bl.LoadBlob = append(bl.LoadBlob, func(ctx context.Context, sha256 string) (io.ReadSeeker, error) { + } + bl.LoadBlob = func(ctx context.Context, sha256 string) (io.ReadSeeker, error) { fmt.Println("loading", sha256) blob := strings.NewReader("aaaaa") return blob, nil - }) + } fmt.Println("running on :3334") http.ListenAndServe(":3334", relay) diff --git a/khatru/examples/exclusive/main.go b/khatru/examples/exclusive/main.go index 8d216ad..240c1c6 100644 --- a/khatru/examples/exclusive/main.go +++ b/khatru/examples/exclusive/main.go @@ -6,31 +6,28 @@ import ( "net/http" "os" + "fiatjaf.com/nostr" "fiatjaf.com/nostr/eventstore/lmdb" "fiatjaf.com/nostr/khatru" "fiatjaf.com/nostr/khatru/policies" - "fiatjaf.com/nostr" ) func main() { relay := khatru.NewRelay() - db := lmdb.LMDBBackend{Path: "/tmp/exclusive"} + db := &lmdb.LMDBBackend{Path: "/tmp/exclusive"} os.MkdirAll(db.Path, 0o755) if err := db.Init(); err != nil { panic(err) } - relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) - relay.CountEvents = append(relay.CountEvents, db.CountEvents) - relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) + relay.UseEventstore(db) - relay.RejectEvent = append(relay.RejectEvent, policies.PreventTooManyIndexableTags(10, nil, nil)) - relay.RejectFilter = append(relay.RejectFilter, policies.NoComplexFilters) + relay.OnEvent = policies.PreventTooManyIndexableTags(10, nil, nil) + relay.OnRequest = policies.NoComplexFilters - relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) { - }) + relay.OnEventSaved = func(ctx context.Context, event nostr.Event) { + } fmt.Println("running on :3334") http.ListenAndServe(":3334", relay) diff --git a/khatru/examples/readme-demo/main.go b/khatru/examples/readme-demo/main.go index f31a1a3..8eee4c3 100644 --- a/khatru/examples/readme-demo/main.go +++ b/khatru/examples/readme-demo/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "iter" "log" "net/http" @@ -22,45 +23,36 @@ func main() { relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images" // you must bring your own storage scheme -- if you want to have any - store := make(map[string]*nostr.Event, 120) + store := make(map[nostr.ID]nostr.Event, 120) // set up the basic relay functions - relay.StoreEvent = append(relay.StoreEvent, - func(ctx context.Context, event *nostr.Event) error { - store[event.ID] = event - return nil - }, - ) - relay.QueryEvents = append(relay.QueryEvents, - func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { - ch := make(chan *nostr.Event) - go func() { - for _, evt := range store { - if filter.Matches(evt) { - ch <- evt - } + relay.StoreEvent = func(ctx context.Context, event nostr.Event) error { + store[event.ID] = event + return nil + } + relay.QueryStored = func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] { + return func(yield func(nostr.Event) bool) { + for _, evt := range store { + if filter.Matches(evt) { + yield(evt) } - close(ch) - }() - return ch, nil - }, - ) - relay.DeleteEvent = append(relay.DeleteEvent, - func(ctx context.Context, event *nostr.Event) error { - delete(store, event.ID) - return nil - }, - ) + } + } + } + relay.DeleteEvent = func(ctx context.Context, id nostr.ID) error { + delete(store, id) + return nil + } // there are many other configurable things you can set - relay.RejectEvent = append(relay.RejectEvent, + relay.OnEvent = policies.SeqEvent( // built-in policies policies.ValidateKind, + policies.PreventLargeTags(100), // define your own policies - policies.PreventLargeTags(100), - func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { - if event.PubKey == "fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52" { + func(ctx context.Context, event nostr.Event) (reject bool, msg string) { + if event.PubKey == nostr.MustPubKeyFromHex("fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52") { return true, "we don't allow this person to write here" } return false, "" // anyone else can @@ -68,13 +60,13 @@ func main() { ) // you can request auth by rejecting an event or a request with the prefix "auth-required: " - relay.RejectFilter = append(relay.RejectFilter, + relay.OnRequest = policies.SeqRequest( // built-in policies policies.NoComplexFilters, // define your own policies func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) { - if pubkey := khatru.GetAuthed(ctx); pubkey != "" { + if pubkey, isAuthed := khatru.GetAuthed(ctx); !isAuthed { log.Printf("request from %s\n", pubkey) return false, "" } diff --git a/khatru/examples/routing/main.go b/khatru/examples/routing/main.go index 28d6eef..827da7b 100644 --- a/khatru/examples/routing/main.go +++ b/khatru/examples/routing/main.go @@ -12,29 +12,20 @@ import ( ) func main() { - db1 := slicestore.SliceStore{} + db1 := &slicestore.SliceStore{} db1.Init() r1 := khatru.NewRelay() - r1.StoreEvent = append(r1.StoreEvent, db1.SaveEvent) - r1.QueryEvents = append(r1.QueryEvents, db1.QueryEvents) - r1.CountEvents = append(r1.CountEvents, db1.CountEvents) - r1.DeleteEvent = append(r1.DeleteEvent, db1.DeleteEvent) + r1.UseEventstore(db1) - db2 := badger.BadgerBackend{DatabaseURL: "/tmp/t"} + db2 := &badger.BadgerBackend{Path: "/tmp/t"} db2.Init() r2 := khatru.NewRelay() - r2.StoreEvent = append(r2.StoreEvent, db2.SaveEvent) - r2.QueryEvents = append(r2.QueryEvents, db2.QueryEvents) - r2.CountEvents = append(r2.CountEvents, db2.CountEvents) - r2.DeleteEvent = append(r2.DeleteEvent, db2.DeleteEvent) + r2.UseEventstore(db2) - db3 := slicestore.SliceStore{} + db3 := &slicestore.SliceStore{} db3.Init() r3 := khatru.NewRelay() - r3.StoreEvent = append(r3.StoreEvent, db3.SaveEvent) - r3.QueryEvents = append(r3.QueryEvents, db3.QueryEvents) - r3.CountEvents = append(r3.CountEvents, db3.CountEvents) - r3.DeleteEvent = append(r3.DeleteEvent, db3.DeleteEvent) + r3.UseEventstore(db3) router := khatru.NewRouter() diff --git a/khatru/expiration.go b/khatru/expiration.go index fbdf6d8..e9ca724 100644 --- a/khatru/expiration.go +++ b/khatru/expiration.go @@ -11,7 +11,7 @@ import ( ) type expiringEvent struct { - id string + id nostr.ID expiresAt nostr.Timestamp } @@ -74,13 +74,8 @@ func (em *expirationManager) initialScan(ctx context.Context) { // query all events ctx = context.WithValue(ctx, internalCallKey, struct{}{}) - for _, query := range em.relay.QueryEvents { - ch, err := query(ctx, nostr.Filter{}) - if err != nil { - continue - } - - for evt := range ch { + if nil != em.relay.QueryStored { + for evt := range em.relay.QueryStored(ctx, nostr.Filter{}) { if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 { heap.Push(&em.events, expiringEvent{ id: evt.ID, @@ -109,23 +104,13 @@ func (em *expirationManager) checkExpiredEvents(ctx context.Context) { heap.Pop(&em.events) ctx := context.WithValue(ctx, internalCallKey, struct{}{}) - for _, query := range em.relay.QueryEvents { - ch, err := query(ctx, nostr.Filter{IDs: []string{next.id}}) - if err != nil { - continue - } - - if evt := <-ch; evt != nil { - for _, del := range em.relay.DeleteEvent { - del(ctx, evt) - } - } - break + if nil != em.relay.DeleteEvent { + em.relay.DeleteEvent(ctx, next.id) } } } -func (em *expirationManager) trackEvent(evt *nostr.Event) { +func (em *expirationManager) trackEvent(evt nostr.Event) { if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 { em.mu.Lock() heap.Push(&em.events, expiringEvent{ @@ -136,7 +121,7 @@ func (em *expirationManager) trackEvent(evt *nostr.Event) { } } -func (em *expirationManager) removeEvent(id string) { +func (em *expirationManager) removeEvent(id nostr.ID) { em.mu.Lock() defer em.mu.Unlock() diff --git a/khatru/handlers.go b/khatru/handlers.go index b66a720..4310dd9 100644 --- a/khatru/handlers.go +++ b/khatru/handlers.go @@ -176,8 +176,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // check NIP-70 protected if nip70.IsProtected(env.Event) { - authed := GetAuthed(ctx) - if authed == "" { + authed, isAuthed := GetAuthed(ctx) + if isAuthed { RequestAuth(ctx) ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, @@ -213,20 +213,20 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { if env.Event.Kind == 5 { // this always returns "blocked: " whenever it returns an error - writeErr = srl.handleDeleteRequest(ctx, &env.Event) + writeErr = srl.handleDeleteRequest(ctx, env.Event) } else if nostr.IsEphemeralKind(env.Event.Kind) { // this will also always return a prefixed reason - writeErr = srl.handleEphemeral(ctx, &env.Event) + writeErr = srl.handleEphemeral(ctx, env.Event) } else { // this will also always return a prefixed reason - skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event) + skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event) } var reason string if writeErr == nil { ok = true if !skipBroadcast { - n := srl.notifyListeners(&env.Event) + n := srl.notifyListeners(env.Event) // the number of notified listeners matters in ephemeral events if nostr.IsEphemeralKind(env.Event.Kind) { @@ -247,12 +247,12 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: ok, Reason: reason}) case *nostr.CountEnvelope: - if rl.CountEvents == nil && rl.CountEventsHLL == nil { + if rl.Count == nil && rl.CountHLL == nil { ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"}) return } - var total int64 + var total uint32 var hll *hyperloglog.HyperLogLog srl := rl @@ -278,7 +278,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { case *nostr.ReqEnvelope: eose := sync.WaitGroup{} - eose.Add(len(env.Filters)) + eose.Add(1) // a context just for the "stored events" request handler reqCtx, cancelReqCtx := context.WithCancelCause(ctx) @@ -287,24 +287,22 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID) // handle each filter separately -- dispatching events as they're loaded from databases - for _, filter := range env.Filters { - srl := rl - if rl.getSubRelayFromFilter != nil { - srl = rl.getSubRelayFromFilter(filter) - } - err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) - if err != nil { - // fail everything if any filter is rejected - reason := err.Error() - if strings.HasPrefix(reason, "auth-required:") { - RequestAuth(ctx) - } - ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) - cancelReqCtx(errors.New("filter rejected")) - return - } else { - rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx) + srl := rl + if rl.getSubRelayFromFilter != nil { + srl = rl.getSubRelayFromFilter(env.Filter) + } + err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, env.Filter) + if err != nil { + // fail everything if any filter is rejected + reason := err.Error() + if strings.HasPrefix(reason, "auth-required:") { + RequestAuth(ctx) } + ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) + cancelReqCtx(errors.New("filter rejected")) + return + } else { + rl.addListener(ws, env.SubscriptionID, srl, env.Filter, cancelReqCtx) } go func() { @@ -317,7 +315,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { rl.removeListenerId(ws, id) case *nostr.AuthEnvelope: wsBaseUrl := strings.Replace(rl.getBaseURL(r), "http", "ws", 1) - if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok { + if pubkey, ok := nip42.ValidateAuthEvent(env.Event, ws.Challenge, wsBaseUrl); ok { ws.AuthedPublicKey = pubkey ws.authLock.Lock() if ws.Authed != nil { diff --git a/khatru/helpers.go b/khatru/helpers.go index e332223..2229160 100644 --- a/khatru/helpers.go +++ b/khatru/helpers.go @@ -1,6 +1,7 @@ package khatru import ( + "bytes" "net" "net/http" "strings" @@ -10,7 +11,7 @@ import ( func isOlder(previous, next *nostr.Event) bool { return previous.CreatedAt < next.CreatedAt || - (previous.CreatedAt == next.CreatedAt && previous.ID > next.ID) + (previous.CreatedAt == next.CreatedAt && bytes.Compare(previous.ID[:], next.ID[:]) == 1) } var privateMasks = func() []net.IPNet { diff --git a/khatru/listener.go b/khatru/listener.go index 9ca35f9..3730a7b 100644 --- a/khatru/listener.go +++ b/khatru/listener.go @@ -133,17 +133,17 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) { } // returns how many listeners were notified -func (rl *Relay) notifyListeners(event *nostr.Event) int { +func (rl *Relay) notifyListeners(event nostr.Event) int { count := 0 listenersloop: for _, listener := range rl.listeners { if listener.filter.Matches(event) { - for _, pb := range rl.PreventBroadcast { - if pb(listener.ws, event) { + if nil != rl.PreventBroadcast { + if rl.PreventBroadcast(listener.ws, event) { continue listenersloop } } - listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event}) + listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: event}) count++ } } diff --git a/khatru/listener_fuzz_test.go b/khatru/listener_fuzz_test.go index 82dadb9..b8252c9 100644 --- a/khatru/listener_fuzz_test.go +++ b/khatru/listener_fuzz_test.go @@ -17,7 +17,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) { rl := NewRelay() - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} websockets := make([]*WebSocket, 0, totalWebsockets*baseSubs) @@ -71,7 +71,7 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { rl := NewRelay() - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} websockets := make([]*WebSocket, 0, totalWebsockets) @@ -150,7 +150,7 @@ func FuzzRouterListenersPabloCrash(f *testing.F) { rl.clients[ws] = make([]listenerSpec, 0, subIterations) } - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} type wsid struct { diff --git a/khatru/listener_test.go b/khatru/listener_test.go index 45ef8b4..ef9a459 100644 --- a/khatru/listener_test.go +++ b/khatru/listener_test.go @@ -29,9 +29,9 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) { ws1 := &WebSocket{} ws2 := &WebSocket{} - f1 := nostr.Filter{Kinds: []int{1}} - f2 := nostr.Filter{Kinds: []int{2}} - f3 := nostr.Filter{Kinds: []int{3}} + f1 := nostr.Filter{Kinds: []uint16{1}} + f2 := nostr.Filter{Kinds: []uint16{2}} + f3 := nostr.Filter{Kinds: []uint16{3}} rl.clients[ws1] = nil rl.clients[ws2] = nil @@ -86,9 +86,9 @@ func TestListenerMoreConvolutedCase(t *testing.T) { ws3 := &WebSocket{} ws4 := &WebSocket{} - f1 := nostr.Filter{Kinds: []int{1}} - f2 := nostr.Filter{Kinds: []int{2}} - f3 := nostr.Filter{Kinds: []int{3}} + f1 := nostr.Filter{Kinds: []uint16{1}} + f2 := nostr.Filter{Kinds: []uint16{2}} + f3 := nostr.Filter{Kinds: []uint16{3}} rl.clients[ws1] = nil rl.clients[ws2] = nil @@ -205,9 +205,9 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { ws3 := &WebSocket{} ws4 := &WebSocket{} - f1 := nostr.Filter{Kinds: []int{1}} - f2 := nostr.Filter{Kinds: []int{2}} - f3 := nostr.Filter{Kinds: []int{3}} + f1 := nostr.Filter{Kinds: []uint16{1}} + f2 := nostr.Filter{Kinds: []uint16{2}} + f3 := nostr.Filter{Kinds: []uint16{3}} rlx := NewRelay() rly := NewRelay() @@ -424,7 +424,7 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { func TestRandomListenerClientRemoving(t *testing.T) { rl := NewRelay() - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} websockets := make([]*WebSocket, 0, 20) @@ -463,7 +463,7 @@ func TestRandomListenerClientRemoving(t *testing.T) { func TestRandomListenerIdRemoving(t *testing.T) { rl := NewRelay() - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} websockets := make([]*WebSocket, 0, 20) @@ -531,7 +531,7 @@ func TestRouterListenersPabloCrash(t *testing.T) { rl.clients[ws2] = nil rl.clients[ws3] = nil - f := nostr.Filter{Kinds: []int{1}} + f := nostr.Filter{Kinds: []uint16{1}} cancel := func(cause error) {} rl.addListener(ws1, ":1", rla, f, cancel) diff --git a/khatru/negentropy.go b/khatru/negentropy.go index fdc11c2..5976b39 100644 --- a/khatru/negentropy.go +++ b/khatru/negentropy.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr" "fiatjaf.com/nostr/nip77/negentropy" "fiatjaf.com/nostr/nip77/negentropy/storage/vector" @@ -17,33 +16,22 @@ type NegentropySession struct { } func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter) (*vector.Vector, error) { - ctx = eventstore.SetNegentropy(ctx) - - // do the same overwrite/reject flow we do in normal REQs - for _, ovw := range rl.OverwriteFilter { - ovw(ctx, &filter) - } if filter.LimitZero { return nil, fmt.Errorf("invalid limit 0") } - for _, reject := range rl.RejectFilter { - if reject, msg := reject(ctx, filter); reject { + + ctx = SetNegentropy(ctx) + + if nil != rl.OnRequest { + if reject, msg := rl.OnRequest(ctx, filter); reject { return nil, errors.New(nostr.NormalizeOKMessage(msg, "blocked")) } } // fetch events and add them to a negentropy Vector store vec := vector.New() - for _, query := range rl.QueryEvents { - ch, err := query(ctx, filter) - if err != nil { - continue - } else if ch == nil { - continue - } - - for event := range ch { - // since the goal here is to sync databases we won't do fancy stuff like overwrite events + if nil != rl.QueryStored { + for event := range rl.QueryStored(ctx, filter) { vec.Insert(event.CreatedAt, event.ID) } } @@ -51,3 +39,13 @@ func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter return vec, nil } + +var negentropySessionKey = struct{}{} + +func IsNegentropySession(ctx context.Context) bool { + return ctx.Value(negentropySessionKey) != nil +} + +func SetNegentropy(ctx context.Context) context.Context { + return context.WithValue(ctx, negentropySessionKey, struct{}{}) +} diff --git a/khatru/nip11.go b/khatru/nip11.go index 2226a2f..a72af76 100644 --- a/khatru/nip11.go +++ b/khatru/nip11.go @@ -11,10 +11,10 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) { info := *rl.Info - if len(rl.DeleteEvent) > 0 { + if nil != rl.DeleteEvent { info.AddSupportedNIP(9) } - if len(rl.CountEvents) > 0 { + if nil != rl.Count { info.AddSupportedNIP(45) } if rl.Negentropy { @@ -30,8 +30,8 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) { info.Banner = strings.TrimSuffix(baseURL, "/") + "/" + strings.TrimPrefix(info.Banner, "/") } - for _, ovw := range rl.OverwriteRelayInformation { - info = ovw(r.Context(), r, info) + if nil != rl.OverwriteRelayInformation { + info = rl.OverwriteRelayInformation(r.Context(), r, info) } json.NewEncoder(w).Encode(info) diff --git a/khatru/policies/events.go b/khatru/policies/events.go index 29f283a..344ffea 100644 --- a/khatru/policies/events.go +++ b/khatru/policies/events.go @@ -16,25 +16,25 @@ import ( // // If ignoreKinds is given this restriction will not apply to these kinds (useful for allowing a bigger). // If onlyKinds is given then all other kinds will be ignored. -func PreventTooManyIndexableTags(max int, ignoreKinds []int, onlyKinds []int) func(context.Context, *nostr.Event) (bool, string) { +func PreventTooManyIndexableTags(max int, ignoreKinds []uint16, onlyKinds []uint16) func(context.Context, nostr.Event) (bool, string) { slices.Sort(ignoreKinds) slices.Sort(onlyKinds) - ignore := func(kind int) bool { return false } + ignore := func(kind uint16) bool { return false } if len(ignoreKinds) > 0 { - ignore = func(kind int) bool { + ignore = func(kind uint16) bool { _, isIgnored := slices.BinarySearch(ignoreKinds, kind) return isIgnored } } if len(onlyKinds) > 0 { - ignore = func(kind int) bool { + ignore = func(kind uint16) bool { _, isApplicable := slices.BinarySearch(onlyKinds, kind) return !isApplicable } } - return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { + return func(ctx context.Context, event nostr.Event) (reject bool, msg string) { if ignore(event.Kind) { return false, "" } @@ -53,8 +53,8 @@ func PreventTooManyIndexableTags(max int, ignoreKinds []int, onlyKinds []int) fu } // PreventLargeTags rejects events that have indexable tag values greater than maxTagValueLen. -func PreventLargeTags(maxTagValueLen int) func(context.Context, *nostr.Event) (bool, string) { - return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { +func PreventLargeTags(maxTagValueLen int) func(context.Context, nostr.Event) (bool, string) { + return func(ctx context.Context, event nostr.Event) (reject bool, msg string) { for _, tag := range event.Tags { if len(tag) > 1 && len(tag[0]) == 1 { if len(tag[1]) > maxTagValueLen { @@ -68,11 +68,11 @@ func PreventLargeTags(maxTagValueLen int) func(context.Context, *nostr.Event) (b // RestrictToSpecifiedKinds returns a function that can be used as a RejectFilter that will reject // any events with kinds different than the specified ones. -func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context.Context, *nostr.Event) (bool, string) { +func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context.Context, nostr.Event) (bool, string) { // sort the kinds in increasing order slices.Sort(kinds) - return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { + return func(ctx context.Context, event nostr.Event) (reject bool, msg string) { if allowEphemeral && nostr.IsEphemeralKind(event.Kind) { return false, "" } @@ -85,9 +85,9 @@ func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context } } -func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) { +func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, nostr.Event) (bool, string) { thresholdSeconds := nostr.Timestamp(threshold.Seconds()) - return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { + return func(ctx context.Context, event nostr.Event) (reject bool, msg string) { if nostr.Now()-event.CreatedAt > thresholdSeconds { return true, "event too old" } @@ -95,9 +95,9 @@ func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, * } } -func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) { +func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, nostr.Event) (bool, string) { thresholdSeconds := nostr.Timestamp(threshold.Seconds()) - return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) { + return func(ctx context.Context, event nostr.Event) (reject bool, msg string) { if event.CreatedAt-nostr.Now() > thresholdSeconds { return true, "event too much in the future" } @@ -105,12 +105,12 @@ func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, } } -func RejectEventsWithBase64Media(ctx context.Context, evt *nostr.Event) (bool, string) { +func RejectEventsWithBase64Media(ctx context.Context, evt nostr.Event) (bool, string) { return strings.Contains(evt.Content, "data:image/") || strings.Contains(evt.Content, "data:video/"), "event with base64 media" } -func OnlyAllowNIP70ProtectedEvents(ctx context.Context, event *nostr.Event) (reject bool, msg string) { - if nip70.IsProtected(*event) { +func OnlyAllowNIP70ProtectedEvents(ctx context.Context, event nostr.Event) (reject bool, msg string) { + if nip70.IsProtected(event) { return false, "" } return true, "blocked: we only accept events protected with the nip70 \"-\" tag" diff --git a/khatru/policies/filters.go b/khatru/policies/filters.go index 26531d8..e77547d 100644 --- a/khatru/policies/filters.go +++ b/khatru/policies/filters.go @@ -4,8 +4,8 @@ import ( "context" "slices" - "fiatjaf.com/nostr/khatru" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/khatru" ) // NoComplexFilters disallows filters with more than 2 tags. @@ -21,7 +21,7 @@ func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, ms // MustAuth requires all subscribers to be authenticated func MustAuth(ctx context.Context, filter nostr.Filter) (reject bool, msg string) { - if khatru.GetAuthed(ctx) == "" { + if _, isAuthed := khatru.GetAuthed(ctx); !isAuthed { return true, "auth-required: all requests must be authenticated" } return false, "" @@ -63,7 +63,7 @@ func RemoveSearchQueries(ctx context.Context, filter *nostr.Filter) { func RemoveAllButKinds(kinds ...uint16) func(context.Context, *nostr.Filter) { return func(ctx context.Context, filter *nostr.Filter) { if n := len(filter.Kinds); n > 0 { - newKinds := make([]int, 0, n) + newKinds := make([]uint16, 0, n) for i := 0; i < n; i++ { if k := filter.Kinds[i]; slices.Contains(kinds, uint16(k)) { newKinds = append(newKinds, k) diff --git a/khatru/policies/kind_validation.go b/khatru/policies/kind_validation.go index 5699751..19b63c4 100644 --- a/khatru/policies/kind_validation.go +++ b/khatru/policies/kind_validation.go @@ -7,7 +7,7 @@ import ( "fiatjaf.com/nostr" ) -func ValidateKind(ctx context.Context, evt *nostr.Event) (bool, string) { +func ValidateKind(ctx context.Context, evt nostr.Event) (bool, string) { switch evt.Kind { case 0: var m struct { diff --git a/khatru/policies/multi.go b/khatru/policies/multi.go new file mode 100644 index 0000000..78278a6 --- /dev/null +++ b/khatru/policies/multi.go @@ -0,0 +1,47 @@ +package policies + +import ( + "context" + + "fiatjaf.com/nostr" +) + +func SeqEvent( + funcs ...func(ctx context.Context, evt nostr.Event) (bool, string), +) func(context.Context, nostr.Event) (reject bool, reason string) { + return func(ctx context.Context, evt nostr.Event) (reject bool, reason string) { + for _, fn := range funcs { + reject, reason := fn(ctx, evt) + if reject { + return reject, reason + } + } + return false, "" + } +} + +func SeqStore(funcs ...func(ctx context.Context, evt nostr.Event) error) func(context.Context, nostr.Event) error { + return func(ctx context.Context, evt nostr.Event) error { + for _, fn := range funcs { + err := fn(ctx, evt) + if err != nil { + return err + } + } + return nil + } +} + +func SeqRequest( + funcs ...func(ctx context.Context, evt nostr.Filter) (bool, string), +) func(context.Context, nostr.Filter) (reject bool, reason string) { + return func(ctx context.Context, evt nostr.Filter) (reject bool, reason string) { + for _, fn := range funcs { + reject, reason := fn(ctx, evt) + if reject { + return reject, reason + } + } + return false, "" + } +} diff --git a/khatru/policies/nip04.go b/khatru/policies/nip04.go deleted file mode 100644 index c06b6ce..0000000 --- a/khatru/policies/nip04.go +++ /dev/null @@ -1,38 +0,0 @@ -package policies - -import ( - "context" - "slices" - - "fiatjaf.com/nostr/khatru" - "fiatjaf.com/nostr" -) - -// RejectKind04Snoopers prevents reading NIP-04 messages from people not involved in the conversation. -func RejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, string) { - // prevent kind-4 events from being returned to unauthed users, - // only when authentication is a thing - if !slices.Contains(filter.Kinds, 4) { - return false, "" - } - - ws := khatru.GetConnection(ctx) - senders := filter.Authors - receivers, _ := filter.Tags["p"] - switch { - case ws.AuthedPublicKey == "": - // not authenticated - return true, "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?" - case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.AuthedPublicKey): - // allowed filter: ws.authed is sole sender (filter specifies one or all receivers) - return false, "" - case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.AuthedPublicKey): - // allowed filter: ws.authed is sole receiver (filter specifies one or all senders) - return false, "" - default: - // restricted filter: do not return any events, - // even if other elements in filters array were not restricted). - // client should know better. - return true, "restricted: authenticated user does not have authorization for requested filters." - } -} diff --git a/khatru/policies/ratelimits.go b/khatru/policies/ratelimits.go index 2bdb209..6afc0b0 100644 --- a/khatru/policies/ratelimits.go +++ b/khatru/policies/ratelimits.go @@ -5,14 +5,14 @@ import ( "net/http" "time" - "fiatjaf.com/nostr/khatru" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/khatru" ) -func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) { +func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ nostr.Event) (reject bool, msg string) { rl := startRateLimitSystem[string](tokensPerInterval, interval, maxTokens) - return func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) { + return func(ctx context.Context, _ nostr.Event) (reject bool, msg string) { ip := khatru.GetIP(ctx) if ip == "" { return false, "" @@ -21,11 +21,11 @@ func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens } } -func EventPubKeyRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) { +func EventPubKeyRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ nostr.Event) (reject bool, msg string) { rl := startRateLimitSystem[string](tokensPerInterval, interval, maxTokens) - return func(ctx context.Context, evt *nostr.Event) (reject bool, msg string) { - return rl(evt.PubKey), "rate-limited: slow down, please" + return func(ctx context.Context, evt nostr.Event) (reject bool, msg string) { + return rl(evt.PubKey.Hex()), "rate-limited: slow down, please" } } diff --git a/khatru/policies/sane_defaults.go b/khatru/policies/sane_defaults.go index 546a938..ec0309b 100644 --- a/khatru/policies/sane_defaults.go +++ b/khatru/policies/sane_defaults.go @@ -7,17 +7,15 @@ import ( ) func ApplySaneDefaults(relay *khatru.Relay) { - relay.RejectEvent = append(relay.RejectEvent, + relay.OnEvent = SeqEvent( RejectEventsWithBase64Media, EventIPRateLimiter(2, time.Minute*3, 10), ) - relay.RejectFilter = append(relay.RejectFilter, + relay.OnRequest = SeqRequest( NoComplexFilters, FilterIPRateLimiter(20, time.Minute, 100), ) - relay.RejectConnection = append(relay.RejectConnection, - ConnectionRateLimiter(1, time.Minute*5, 100), - ) + relay.RejectConnection = ConnectionRateLimiter(1, time.Minute*5, 100) } diff --git a/khatru/relay.go b/khatru/relay.go index f4efd41..d6d2616 100644 --- a/khatru/relay.go +++ b/khatru/relay.go @@ -12,6 +12,7 @@ import ( "time" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr/nip11" "fiatjaf.com/nostr/nip45/hyperloglog" "github.com/fasthttp/websocket" @@ -57,23 +58,22 @@ type Relay struct { ServiceURL string // hooks that will be called at various times - RejectEvent func(ctx context.Context, event *nostr.Event) (reject bool, msg string) - OverwriteDeletionOutcome func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string) + OnEvent func(ctx context.Context, event nostr.Event) (reject bool, msg string) StoreEvent func(ctx context.Context, event nostr.Event) error ReplaceEvent func(ctx context.Context, event nostr.Event) error DeleteEvent func(ctx context.Context, id nostr.ID) error OnEventSaved func(ctx context.Context, event nostr.Event) OnEphemeralEvent func(ctx context.Context, event nostr.Event) - RejectFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) - RejectCountFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) - QueryEvents func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] - CountEvents func(ctx context.Context, filter nostr.Filter) (uint32, error) - CountEventsHLL func(ctx context.Context, filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error) + OnRequest func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) + OnCountFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) + QueryStored func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] + Count func(ctx context.Context, filter nostr.Filter) (uint32, error) + CountHLL func(ctx context.Context, filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error) RejectConnection func(r *http.Request) bool OnConnect func(ctx context.Context) OnDisconnect func(ctx context.Context) OverwriteRelayInformation func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument - PreventBroadcast func(ws *WebSocket, event *nostr.Event) bool + PreventBroadcast func(ws *WebSocket, event nostr.Event) bool // these are used when this relays acts as a router routes []Route @@ -117,6 +117,24 @@ type Relay struct { expirationManager *expirationManager } +func (rl *Relay) UseEventstore(store eventstore.Store) { + rl.QueryStored = func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] { + return store.QueryEvents(filter) + } + rl.Count = func(ctx context.Context, filter nostr.Filter) (uint32, error) { + return store.CountEvents(filter) + } + rl.StoreEvent = func(ctx context.Context, event nostr.Event) error { + return store.SaveEvent(event) + } + rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) error { + return store.ReplaceEvent(event) + } + rl.DeleteEvent = func(ctx context.Context, id nostr.ID) error { + return store.DeleteEvent(id) + } +} + func (rl *Relay) getBaseURL(r *http.Request) string { if rl.ServiceURL != "" { return rl.ServiceURL diff --git a/khatru/relay_test.go b/khatru/relay_test.go index c8533ee..27e6dc3 100644 --- a/khatru/relay_test.go +++ b/khatru/relay_test.go @@ -7,41 +7,31 @@ import ( "testing" "time" - "fiatjaf.com/nostr/eventstore/slicestore" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/slicestore" ) func TestBasicRelayFunctionality(t *testing.T) { // setup relay with in-memory store relay := NewRelay() - store := slicestore.SliceStore{} + store := &slicestore.SliceStore{} store.Init() - relay.StoreEvent = append(relay.StoreEvent, store.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, store.QueryEvents) - relay.DeleteEvent = append(relay.DeleteEvent, store.DeleteEvent) + + relay.UseEventstore(store) // start test server server := httptest.NewServer(relay) defer server.Close() // create test keys - sk1 := nostr.GeneratePrivateKey() - pk1, err := nostr.GetPublicKey(sk1) - if err != nil { - t.Fatalf("Failed to get public key 1: %v", err) - } - sk2 := nostr.GeneratePrivateKey() - pk2, err := nostr.GetPublicKey(sk2) - if err != nil { - t.Fatalf("Failed to get public key 2: %v", err) - } + sk1 := nostr.Generate() + pk1 := nostr.GetPublicKey(sk1) + sk2 := nostr.Generate() + pk2 := nostr.GetPublicKey(sk2) // helper to create signed events - createEvent := func(sk string, kind int, content string, tags nostr.Tags) nostr.Event { - pk, err := nostr.GetPublicKey(sk) - if err != nil { - t.Fatalf("Failed to get public key: %v", err) - } + createEvent := func(sk nostr.SecretKey, kind uint16, content string, tags nostr.Tags) nostr.Event { + pk := nostr.GetPublicKey(sk) evt := nostr.Event{ PubKey: pk, CreatedAt: nostr.Now(), @@ -55,13 +45,13 @@ func TestBasicRelayFunctionality(t *testing.T) { // connect two test clients url := "ws" + server.URL[4:] - client1, err := nostr.RelayConnect(context.Background(), url) + client1, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) if err != nil { t.Fatalf("failed to connect client1: %v", err) } defer client1.Close() - client2, err := nostr.RelayConnect(context.Background(), url) + client2, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) if err != nil { t.Fatalf("failed to connect client2: %v", err) } @@ -69,7 +59,7 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 1: store and query events t.Run("store and query events", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() evt1 := createEvent(sk1, 1, "hello world", nil) @@ -79,10 +69,10 @@ func TestBasicRelayFunctionality(t *testing.T) { } // Query the event back - sub, err := client2.Subscribe(ctx, []nostr.Filter{{ - Authors: []string{pk1}, - Kinds: []int{1}, - }}) + sub, err := client2.Subscribe(ctx, nostr.Filter{ + Authors: []nostr.PubKey{pk1}, + Kinds: []uint16{1}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } @@ -101,14 +91,14 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 2: live event subscription t.Run("live event subscription", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() // Setup subscription first - sub, err := client1.Subscribe(ctx, []nostr.Filter{{ - Authors: []string{pk2}, - Kinds: []int{1}, - }}) + sub, err := client1.Subscribe(ctx, nostr.Filter{ + Authors: []nostr.PubKey{pk2}, + Kinds: []uint16{1}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } @@ -134,7 +124,7 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 3: event deletion t.Run("event deletion", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() // Create an event to be deleted @@ -145,16 +135,16 @@ func TestBasicRelayFunctionality(t *testing.T) { } // Create deletion event - delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID}}) + delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID.Hex()}}) err = client1.Publish(ctx, delEvent) if err != nil { t.Fatalf("failed to publish deletion event: %v", err) } // Try to query the deleted event - sub, err := client2.Subscribe(ctx, []nostr.Filter{{ - IDs: []string{evt3.ID}, - }}) + sub, err := client2.Subscribe(ctx, nostr.Filter{ + IDs: []nostr.ID{evt3.ID}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } @@ -179,7 +169,7 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 4: teplaceable events t.Run("replaceable events", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() // create initial kind:0 event @@ -210,17 +200,17 @@ func TestBasicRelayFunctionality(t *testing.T) { } // query to verify only the newest event exists - sub, err := client2.Subscribe(ctx, []nostr.Filter{{ - Authors: []string{pk1}, - Kinds: []int{0}, - }}) + sub, err := client2.Subscribe(ctx, nostr.Filter{ + Authors: []nostr.PubKey{pk1}, + Kinds: []uint16{0}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } defer sub.Unsub() // should only get one event back (the newest one) - var receivedEvents []*nostr.Event + var receivedEvents []nostr.Event for { select { case env := <-sub.Events: @@ -241,17 +231,15 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 5: event expiration t.Run("event expiration", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) defer cancel() // create a new relay with shorter expiration check interval relay := NewRelay() relay.expirationManager.interval = 3 * time.Second // check every 3 seconds - store := slicestore.SliceStore{} + store := &slicestore.SliceStore{} store.Init() - relay.StoreEvent = append(relay.StoreEvent, store.SaveEvent) - relay.QueryEvents = append(relay.QueryEvents, store.QueryEvents) - relay.DeleteEvent = append(relay.DeleteEvent, store.DeleteEvent) + relay.UseEventstore(store) // start test server server := httptest.NewServer(relay) @@ -259,7 +247,7 @@ func TestBasicRelayFunctionality(t *testing.T) { // connect test client url := "ws" + server.URL[4:] - client, err := nostr.RelayConnect(context.Background(), url) + client, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) if err != nil { t.Fatalf("failed to connect client: %v", err) } @@ -274,9 +262,9 @@ func TestBasicRelayFunctionality(t *testing.T) { } // verify event exists initially - sub, err := client.Subscribe(ctx, []nostr.Filter{{ - IDs: []string{evt.ID}, - }}) + sub, err := client.Subscribe(ctx, nostr.Filter{ + IDs: []nostr.ID{evt.ID}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } @@ -296,9 +284,9 @@ func TestBasicRelayFunctionality(t *testing.T) { time.Sleep(4 * time.Second) // verify event no longer exists - sub, err = client.Subscribe(ctx, []nostr.Filter{{ - IDs: []string{evt.ID}, - }}) + sub, err = client.Subscribe(ctx, nostr.Filter{ + IDs: []nostr.ID{evt.ID}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } @@ -323,7 +311,7 @@ func TestBasicRelayFunctionality(t *testing.T) { // test 6: unauthorized deletion t.Run("unauthorized deletion", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() // create an event from client1 @@ -334,16 +322,16 @@ func TestBasicRelayFunctionality(t *testing.T) { } // Try to delete it with client2 - delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID}}) + delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID.Hex()}}) err = client2.Publish(ctx, delEvent) if err == nil { t.Fatalf("should have failed to publish deletion event: %v", err) } // Verify event still exists - sub, err := client1.Subscribe(ctx, []nostr.Filter{{ - IDs: []string{evt4.ID}, - }}) + sub, err := client1.Subscribe(ctx, nostr.Filter{ + IDs: []nostr.ID{evt4.ID}, + }, nostr.SubscriptionOptions{}) if err != nil { t.Fatalf("failed to subscribe: %v", err) } diff --git a/khatru/responding.go b/khatru/responding.go index b9b7139..54d3858 100644 --- a/khatru/responding.go +++ b/khatru/responding.go @@ -21,53 +21,42 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr // because we may, for example, remove some things from the incoming filters // that we know we don't support, and then if the end result is an empty // filter we can just reject it) - if rl.RejectFilter != nil { - if reject, msg := rl.RejectFilter(ctx, filter); reject { + if nil != rl.OnRequest { + if reject, msg := rl.OnRequest(ctx, filter); reject { return errors.New(nostr.NormalizeOKMessage(msg, "blocked")) } } // run the function to query events - if rl.QueryEvents != nil { - ch, err := rl.QueryEvents(ctx, filter) - if err != nil { - ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) - eose.Done() - } else if ch == nil { - eose.Done() + if nil != rl.QueryStored { + for event := range rl.QueryStored(ctx, filter) { + ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: event}) } - - go func(ch chan *nostr.Event) { - for event := range ch { - ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) - } - eose.Done() - }(ch) + eose.Done() } return nil } -func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 { +func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) uint32 { // check if we'll reject this filter - if rl.RejectCountFilter != nil { - if rejecting, msg := rl.RejectCountFilter(ctx, filter); rejecting { + if nil != rl.OnCountFilter { + if rejecting, msg := rl.OnCountFilter(ctx, filter); rejecting { ws.WriteJSON(nostr.NoticeEnvelope(msg)) return 0 } } // run the functions to count (generally it will be just one) - var subtotal int64 = 0 - if rl.CountEvents != nil { - res, err := rl.CountEvents(ctx, filter) + if nil != rl.Count { + res, err := rl.Count(ctx, filter) if err != nil { ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) } - subtotal += res + return res } - return subtotal + return 0 } func (rl *Relay) handleCountRequestWithHLL( @@ -75,32 +64,22 @@ func (rl *Relay) handleCountRequestWithHLL( ws *WebSocket, filter nostr.Filter, offset int, -) (int64, *hyperloglog.HyperLogLog) { +) (uint32, *hyperloglog.HyperLogLog) { // check if we'll reject this filter - if rl.RejectCountFilter != nil { - if rejecting, msg := rl.RejectCountFilter(ctx, filter); rejecting { + if nil != rl.OnCountFilter { + if rejecting, msg := rl.OnCountFilter(ctx, filter); rejecting { ws.WriteJSON(nostr.NoticeEnvelope(msg)) return 0, nil } } - // run the functions to count (generally it will be just one) - var subtotal int64 = 0 - var hll *hyperloglog.HyperLogLog - if rl.CountEventsHLL != nil { - res, fhll, err := rl.CountEventsHLL(ctx, filter, offset) + if nil != rl.CountHLL { + res, hll, err := rl.CountHLL(ctx, filter, offset) if err != nil { ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) } - subtotal += res - if fhll != nil { - if hll == nil { - hll = fhll - } else { - hll.Merge(fhll) - } - } + return res, hll } - return subtotal, hll + return 0, nil } diff --git a/nip77/example/example.go b/nip77/example/example.go index 5753738..2ce7fc7 100644 --- a/nip77/example/example.go +++ b/nip77/example/example.go @@ -3,11 +3,12 @@ package main import ( "context" "fmt" + "slices" "time" - "github.com/fiatjaf/eventstore" - "github.com/fiatjaf/eventstore/slicestore" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/slicestore" + "fiatjaf.com/nostr/eventstore/wrappers" "fiatjaf.com/nostr/nip77" ) @@ -16,8 +17,8 @@ func main() { db := &slicestore.SliceStore{} db.Init() - sk := nostr.GeneratePrivateKey() - local := eventstore.RelayWrapper{Store: db} + sk := nostr.Generate() + local := wrappers.StorePublisher{Store: db} for { for i := 0; i < 20; i++ { @@ -29,7 +30,7 @@ func main() { Tags: nostr.Tags{}, } evt.Sign(sk) - db.SaveEvent(ctx, &evt) + db.SaveEvent(evt) } { @@ -40,7 +41,7 @@ func main() { Tags: nostr.Tags{}, } evt.Sign(sk) - db.SaveEvent(ctx, &evt) + db.SaveEvent(evt) } } @@ -50,11 +51,7 @@ func main() { panic(err) } - data, err := local.QuerySync(ctx, nostr.Filter{}) - if err != nil { - panic(err) - } - + data := slices.Collect(local.QueryEvents(nostr.Filter{})) fmt.Println("total local events:", len(data)) time.Sleep(time.Second * 10) } diff --git a/relay_js_test.go b/relay_js_test.go index 301effc..7914aaa 100644 --- a/relay_js_test.go +++ b/relay_js_test.go @@ -24,7 +24,7 @@ func TestConnectContext(t *testing.T) { // relay client ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - r, err := RelayConnect(ctx, testRelayURL) + r, err := RelayConnect(ctx, testRelayURL, RelayOptions{}) assert.NoError(t, err) defer r.Close() @@ -34,7 +34,7 @@ func TestConnectContextCanceled(t *testing.T) { // relay client ctx, cancel := context.WithCancel(context.Background()) cancel() // make ctx expired - _, err := RelayConnect(ctx, testRelayURL) + _, err := RelayConnect(ctx, testRelayURL, RelayOptions{}) assert.ErrorIs(t, err, context.Canceled) } @@ -60,7 +60,7 @@ func TestPublish(t *testing.T) { func makeKeyPair(t *testing.T) (priv, pub [32]byte) { t.Helper() - privkey := GeneratePrivateKey() + privkey := Generate() pubkey := GetPublicKey(privkey) return privkey, pubkey @@ -69,7 +69,7 @@ func makeKeyPair(t *testing.T) (priv, pub [32]byte) { func mustRelayConnect(t *testing.T, url string) *Relay { t.Helper() - rl, err := RelayConnect(context.Background(), url) + rl, err := RelayConnect(context.Background(), url, RelayOptions{}) require.NoError(t, err) return rl diff --git a/sdk/feeds_test.go b/sdk/feeds_test.go index 2fdb112..529fff8 100644 --- a/sdk/feeds_test.go +++ b/sdk/feeds_test.go @@ -6,7 +6,7 @@ import ( "time" "fiatjaf.com/nostr" - "github.com/fiatjaf/eventstore/slicestore" + "fiatjaf.com/nostr/eventstore/slicestore" "github.com/fiatjaf/khatru" "github.com/stretchr/testify/require" ) diff --git a/sdk/system.go b/sdk/system.go index 6164c4b..b88af83 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -13,8 +13,8 @@ import ( "fiatjaf.com/nostr/sdk/hints/memoryh" "fiatjaf.com/nostr/sdk/kvstore" kvstore_memory "fiatjaf.com/nostr/sdk/kvstore/memory" - "github.com/fiatjaf/eventstore" - "github.com/fiatjaf/eventstore/nullstore" + "fiatjaf.com/nostr/eventstore" + "fiatjaf.com/nostr/eventstore/nullstore" ) // System represents the core functionality of the SDK, providing access to