From 07061404918d271f196ea2f2763a34db17698204 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 4 Dec 2025 09:22:54 -0300 Subject: [PATCH] add more *NotifyClosed variants. --- pool.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 93 insertions(+), 12 deletions(-) diff --git a/pool.go b/pool.go index 9c4a392..732aed5 100644 --- a/pool.go +++ b/pool.go @@ -237,6 +237,17 @@ func (pool *Pool) SubscribeMany( return pool.subMany(ctx, urls, filter, nil, nil, opts) } +func (pool *Pool) FetchManyNotifyClosed( + ctx context.Context, + urls []string, + filter Filter, + opts SubscriptionOptions, +) (chan RelayEvent, chan RelayClosed) { + closedChan := make(chan RelayClosed) + events := pool.fetchMany(ctx, urls, filter, closedChan, opts) + return events, closedChan +} + // FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays // return an EOSE message. func (pool *Pool) FetchMany( @@ -244,6 +255,16 @@ func (pool *Pool) FetchMany( urls []string, filter Filter, opts SubscriptionOptions, +) chan RelayEvent { + return pool.fetchMany(ctx, urls, filter, nil, opts) +} + +func (pool *Pool) fetchMany( + ctx context.Context, + urls []string, + filter Filter, + closedChan chan RelayClosed, + opts SubscriptionOptions, ) chan RelayEvent { seenAlready := xsync.NewMapOf[ID, struct{}]() @@ -257,7 +278,7 @@ func (pool *Pool) FetchMany( } } - return pool.subManyEose(ctx, urls, filter, opts) + return pool.subManyEose(ctx, urls, filter, closedChan, opts) } // SubscribeManyNotifyEOSE is like SubscribeMany, but also returns a channel that is closed when all subscriptions have received an EOSE @@ -275,6 +296,9 @@ func (pool *Pool) SubscribeManyNotifyEOSE( type RelayClosed struct { Reason string Relay *Relay + + // this is true when the close reason was "auth-required" and already handled internally + HandledAuth bool } // SubscribeManyNotifyClosed is like SubscribeMany, but also returns a channel that emits every time a subscription receives a CLOSED message @@ -538,18 +562,21 @@ func (pool *Pool) subMany( err := relay.Auth(ctx, pool.authHandler) if err == nil { hasAuthed = true // so we don't keep doing AUTH again and again + if closedChan != nil { + closedChan <- RelayClosed{ + Reason: reason, + Relay: relay, + HandledAuth: true, + } + } goto subscribe } - } else { - debugLogf("CLOSED from %s: '%s'\n", nm, reason) - if closedChan != nil { - select { - case closedChan <- RelayClosed{ - Reason: reason, - Relay: relay, - }: - default: - } + } + debugLogf("CLOSED from %s: '%s'\n", nm, reason) + if closedChan != nil { + closedChan <- RelayClosed{ + Reason: reason, + Relay: relay, } } @@ -576,6 +603,7 @@ func (pool *Pool) subManyEose( ctx context.Context, urls []string, filter Filter, + closedChan chan RelayClosed, opts SubscriptionOptions, ) chan RelayEvent { ctx, cancel := context.WithCancelCause(ctx) @@ -632,10 +660,23 @@ func (pool *Pool) subManyEose( err := relay.Auth(ctx, pool.authHandler) if err == nil { hasAuthed = true // so we don't keep doing AUTH again and again + if closedChan != nil { + closedChan <- RelayClosed{ + Relay: relay, + Reason: reason, + HandledAuth: true, + } + } goto subscribe } } debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason) + if closedChan != nil { + closedChan <- RelayClosed{ + Relay: relay, + Reason: reason, + } + } return case evt, more := <-sub.Events: if !more { @@ -709,11 +750,30 @@ func (pool *Pool) QuerySingle( return nil } +func (pool *Pool) BatchedQueryManyNotifyClosed( + ctx context.Context, + dfs []DirectedFilter, + opts SubscriptionOptions, +) (chan RelayEvent, chan RelayClosed) { + closedChan := make(chan RelayClosed) + events := pool.batchedQueryMany(ctx, dfs, closedChan, opts) + return events, closedChan +} + // BatchedQueryMany takes a bunch of filters and sends each to the target relay but deduplicates results smartly. func (pool *Pool) BatchedQueryMany( ctx context.Context, dfs []DirectedFilter, opts SubscriptionOptions, +) chan RelayEvent { + return pool.batchedQueryMany(ctx, dfs, nil, opts) +} + +func (pool *Pool) batchedQueryMany( + ctx context.Context, + dfs []DirectedFilter, + closedChan chan RelayClosed, + opts SubscriptionOptions, ) chan RelayEvent { res := make(chan RelayEvent) wg := sync.WaitGroup{} @@ -733,6 +793,7 @@ func (pool *Pool) BatchedQueryMany( for ie := range pool.subManyEose(ctx, []string{df.Relay}, df.Filter, + closedChan, opts, ) { select { @@ -754,11 +815,31 @@ func (pool *Pool) BatchedQueryMany( return res } +func (pool *Pool) BatchedSubscribeManyNotifyClosed( + ctx context.Context, + dfs []DirectedFilter, + opts SubscriptionOptions, +) (chan RelayEvent, chan RelayClosed) { + closedChan := make(chan RelayClosed) + events := pool.batchedSubscribeMany(ctx, dfs, closedChan, opts) + return events, closedChan +} + // BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open. func (pool *Pool) BatchedSubscribeMany( ctx context.Context, dfs []DirectedFilter, opts SubscriptionOptions, +) chan RelayEvent { + return pool.batchedSubscribeMany(ctx, dfs, nil, opts) +} + +// BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open. +func (pool *Pool) batchedSubscribeMany( + ctx context.Context, + dfs []DirectedFilter, + closedChan chan RelayClosed, + opts SubscriptionOptions, ) chan RelayEvent { res := make(chan RelayEvent) wg := sync.WaitGroup{} @@ -779,7 +860,7 @@ func (pool *Pool) BatchedSubscribeMany( []string{df.Relay}, df.Filter, nil, - nil, + closedChan, opts, ) { select {