From 80d0546ce6aeccd45f74b80d2114d5d658c8cc7e Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 25 Aug 2025 22:42:05 -0300 Subject: [PATCH] we should be doing this since years ago: force an EOSE if the relay refuses to give us one. --- pool.go | 3 +++ relay.go | 13 +++++++++++++ sdk/replaceable_loader.go | 3 ++- subscription.go | 5 +++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pool.go b/pool.go index e8e5dce..4b4cdde 100644 --- a/pool.go +++ b/pool.go @@ -304,6 +304,9 @@ func (pool *Pool) FetchManyReplaceable( }) return discard } + if opts.MaxWaitForEOSE == 0 { + opts.MaxWaitForEOSE = time.Second * 4 + } for _, url := range urls { go func(nm string) { diff --git a/relay.go b/relay.go index 8acf861..8a0584f 100644 --- a/relay.go +++ b/relay.go @@ -7,6 +7,7 @@ import ( "fmt" "iter" "log" + "math" "net/http" "strconv" "sync" @@ -404,6 +405,18 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub // we track subscriptions only by their counter, no need for the full id r.Subscriptions.Store(int64(sub.counter), sub) + // start counting down for dispatching the fake EOSE + if opts.MaxWaitForEOSE != math.MaxInt64 { + if opts.MaxWaitForEOSE == 0 { + opts.MaxWaitForEOSE = time.Second * 7 + } + + go func() { + time.Sleep(opts.MaxWaitForEOSE) + sub.dispatchEose() + }() + } + // start handling events, eose, unsub etc: go sub.start() diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index a49e16a..382ae4c 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -122,7 +122,8 @@ func (sys *System) batchLoadReplaceableEvents( // query all relays with the prepared filters wg.Wait() multiSubs := sys.Pool.BatchedQueryMany(aggregatedContext, relayFilter, nostr.SubscriptionOptions{ - Label: "repl~" + strconv.Itoa(int(kind)), + Label: "repl~" + strconv.Itoa(int(kind)), + MaxWaitForEOSE: time.Second * 3, }) for { select { diff --git a/subscription.go b/subscription.go index bea4c2e..802fe62 100644 --- a/subscription.go +++ b/subscription.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" ) // Subscription represents a subscription to a relay. @@ -62,6 +63,10 @@ type SubscriptionOptions struct { // CheckDuplicateReplaceable is like CheckDuplicate, but runs on replaceable/addressable events CheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool + + // a fake EndOfStoredEvents will be dispatched at this time if nothing is received before. + // defaults to 7s (in order to disable, set it to time.Duration(math.MaxInt64)) + MaxWaitForEOSE time.Duration } func (sub *Subscription) start() {