pool.BatchedSubscribeMany()

This commit is contained in:
fiatjaf
2025-05-12 06:13:21 -03:00
parent 65411a10c8
commit 6d44b5b0dc

68
pool.go
View File

@@ -256,7 +256,7 @@ func (pool *Pool) FetchMany(
} }
} }
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filter, opts) return pool.subManyEose(ctx, urls, filter, opts)
} }
// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when // SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
@@ -546,7 +546,7 @@ func (pool *Pool) subMany(
return events return events
} }
func (pool *Pool) subManyEoseNonOverwriteCheckDuplicate( func (pool *Pool) subManyEose(
ctx context.Context, ctx context.Context,
urls []string, urls []string,
filter Filter, filter Filter,
@@ -683,8 +683,8 @@ func (pool *Pool) QuerySingle(
return nil return nil
} }
// BatchedSubManyEose performs batched subscriptions to multiple relays with different filters. // BatchedQueryMany takes a bunch of filters and sends each to the target relay but deduplicates results smartly.
func (pool *Pool) BatchedSubManyEose( func (pool *Pool) BatchedQueryMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilter, dfs []DirectedFilter,
opts SubscriptionOptions, opts SubscriptionOptions,
@@ -694,19 +694,17 @@ func (pool *Pool) BatchedSubManyEose(
wg.Add(len(dfs)) wg.Add(len(dfs))
seenAlready := xsync.NewMapOf[ID, struct{}]() seenAlready := xsync.NewMapOf[ID, struct{}]()
if opts.CheckDuplicate == nil { opts.CheckDuplicate = func(id ID, relay string) bool {
opts.CheckDuplicate = func(id ID, relay string) bool { _, exists := seenAlready.LoadOrStore(id, struct{}{})
_, exists := seenAlready.LoadOrStore(id, struct{}{}) if exists && pool.duplicateMiddleware != nil {
if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id)
pool.duplicateMiddleware(relay, id)
}
return exists
} }
return exists
} }
for _, df := range dfs { for _, df := range dfs {
go func(df DirectedFilter) { go func(df DirectedFilter) {
for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx, for ie := range pool.subManyEose(ctx,
[]string{df.Relay}, []string{df.Relay},
df.Filter, df.Filter,
opts, opts,
@@ -730,6 +728,52 @@ func (pool *Pool) BatchedSubManyEose(
return res return res
} }
// BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open.
func (pool *Pool) BatchedSubscribeMany(
ctx context.Context,
dfs []DirectedFilter,
opts SubscriptionOptions,
) chan RelayEvent {
res := make(chan RelayEvent)
wg := sync.WaitGroup{}
wg.Add(len(dfs))
seenAlready := xsync.NewMapOf[ID, struct{}]()
opts.CheckDuplicate = func(id ID, relay string) bool {
_, exists := seenAlready.LoadOrStore(id, struct{}{})
if exists && pool.duplicateMiddleware != nil {
pool.duplicateMiddleware(relay, id)
}
return exists
}
for _, df := range dfs {
go func(df DirectedFilter) {
for ie := range pool.subMany(ctx,
[]string{df.Relay},
df.Filter,
nil,
opts,
) {
select {
case res <- ie:
case <-ctx.Done():
wg.Done()
return
}
}
wg.Done()
}(df)
}
go func() {
wg.Wait()
close(res)
}()
return res
}
// Close closes the pool with the given reason. // Close closes the pool with the given reason.
func (pool *Pool) Close(reason string) { func (pool *Pool) Close(reason string) {
pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason)) pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))