diff --git a/eventstore/wrappers/querier.go b/eventstore/wrappers/querier.go deleted file mode 100644 index 73d899f..0000000 --- a/eventstore/wrappers/querier.go +++ /dev/null @@ -1,26 +0,0 @@ -package wrappers - -import ( - "context" - - "fiatjaf.com/nostr" - "fiatjaf.com/nostr/eventstore" -) - -var _ nostr.Querier = StoreQuerier{} - -type StoreQuerier struct { - eventstore.Store -} - -func (w StoreQuerier) QueryEvents(ctx context.Context, filter nostr.Filter) (chan nostr.Event, error) { - ch := make(chan nostr.Event) - - go func() { - for evt := range w.Store.QueryEvents(filter) { - ch <- evt - } - }() - - return ch, nil -} diff --git a/interfaces.go b/interfaces.go index 7f27736..35c9b39 100644 --- a/interfaces.go +++ b/interfaces.go @@ -2,6 +2,7 @@ package nostr import ( "context" + "iter" ) type Publisher interface { @@ -9,7 +10,7 @@ type Publisher interface { } type Querier interface { - QueryEvents(context.Context, Filter) (chan Event, error) + QueryEvents(Filter) iter.Seq[Event] } type QuerierPublisher interface { diff --git a/nip77/nip77.go b/nip77/nip77.go index b574a7a..4315be5 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -36,18 +36,15 @@ func NegentropySync( vec := vector.New() neg := negentropy.New(vec, 1024*1024) - ch, err := store.QueryEvents(ctx, filter) - if err != nil { - return err - } - for evt := range ch { + for evt := range store.QueryEvents(filter) { vec.Insert(evt.CreatedAt, evt.ID) } vec.Seal() result := make(chan error) + var err error var r *nostr.Relay r, err = nostr.RelayConnect(ctx, url, nostr.RelayOptions{ CustomHandler: func(data string) { @@ -118,12 +115,7 @@ func NegentropySync( if len(ids) == 0 { return } - evtch, err := dir.source.QueryEvents(ctx, nostr.Filter{IDs: ids}) - if err != nil { - result <- fmt.Errorf("error querying source on %s: %w", dir.label, err) - return - } - for evt := range evtch { + for evt := range dir.source.QueryEvents(nostr.Filter{IDs: ids}) { dir.target.Publish(ctx, evt) } } diff --git a/relay.go b/relay.go index d1164c9..50a61c9 100644 --- a/relay.go +++ b/relay.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "errors" "fmt" + "iter" "log" "net/http" "strconv" @@ -434,19 +435,21 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub } // implement Querier interface -func (r *Relay) QueryEvents(ctx context.Context, filter Filter) (chan Event, error) { - sub, err := r.Subscribe(ctx, filter, SubscriptionOptions{Label: "queryevents"}) - if err != nil { - return nil, err - } +func (r *Relay) QueryEvents(filter Filter) iter.Seq[Event] { + ctx, cancel := context.WithCancel(r.connectionContext) - ch := make(chan Event) + return func(yield func(Event) bool) { + defer cancel() + + sub, err := r.Subscribe(ctx, filter, SubscriptionOptions{Label: "queryevents"}) + if err != nil { + return + } - go func() { for { select { case evt := <-sub.Events: - ch <- evt + yield(evt) case <-sub.EndOfStoredEvents: return case <-sub.ClosedReason: @@ -455,9 +458,7 @@ func (r *Relay) QueryEvents(ctx context.Context, filter Filter) (chan Event, err return } } - }() - - return ch, nil + } } // Count sends a "COUNT" command to the relay and returns the count of events matching the filters.