diff --git a/pool.go b/pool.go index b550b55..59f5fcd 100644 --- a/pool.go +++ b/pool.go @@ -16,6 +16,11 @@ type SimplePool struct { cancel context.CancelFunc } +type IncomingEvent struct { + *Event + Relay *Relay +} + func NewSimplePool(ctx context.Context) *SimplePool { ctx, cancel := context.WithCancel(ctx) @@ -52,8 +57,8 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan *Event { - uniqueEvents := make(chan *Event) +func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { + uniqueEvents := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() pending := xsync.NewCounter() @@ -74,7 +79,7 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt for evt := range sub.Events { // dispatch unique events to client if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok { - uniqueEvents <- evt + uniqueEvents <- IncomingEvent{Event: evt, Relay: relay} } } @@ -89,10 +94,10 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE -func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan *Event { +func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { ctx, cancel := context.WithCancel(ctx) - uniqueEvents := make(chan *Event) + uniqueEvents := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() wg := sync.WaitGroup{} wg.Add(len(urls)) @@ -133,7 +138,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters // dispatch unique events to client if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok { select { - case uniqueEvents <- evt: + case uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done(): return } @@ -147,11 +152,11 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters } // QuerySingle returns the first event returned by the first relay, cancels everything else. -func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *Event { +func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent { ctx, cancel := context.WithCancel(ctx) defer cancel() - for evt := range pool.SubManyEose(ctx, urls, Filters{filter}) { - return evt + for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) { + return &ievt } return nil }