add more *NotifyClosed variants.

This commit is contained in:
fiatjaf
2025-12-04 09:22:54 -03:00
parent d5dc3abaf2
commit 0706140491

105
pool.go
View File

@@ -237,6 +237,17 @@ func (pool *Pool) SubscribeMany(
return pool.subMany(ctx, urls, filter, nil, nil, opts)
}
func (pool *Pool) FetchManyNotifyClosed(
ctx context.Context,
urls []string,
filter Filter,
opts SubscriptionOptions,
) (chan RelayEvent, chan RelayClosed) {
closedChan := make(chan RelayClosed)
events := pool.fetchMany(ctx, urls, filter, closedChan, opts)
return events, closedChan
}
// FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
// return an EOSE message.
func (pool *Pool) FetchMany(
@@ -244,6 +255,16 @@ func (pool *Pool) FetchMany(
urls []string,
filter Filter,
opts SubscriptionOptions,
) chan RelayEvent {
return pool.fetchMany(ctx, urls, filter, nil, opts)
}
func (pool *Pool) fetchMany(
ctx context.Context,
urls []string,
filter Filter,
closedChan chan RelayClosed,
opts SubscriptionOptions,
) chan RelayEvent {
seenAlready := xsync.NewMapOf[ID, struct{}]()
@@ -257,7 +278,7 @@ func (pool *Pool) FetchMany(
}
}
return pool.subManyEose(ctx, urls, filter, opts)
return pool.subManyEose(ctx, urls, filter, closedChan, opts)
}
// SubscribeManyNotifyEOSE is like SubscribeMany, but also returns a channel that is closed when all subscriptions have received an EOSE
@@ -275,6 +296,9 @@ func (pool *Pool) SubscribeManyNotifyEOSE(
type RelayClosed struct {
Reason string
Relay *Relay
// this is true when the close reason was "auth-required" and already handled internally
HandledAuth bool
}
// SubscribeManyNotifyClosed is like SubscribeMany, but also returns a channel that emits every time a subscription receives a CLOSED message
@@ -538,18 +562,21 @@ func (pool *Pool) subMany(
err := relay.Auth(ctx, pool.authHandler)
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
if closedChan != nil {
closedChan <- RelayClosed{
Reason: reason,
Relay: relay,
HandledAuth: true,
}
}
goto subscribe
}
} else {
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
if closedChan != nil {
select {
case closedChan <- RelayClosed{
Reason: reason,
Relay: relay,
}:
default:
}
}
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
if closedChan != nil {
closedChan <- RelayClosed{
Reason: reason,
Relay: relay,
}
}
@@ -576,6 +603,7 @@ func (pool *Pool) subManyEose(
ctx context.Context,
urls []string,
filter Filter,
closedChan chan RelayClosed,
opts SubscriptionOptions,
) chan RelayEvent {
ctx, cancel := context.WithCancelCause(ctx)
@@ -632,10 +660,23 @@ func (pool *Pool) subManyEose(
err := relay.Auth(ctx, pool.authHandler)
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
if closedChan != nil {
closedChan <- RelayClosed{
Relay: relay,
Reason: reason,
HandledAuth: true,
}
}
goto subscribe
}
}
debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason)
if closedChan != nil {
closedChan <- RelayClosed{
Relay: relay,
Reason: reason,
}
}
return
case evt, more := <-sub.Events:
if !more {
@@ -709,11 +750,30 @@ func (pool *Pool) QuerySingle(
return nil
}
func (pool *Pool) BatchedQueryManyNotifyClosed(
ctx context.Context,
dfs []DirectedFilter,
opts SubscriptionOptions,
) (chan RelayEvent, chan RelayClosed) {
closedChan := make(chan RelayClosed)
events := pool.batchedQueryMany(ctx, dfs, closedChan, opts)
return events, closedChan
}
// BatchedQueryMany takes a bunch of filters and sends each to the target relay but deduplicates results smartly.
func (pool *Pool) BatchedQueryMany(
ctx context.Context,
dfs []DirectedFilter,
opts SubscriptionOptions,
) chan RelayEvent {
return pool.batchedQueryMany(ctx, dfs, nil, opts)
}
func (pool *Pool) batchedQueryMany(
ctx context.Context,
dfs []DirectedFilter,
closedChan chan RelayClosed,
opts SubscriptionOptions,
) chan RelayEvent {
res := make(chan RelayEvent)
wg := sync.WaitGroup{}
@@ -733,6 +793,7 @@ func (pool *Pool) BatchedQueryMany(
for ie := range pool.subManyEose(ctx,
[]string{df.Relay},
df.Filter,
closedChan,
opts,
) {
select {
@@ -754,11 +815,31 @@ func (pool *Pool) BatchedQueryMany(
return res
}
func (pool *Pool) BatchedSubscribeManyNotifyClosed(
ctx context.Context,
dfs []DirectedFilter,
opts SubscriptionOptions,
) (chan RelayEvent, chan RelayClosed) {
closedChan := make(chan RelayClosed)
events := pool.batchedSubscribeMany(ctx, dfs, closedChan, opts)
return events, closedChan
}
// BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open.
func (pool *Pool) BatchedSubscribeMany(
ctx context.Context,
dfs []DirectedFilter,
opts SubscriptionOptions,
) chan RelayEvent {
return pool.batchedSubscribeMany(ctx, dfs, nil, opts)
}
// BatchedSubscribeMany is like BatchedQueryMany but keeps the subscription open.
func (pool *Pool) batchedSubscribeMany(
ctx context.Context,
dfs []DirectedFilter,
closedChan chan RelayClosed,
opts SubscriptionOptions,
) chan RelayEvent {
res := make(chan RelayEvent)
wg := sync.WaitGroup{}
@@ -779,7 +860,7 @@ func (pool *Pool) BatchedSubscribeMany(
[]string{df.Relay},
df.Filter,
nil,
nil,
closedChan,
opts,
) {
select {