diff --git a/pool.go b/pool.go index d1f8ff9..138f24f 100644 --- a/pool.go +++ b/pool.go @@ -256,7 +256,7 @@ func (pool *Pool) FetchMany( } } - return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filter, opts) + return pool.subManyEose(ctx, urls, filter, opts) } // SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when @@ -546,7 +546,7 @@ func (pool *Pool) subMany( return events } -func (pool *Pool) subManyEoseNonOverwriteCheckDuplicate( +func (pool *Pool) subManyEose( ctx context.Context, urls []string, filter Filter, @@ -683,8 +683,8 @@ func (pool *Pool) QuerySingle( return nil } -// BatchedSubManyEose performs batched subscriptions to multiple relays with different filters. -func (pool *Pool) BatchedSubManyEose( +// BatchedQueryMany takes a bunch of filters and sends each to the target relay but deduplicates results smartly. +func (pool *Pool) BatchedQueryMany( ctx context.Context, dfs []DirectedFilter, opts SubscriptionOptions, @@ -694,19 +694,17 @@ func (pool *Pool) BatchedSubManyEose( wg.Add(len(dfs)) seenAlready := xsync.NewMapOf[ID, struct{}]() - if opts.CheckDuplicate == nil { - opts.CheckDuplicate = func(id ID, relay string) bool { - _, exists := seenAlready.LoadOrStore(id, struct{}{}) - if exists && pool.duplicateMiddleware != nil { - pool.duplicateMiddleware(relay, id) - } - return exists + opts.CheckDuplicate = func(id ID, relay string) bool { + _, exists := seenAlready.LoadOrStore(id, struct{}{}) + if exists && pool.duplicateMiddleware != nil { + pool.duplicateMiddleware(relay, id) } + return exists } for _, df := range dfs { go func(df DirectedFilter) { - for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx, + for ie := range pool.subManyEose(ctx, []string{df.Relay}, df.Filter, opts, @@ -730,6 +728,52 @@ func (pool *Pool) BatchedSubManyEose( return res } +// BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open. +func (pool *Pool) BatchedSubscribeMany( + ctx context.Context, + dfs []DirectedFilter, + opts SubscriptionOptions, +) chan RelayEvent { + res := make(chan RelayEvent) + wg := sync.WaitGroup{} + wg.Add(len(dfs)) + seenAlready := xsync.NewMapOf[ID, struct{}]() + + opts.CheckDuplicate = func(id ID, relay string) bool { + _, exists := seenAlready.LoadOrStore(id, struct{}{}) + if exists && pool.duplicateMiddleware != nil { + pool.duplicateMiddleware(relay, id) + } + return exists + } + + for _, df := range dfs { + go func(df DirectedFilter) { + for ie := range pool.subMany(ctx, + []string{df.Relay}, + df.Filter, + nil, + opts, + ) { + select { + case res <- ie: + case <-ctx.Done(): + wg.Done() + return + } + } + wg.Done() + }(df) + } + + go func() { + wg.Wait() + close(res) + }() + + return res +} + // Close closes the pool with the given reason. func (pool *Pool) Close(reason string) { pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))