From edb2f782cf35f19b453ca3adf4d29c41cb29cfbe Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 28 Jan 2025 16:12:53 -0300 Subject: [PATCH] pool.SubManyNotifyEOSE() --- pool.go | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/pool.go b/pool.go index cf5ace6..7ec4c1e 100644 --- a/pool.go +++ b/pool.go @@ -234,6 +234,28 @@ func (pool *SimplePool) SubMany( urls []string, filters Filters, opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subMany(ctx, urls, filters, nil, opts...) +} + +// SubManyNotifyEOSE is like SubMany, but takes a channel that is closed when +// all subscriptions have received an EOSE +func (pool *SimplePool) SubManyNotifyEOSE( + ctx context.Context, + urls []string, + filters Filters, + eoseChan chan struct{}, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subMany(ctx, urls, filters, eoseChan, opts...) +} + +func (pool *SimplePool) subMany( + ctx context.Context, + urls []string, + filters Filters, + eoseChan chan struct{}, + opts ...SubscriptionOption, ) chan RelayEvent { ctx, cancel := context.WithCancel(ctx) _ = cancel // do this so `go vet` will stop complaining @@ -242,6 +264,14 @@ func (pool *SimplePool) SubMany( ticker := time.NewTicker(seenAlreadyDropTick) eose := false + eoseWg := sync.WaitGroup{} + eoseWg.Add(len(urls)) + if eoseChan != nil { + go func() { + eoseWg.Wait() + close(eoseChan) + }() + } pending := xsync.NewCounter() pending.Add(int64(len(urls))) @@ -250,6 +280,7 @@ func (pool *SimplePool) SubMany( urls[i] = url if idx := slices.Index(urls, url); idx != i { // skip duplicate relays in the list + eoseWg.Done() continue } @@ -305,7 +336,12 @@ func (pool *SimplePool) SubMany( go func() { <-sub.EndOfStoredEvents - eose = true + + // guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done() + if !eose { + eose = true + eoseWg.Done() + } }() // reset interval when we get a good subscription @@ -359,6 +395,9 @@ func (pool *SimplePool) SubMany( } else { log.Printf("CLOSED from %s: '%s'\n", nm, reason) } + + eoseWg.Done() + return case <-ctx.Done(): return