we should be doing this since years ago: force an EOSE if the relay refuses to give us one.
This commit is contained in:
3
pool.go
3
pool.go
@@ -304,6 +304,9 @@ func (pool *Pool) FetchManyReplaceable(
|
|||||||
})
|
})
|
||||||
return discard
|
return discard
|
||||||
}
|
}
|
||||||
|
if opts.MaxWaitForEOSE == 0 {
|
||||||
|
opts.MaxWaitForEOSE = time.Second * 4
|
||||||
|
}
|
||||||
|
|
||||||
for _, url := range urls {
|
for _, url := range urls {
|
||||||
go func(nm string) {
|
go func(nm string) {
|
||||||
|
|||||||
13
relay.go
13
relay.go
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"iter"
|
"iter"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -404,6 +405,18 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub
|
|||||||
// we track subscriptions only by their counter, no need for the full id
|
// we track subscriptions only by their counter, no need for the full id
|
||||||
r.Subscriptions.Store(int64(sub.counter), sub)
|
r.Subscriptions.Store(int64(sub.counter), sub)
|
||||||
|
|
||||||
|
// start counting down for dispatching the fake EOSE
|
||||||
|
if opts.MaxWaitForEOSE != math.MaxInt64 {
|
||||||
|
if opts.MaxWaitForEOSE == 0 {
|
||||||
|
opts.MaxWaitForEOSE = time.Second * 7
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(opts.MaxWaitForEOSE)
|
||||||
|
sub.dispatchEose()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// start handling events, eose, unsub etc:
|
// start handling events, eose, unsub etc:
|
||||||
go sub.start()
|
go sub.start()
|
||||||
|
|
||||||
|
|||||||
@@ -122,7 +122,8 @@ func (sys *System) batchLoadReplaceableEvents(
|
|||||||
// query all relays with the prepared filters
|
// query all relays with the prepared filters
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
multiSubs := sys.Pool.BatchedQueryMany(aggregatedContext, relayFilter, nostr.SubscriptionOptions{
|
multiSubs := sys.Pool.BatchedQueryMany(aggregatedContext, relayFilter, nostr.SubscriptionOptions{
|
||||||
Label: "repl~" + strconv.Itoa(int(kind)),
|
Label: "repl~" + strconv.Itoa(int(kind)),
|
||||||
|
MaxWaitForEOSE: time.Second * 3,
|
||||||
})
|
})
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Subscription represents a subscription to a relay.
|
// Subscription represents a subscription to a relay.
|
||||||
@@ -62,6 +63,10 @@ type SubscriptionOptions struct {
|
|||||||
|
|
||||||
// CheckDuplicateReplaceable is like CheckDuplicate, but runs on replaceable/addressable events
|
// CheckDuplicateReplaceable is like CheckDuplicate, but runs on replaceable/addressable events
|
||||||
CheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
|
CheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
|
||||||
|
|
||||||
|
// a fake EndOfStoredEvents will be dispatched at this time if nothing is received before.
|
||||||
|
// defaults to 7s (in order to disable, set it to time.Duration(math.MaxInt64))
|
||||||
|
MaxWaitForEOSE time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sub *Subscription) start() {
|
func (sub *Subscription) start() {
|
||||||
|
|||||||
Reference in New Issue
Block a user