From d5dc3abaf24a33102ebb5e91a401a9b13562619a Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 4 Dec 2025 08:51:34 -0300 Subject: [PATCH] SubscribeManyNotifyClosed() --- nip60/wallet.go | 8 ++------ pool.go | 40 ++++++++++++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/nip60/wallet.go b/nip60/wallet.go index ee5864b..d99148c 100644 --- a/nip60/wallet.go +++ b/nip60/wallet.go @@ -81,21 +81,17 @@ func loadWalletFromPool( kinds = append(kinds, 7376) } - eoseChanE := make(chan struct{}) - events := pool.SubscribeManyNotifyEOSE( + events, eoseChanE := pool.SubscribeManyNotifyEOSE( ctx, relays, nostr.Filter{Kinds: kinds, Authors: []nostr.PubKey{pk}}, - eoseChanE, nostr.SubscriptionOptions{}, ) - eoseChanD := make(chan struct{}) - deletions := pool.SubscribeManyNotifyEOSE( + deletions, eoseChanD := pool.SubscribeManyNotifyEOSE( ctx, relays, nostr.Filter{Kinds: []nostr.Kind{5}, Tags: nostr.TagMap{"k": []string{"7375"}}, Authors: []nostr.PubKey{pk}}, - eoseChanD, nostr.SubscriptionOptions{}, ) diff --git a/pool.go b/pool.go index 9a34320..9c4a392 100644 --- a/pool.go +++ b/pool.go @@ -234,7 +234,7 @@ func (pool *Pool) SubscribeMany( filter Filter, opts SubscriptionOptions, ) chan RelayEvent { - return pool.subMany(ctx, urls, filter, nil, opts) + return pool.subMany(ctx, urls, filter, nil, nil, opts) } // FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays @@ -260,16 +260,33 @@ func (pool *Pool) FetchMany( return pool.subManyEose(ctx, urls, filter, opts) } -// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when -// all subscriptions have received an EOSE +// SubscribeManyNotifyEOSE is like SubscribeMany, but also returns a channel that is closed when all subscriptions have received an EOSE func (pool *Pool) SubscribeManyNotifyEOSE( ctx context.Context, urls []string, filter Filter, - eoseChan chan struct{}, opts SubscriptionOptions, -) chan RelayEvent { - return pool.subMany(ctx, urls, filter, eoseChan, opts) +) (chan RelayEvent, chan struct{}) { + eoseChan := make(chan struct{}) + events := pool.subMany(ctx, urls, filter, eoseChan, nil, opts) + return events, eoseChan +} + +type RelayClosed struct { + Reason string + Relay *Relay +} + +// SubscribeManyNotifyClosed is like SubscribeMany, but also returns a channel that emits every time a subscription receives a CLOSED message +func (pool *Pool) SubscribeManyNotifyClosed( + ctx context.Context, + urls []string, + filter Filter, + opts SubscriptionOptions, +) (chan RelayEvent, chan RelayClosed) { + closedChan := make(chan RelayClosed) + events := pool.subMany(ctx, urls, filter, nil, closedChan, opts) + return events, closedChan } type ReplaceableKey struct { @@ -382,6 +399,7 @@ func (pool *Pool) subMany( urls []string, filter Filter, eoseChan chan struct{}, + closedChan chan RelayClosed, opts SubscriptionOptions, ) chan RelayEvent { ctx, cancel := context.WithCancelCause(ctx) @@ -524,6 +542,15 @@ func (pool *Pool) subMany( } } else { debugLogf("CLOSED from %s: '%s'\n", nm, reason) + if closedChan != nil { + select { + case closedChan <- RelayClosed{ + Reason: reason, + Relay: relay, + }: + default: + } + } } return @@ -752,6 +779,7 @@ func (pool *Pool) BatchedSubscribeMany( []string{df.Relay}, df.Filter, nil, + nil, opts, ) { select {