From 06a15fdaab2ab2547e1a4b7ef46e5838677266e6 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Fri, 17 Jan 2025 13:44:50 -0300 Subject: [PATCH] follow list fetching test and related changes and fixes. - make BatchedSubManyEose() use a single duplicate id index and use it for replaceable loaders; - fixes parsing follow entry from kind:3 events (and others); - adds a "cause" to most cancelation errors in relay/pool; - remove the inherent cache from dataloader (we have our own hopefully); - increase max frame size we can read from any websocket to 2**18 (262k), which gives over 2000 item lists. --- connection.go | 2 + keyer/bunker.go | 5 +- pool.go | 102 ++++++++++++++++++++------------- relay.go | 29 ++++++---- sdk/addressable_loader.go | 86 ++++++++++++---------------- sdk/lists_profile.go | 4 +- sdk/replaceable_loader.go | 117 +++++++++++++++++--------------------- sdk/sdk_test.go | 54 +++++++++++++++++- subscription.go | 19 +++++-- 9 files changed, 240 insertions(+), 178 deletions(-) diff --git a/connection.go b/connection.go index dee35f2..97c649a 100644 --- a/connection.go +++ b/connection.go @@ -20,6 +20,8 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header, t return nil, err } + c.SetReadLimit(262144) // this should be enough for contact lists of over 2000 people + return &Connection{ conn: c, }, nil diff --git a/keyer/bunker.go b/keyer/bunker.go index 683e257..7d240a3 100644 --- a/keyer/bunker.go +++ b/keyer/bunker.go @@ -2,6 +2,7 @@ package keyer import ( "context" + "errors" "time" "github.com/nbd-wtf/go-nostr" @@ -18,7 +19,7 @@ func NewBunkerSignerFromBunkerClient(bc *nip46.BunkerClient) BunkerSigner { } func (bs BunkerSigner) GetPublicKey(ctx context.Context) (string, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*30, errors.New("get_public_key took too long")) defer cancel() pk, err := bs.bunker.GetPublicKey(ctx) if err != nil { @@ -28,7 +29,7 @@ func (bs BunkerSigner) GetPublicKey(ctx context.Context) (string, error) { } func (bs BunkerSigner) SignEvent(ctx context.Context, evt *nostr.Event) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*30, errors.New("sign_event took too long")) defer cancel() return bs.bunker.SignEvent(ctx, evt) } diff --git a/pool.go b/pool.go index 4874ed2..d54694f 100644 --- a/pool.go +++ b/pool.go @@ -2,6 +2,7 @@ package nostr import ( "context" + "errors" "fmt" "log" "math" @@ -24,7 +25,7 @@ type SimplePool struct { Context context.Context authHandler func(context.Context, RelayEvent) error - cancel context.CancelFunc + cancel context.CancelCauseFunc eventMiddleware func(RelayEvent) duplicateMiddleware func(relay string, id string) @@ -36,8 +37,8 @@ type SimplePool struct { relayOptions []RelayOption } -type DirectedFilters struct { - Filters +type DirectedFilter struct { + Filter Relay string } @@ -55,7 +56,7 @@ type PoolOption interface { } func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) pool := &SimplePool{ Relays: xsync.NewMapOf[string, *Relay](), @@ -177,7 +178,11 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { // try to connect // we use this ctx here so when the pool dies everything dies - ctx, cancel := context.WithTimeout(pool.Context, time.Second*15) + ctx, cancel := context.WithTimeoutCause( + pool.Context, + time.Second*15, + 
errors.New("connecting to the relay took too long"), + ) defer cancel() relay = NewRelay(context.Background(), url, pool.relayOptions...) @@ -379,17 +384,36 @@ func (pool *SimplePool) SubManyEose( filters Filters, opts ...SubscriptionOption, ) chan RelayEvent { - ctx, cancel := context.WithCancel(ctx) + seenAlready := xsync.NewMapOf[string, bool]() + return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, WithCheckDuplicate(func(id, relay string) bool { + _, exists := seenAlready.Load(id) + if exists && pool.duplicateMiddleware != nil { + pool.duplicateMiddleware(relay, id) + } + return exists + }), seenAlready, opts...) +} + +func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( + ctx context.Context, + urls []string, + filters Filters, + wcd WithCheckDuplicate, + seenAlready *xsync.MapOf[string, bool], + opts ...SubscriptionOption, +) chan RelayEvent { + ctx, cancel := context.WithCancelCause(ctx) events := make(chan RelayEvent) - seenAlready := xsync.NewMapOf[string, bool]() wg := sync.WaitGroup{} wg.Add(len(urls)) + opts = append(opts, wcd) + go func() { // this will happen when all subscriptions get an eose (or when they die) wg.Wait() - cancel() + cancel(errors.New("all subscriptions ended")) close(events) }() @@ -411,19 +435,14 @@ func (pool *SimplePool) SubManyEose( relay, err := pool.EnsureRelay(nm) if err != nil { + debugLogf("error connecting to %s with %v: %s", relay, filters, err) return } hasAuthed := false subscribe: - sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool { - _, exists := seenAlready.Load(id) - if exists && pool.duplicateMiddleware != nil { - pool.duplicateMiddleware(relay, id) - } - return exists - }))...) + sub, err := relay.Subscribe(ctx, filters, opts...) if err != nil { debugLogf("error subscribing to %s with %v: %s", relay, filters, err) return @@ -508,47 +527,52 @@ func (pool *SimplePool) CountMany( // QuerySingle returns the first event returned by the first relay, cancels everything else. func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent { - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancel := context.WithCancelCause(ctx) for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) { + cancel(errors.New("got the first event and ended successfully")) return &ievt } + cancel(errors.New("SubManyEose() didn't get yield events")) return nil } -func (pool *SimplePool) batchedSubMany( +func (pool *SimplePool) BatchedSubManyEose( ctx context.Context, - dfs []DirectedFilters, - subFn func(context.Context, []string, Filters, ...SubscriptionOption) chan RelayEvent, - opts []SubscriptionOption, + dfs []DirectedFilter, + opts ...SubscriptionOption, ) chan RelayEvent { res := make(chan RelayEvent) + wg := sync.WaitGroup{} + wg.Add(len(dfs)) + seenAlready := xsync.NewMapOf[string, bool]() for _, df := range dfs { - go func(df DirectedFilters) { - for ie := range subFn(ctx, []string{df.Relay}, df.Filters, opts...) { + go func(df DirectedFilter) { + for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx, + []string{df.Relay}, + Filters{df.Filter}, + WithCheckDuplicate(func(id, relay string) bool { + _, exists := seenAlready.Load(id) + if exists && pool.duplicateMiddleware != nil { + pool.duplicateMiddleware(relay, id) + } + return exists + }), seenAlready, opts...) 
{ res <- ie } + + wg.Done() }(df) } + go func() { + wg.Wait() + close(res) + }() + return res } -// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. -func (pool *SimplePool) BatchedSubMany( - ctx context.Context, - dfs []DirectedFilters, - opts ...SubscriptionOption, -) chan RelayEvent { - return pool.batchedSubMany(ctx, dfs, pool.SubMany, opts) -} - -// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. -func (pool *SimplePool) BatchedSubManyEose( - ctx context.Context, - dfs []DirectedFilters, - opts ...SubscriptionOption, -) chan RelayEvent { - return pool.batchedSubMany(ctx, dfs, pool.SubManyEose, opts) +func (pool *SimplePool) Close(reason string) { + pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason)) } diff --git a/relay.go b/relay.go index de6c3a4..d538381 100644 --- a/relay.go +++ b/relay.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/tls" + "errors" "fmt" "log" "net/http" @@ -30,7 +31,7 @@ type Relay struct { ConnectionError error connectionContext context.Context // will be canceled when the connection closes - connectionContextCancel context.CancelFunc + connectionContextCancel context.CancelCauseFunc challenge string // NIP-42 challenge, we only keep the last noticeHandler func(string) // NIP-01 NOTICEs @@ -51,7 +52,7 @@ type writeRequest struct { // NewRelay returns a new relay. The relay connection will be closed when the context is canceled. func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) r := &Relay{ URL: NormalizeURL(url), connectionContext: ctx, @@ -150,7 +151,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("connection took too long")) defer cancel() } @@ -175,7 +176,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error // close all subscriptions for _, sub := range r.Subscriptions.Range { - sub.Unsub() + sub.unsub(fmt.Errorf("relay connection closed: %w / %w", context.Cause(r.connectionContext), r.ConnectionError)) } }() @@ -214,7 +215,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error buf.Reset() if err := conn.ReadMessage(r.connectionContext, buf); err != nil { r.ConnectionError = err - r.Close() + r.close(err) break } @@ -407,7 +408,7 @@ func (r *Relay) Subscribe(ctx context.Context, filters Filters, opts ...Subscrip // Failure to do that will result in a huge number of halted goroutines being created. func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts ...SubscriptionOption) *Subscription { current := subscriptionIDCounter.Add(1) - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) sub := &Subscription{ Relay: r, @@ -431,7 +432,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . 
} } - // subscription id calculation + // subscription id computation buf := subIdPool.Get().([]byte)[:0] buf = strconv.AppendInt(buf, sub.counter, 10) buf = append(buf, ':') @@ -462,7 +463,7 @@ func (r *Relay) QueryEvents(ctx context.Context, filter Filter) (chan *Event, er case <-ctx.Done(): case <-r.Context().Done(): } - sub.Unsub() + sub.unsub(errors.New("QueryEvents() ended")) return } }() @@ -474,7 +475,7 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("QuerySync() took too long")) defer cancel() } @@ -512,12 +513,12 @@ func (r *Relay) countInternal(ctx context.Context, filters Filters, opts ...Subs return CountEnvelope{}, err } - defer sub.Unsub() + defer sub.unsub(errors.New("countInternal() ended")) if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("countInternal took too long")) defer cancel() } @@ -532,13 +533,17 @@ func (r *Relay) countInternal(ctx context.Context, filters Filters, opts ...Subs } func (r *Relay) Close() error { + return r.close(errors.New("Close() called")) +} + +func (r *Relay) close(reason error) error { r.closeMutex.Lock() defer r.closeMutex.Unlock() if r.connectionContextCancel == nil { return fmt.Errorf("relay already closed") } - r.connectionContextCancel() + r.connectionContextCancel(reason) r.connectionContextCancel = nil if r.Connection == nil { diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go index cf686ec..1b5c9ae 100644 --- a/sdk/addressable_loader.go +++ b/sdk/addressable_loader.go @@ -2,6 +2,7 @@ package sdk import ( "context" + "errors" "fmt" "strconv" "sync" @@ -37,6 +38,7 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[stri }, dataloader.WithBatchCapacity[string, []*nostr.Event](60), dataloader.WithClearCacheOnBatch[string, []*nostr.Event](), + dataloader.WithCache(&dataloader.NoCache[string, []*nostr.Event]{}), dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350), ) } @@ -45,13 +47,16 @@ func (sys *System) batchLoadAddressableEvents( kind int, pubkeys []string, ) []*dataloader.Result[[]*nostr.Event] { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) + ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*6, + errors.New("batch addressable load took too long"), + ) defer cancel() batchSize := len(pubkeys) results := make([]*dataloader.Result[[]*nostr.Event], batchSize) - keyPositions := make(map[string]int) // { [pubkey]: slice_index } - relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter } + keyPositions := make(map[string]int) // { [pubkey]: slice_index } + relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2)) + relayFilterIndex := make(map[string]int, max(3, batchSize*2)) wg := sync.WaitGroup{} wg.Add(len(pubkeys)) @@ -91,49 +96,60 @@ func (sys *System) batchLoadAddressableEvents( cm.Lock() for _, relay := range relays { // each relay will have a custom filter - filter, ok := relayFilters[relay] - if !ok { - filter = nostr.Filter{ - Kinds: []int{kind}, - Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */), + idx, 
ok := relayFilterIndex[relay] + var dfilter nostr.DirectedFilter + if ok { + dfilter = relayFilter[idx] + } else { + dfilter = nostr.DirectedFilter{ + Relay: relay, + Filter: nostr.Filter{ + Kinds: []int{kind}, + Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */), + }, } + idx = len(relayFilter) + relayFilterIndex[relay] = idx + relayFilter = append(relayFilter, dfilter) } - filter.Authors = append(filter.Authors, pubkey) - relayFilters[relay] = filter + dfilter.Authors = append(dfilter.Authors, pubkey) + relayFilter[idx] = dfilter } cm.Unlock() }(i, pubkey) } - // query all relays with the prepared filters + // wait for relay batches to be prepared wg.Wait() - multiSubs := sys.batchAddressableRelayQueries(ctx, relayFilters) + + // query all relays with the prepared filters + multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter) nextEvent: for { select { - case evt, more := <-multiSubs: + case ie, more := <-multiSubs: if !more { return results } // insert this event at the desired position - pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed + pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed events := results[pos].Data if events == nil { // no events found, so just add this and end - results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{evt}} + results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{ie.Event}} continue nextEvent } // there are events, so look for a match - d := evt.Tags.GetD() + d := ie.Tags.GetD() for i, event := range events { if event.Tags.GetD() == d { // there is a match - if event.CreatedAt < evt.CreatedAt { + if event.CreatedAt < ie.CreatedAt { // ...and this one is newer, so replace - events[i] = evt + events[i] = ie.Event } else { // ... but this one is older, so ignore } @@ -143,42 +159,10 @@ nextEvent: } // there is no match, so add to the end - events = append(events, evt) + events = append(events, ie.Event) results[pos].Data = events case <-ctx.Done(): return results } } } - -// batchAddressableRelayQueries is like batchReplaceableRelayQueries, except it doesn't count results to -// try to exit early. 
-func (sys *System) batchAddressableRelayQueries( - ctx context.Context, - relayFilters map[string]nostr.Filter, -) <-chan *nostr.Event { - all := make(chan *nostr.Event) - - wg := sync.WaitGroup{} - wg.Add(len(relayFilters)) - for url, filter := range relayFilters { - go func(url string, filter nostr.Filter) { - defer wg.Done() - n := len(filter.Authors) - - ctx, cancel := context.WithTimeout(ctx, time.Millisecond*450+time.Millisecond*50*time.Duration(n)) - defer cancel() - - for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("addr")) { - all <- ie.Event - } - }(url, filter) - } - - go func() { - wg.Wait() - close(all) - }() - - return all -} diff --git a/sdk/lists_profile.go b/sdk/lists_profile.go index eb0174e..c589871 100644 --- a/sdk/lists_profile.go +++ b/sdk/lists_profile.go @@ -48,11 +48,11 @@ func parseProfileRef(tag nostr.Tag) (fw ProfileRef, ok bool) { if _, err := url.Parse(tag[2]); err == nil { fw.Relay = nostr.NormalizeURL(tag[2]) } + if len(tag) > 3 { fw.Petname = strings.TrimSpace(tag[3]) } - return fw, true } - return fw, false + return fw, true } diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index d654187..42b1dda 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -2,6 +2,7 @@ package sdk import ( "context" + "errors" "fmt" "slices" "strconv" @@ -12,6 +13,10 @@ import ( "github.com/nbd-wtf/go-nostr" ) +// this is used as a hack to signal that these replaceable loader queries shouldn't use the full +// context timespan when they're being made from inside determineRelaysToQuery +var contextForSub10002Query = context.WithValue(context.Background(), "", "") + type replaceableIndex int const ( @@ -49,26 +54,39 @@ func (sys *System) initializeReplaceableDataloaders() { func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] { return dataloader.NewBatchedLoader( - func(_ context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] { - return sys.batchLoadReplaceableEvents(kind, pubkeys) + func(ctx context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] { + var cancel context.CancelFunc + + if ctx == contextForSub10002Query { + ctx, cancel = context.WithTimeoutCause(context.Background(), time.Millisecond*2300, + errors.New("fetching relays in subloader took too long"), + ) + } else { + ctx, cancel = context.WithTimeoutCause(context.Background(), time.Second*6, + errors.New("batch replaceable load took too long"), + ) + defer cancel() + } + + return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys) }, dataloader.WithBatchCapacity[string, *nostr.Event](60), dataloader.WithClearCacheOnBatch[string, *nostr.Event](), + dataloader.WithCache(&dataloader.NoCache[string, *nostr.Event]{}), dataloader.WithWait[string, *nostr.Event](time.Millisecond*350), ) } func (sys *System) batchLoadReplaceableEvents( + ctx context.Context, kind int, pubkeys []string, ) []*dataloader.Result[*nostr.Event] { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) - defer cancel() - batchSize := len(pubkeys) results := make([]*dataloader.Result[*nostr.Event], batchSize) - keyPositions := make(map[string]int) // { [pubkey]: slice_index } - relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter } + keyPositions := make(map[string]int) // { [pubkey]: slice_index } + relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2)) + relayFilterIndex := make(map[string]int, max(3, batchSize*2)) wg := sync.WaitGroup{} 
wg.Add(len(pubkeys)) @@ -108,15 +126,24 @@ func (sys *System) batchLoadReplaceableEvents( cm.Lock() for _, relay := range relays { // each relay will have a custom filter - filter, ok := relayFilters[relay] - if !ok { - filter = nostr.Filter{ - Kinds: []int{kind}, - Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */), + idx, ok := relayFilterIndex[relay] + var dfilter nostr.DirectedFilter + if ok { + dfilter = relayFilter[idx] + } else { + dfilter = nostr.DirectedFilter{ + Relay: relay, + Filter: nostr.Filter{ + Kinds: []int{kind}, + Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */), + }, } + idx = len(relayFilter) + relayFilterIndex[relay] = idx + relayFilter = append(relayFilter, dfilter) } - filter.Authors = append(filter.Authors, pubkey) - relayFilters[relay] = filter + dfilter.Authors = append(dfilter.Authors, pubkey) + relayFilter[idx] = dfilter } cm.Unlock() }(i, pubkey) @@ -124,18 +151,18 @@ func (sys *System) batchLoadReplaceableEvents( // query all relays with the prepared filters wg.Wait() - multiSubs := sys.batchReplaceableRelayQueries(ctx, relayFilters) + multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter, nostr.WithLabel("repl~"+strconv.Itoa(kind))) for { select { - case evt, more := <-multiSubs: + case ie, more := <-multiSubs: if !more { return results } // insert this event at the desired position - pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed - if results[pos].Data == nil || results[pos].Data.CreatedAt < evt.CreatedAt { - results[pos] = &dataloader.Result[*nostr.Event]{Data: evt} + pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed + if results[pos].Data == nil || results[pos].Data.CreatedAt < ie.CreatedAt { + results[pos] = &dataloader.Result[*nostr.Event]{Data: ie.Event} } case <-ctx.Done(): return results @@ -153,11 +180,13 @@ func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, ki if len(relays) == 0 { relays = []string{"wss://relay.damus.io", "wss://nos.lol"} } - } else if kind == 0 || kind == 3 { - // leave room for two hardcoded relays because people are stupid - relays = sys.FetchOutboxRelays(ctx, pubkey, 1) } else { - relays = sys.FetchOutboxRelays(ctx, pubkey, 3) + if kind == 0 || kind == 3 { + // leave room for two hardcoded relays because people are stupid + relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 1) + } else { + relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 3) + } } // use a different set of extra relays depending on the kind @@ -182,45 +211,3 @@ func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, ki return relays } - -// batchReplaceableRelayQueries subscribes to multiple relays using a different filter for each and returns -// a single channel with all results. it closes on EOSE or when all the expected events were returned. -// -// the number of expected events is given by the number of pubkeys in the .Authors filter field. -// because of that, batchReplaceableRelayQueries is only suitable for querying replaceable events -- and -// care must be taken to not include the same pubkey more than once in the filter .Authors array. 
-func (sys *System) batchReplaceableRelayQueries( - ctx context.Context, - relayFilters map[string]nostr.Filter, -) <-chan *nostr.Event { - all := make(chan *nostr.Event) - - wg := sync.WaitGroup{} - wg.Add(len(relayFilters)) - for url, filter := range relayFilters { - go func(url string, filter nostr.Filter) { - defer wg.Done() - n := len(filter.Authors) - - ctx, cancel := context.WithTimeout(ctx, time.Millisecond*950+time.Millisecond*50*time.Duration(n)) - defer cancel() - - received := 0 - for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("repl")) { - all <- ie.Event - received++ - if received >= n { - // we got all events we asked for, unless the relay is shitty and sent us two from the same - return - } - } - }(url, filter) - } - - go func() { - wg.Wait() - close(all) - }() - - return all -} diff --git a/sdk/sdk_test.go b/sdk/sdk_test.go index 8540503..2a6aedb 100644 --- a/sdk/sdk_test.go +++ b/sdk/sdk_test.go @@ -2,13 +2,15 @@ package sdk import ( "context" + "fmt" + "strings" "testing" "github.com/nbd-wtf/go-nostr" "github.com/stretchr/testify/require" ) -func TestSystemFiatjaf(t *testing.T) { +func TestMetadataAndEvents(t *testing.T) { sys := NewSystem() ctx := context.Background() @@ -33,3 +35,53 @@ func TestSystemFiatjaf(t *testing.T) { require.NotEmpty(t, events[meta.PubKey]) require.GreaterOrEqual(t, len(events[meta.PubKey]), 5) } + +func TestFollowListRecursion(t *testing.T) { + sys := NewSystem() + ctx := context.Background() + + // fetch initial follow list + followList := sys.FetchFollowList(ctx, "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d") + fmt.Println("~", len(followList.Items)) + require.Greater(t, len(followList.Items), 400, "should follow more than 400 accounts") + + // fetch metadata and follow lists for each followed account concurrently + type result struct { + pubkey string + followList GenericList[ProfileRef] + metadata ProfileMetadata + } + + results := make(chan result) + go func() { + for _, item := range followList.Items { + go func() { + fl := sys.FetchFollowList(ctx, item.Pubkey) + meta := sys.FetchProfileMetadata(ctx, item.Pubkey) + fmt.Println(" ~", item.Pubkey, meta.Name, len(fl.Items)) + results <- result{item.Pubkey, fl, meta} + }() + } + }() + + // collect results + var validAccounts int + var accountsWithManyFollows int + for i := 0; i < len(followList.Items); i++ { + r := <-results + + // skip if metadata has "bot" in name + if strings.Contains(strings.ToLower(r.metadata.Name), "bot") { + continue + } + + validAccounts++ + if len(r.followList.Items) > 20 { + accountsWithManyFollows++ + } + } + + // check if at least 90% of non-bot accounts follow more than 20 accounts + ratio := float64(accountsWithManyFollows) / float64(validAccounts) + require.Greater(t, ratio, 0.9, "at least 90%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100) +} diff --git a/subscription.go b/subscription.go index bf9ccc3..5d53a33 100644 --- a/subscription.go +++ b/subscription.go @@ -2,6 +2,7 @@ package nostr import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -38,7 +39,7 @@ type Subscription struct { match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints live atomic.Bool eosed atomic.Bool - cancel context.CancelFunc + cancel context.CancelCauseFunc // this keeps track of the events we've received before the EOSE that we must dispatch before // closing the EndOfStoredEvents channel @@ -74,8 +75,9 @@ var ( func (sub 
*Subscription) start() { <-sub.Context.Done() + // the subscription ends once the context is canceled (if not already) - sub.Unsub() // this will set sub.live to false + sub.unsub(errors.New("context done on start()")) // this will set sub.live to false // do this so we don't have the possibility of closing the Events channel and then trying to send to it sub.mu.Lock() @@ -123,15 +125,19 @@ func (sub *Subscription) handleClosed(reason string) { go func() { sub.ClosedReason <- reason sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay - sub.Unsub() + sub.unsub(fmt.Errorf("CLOSED received: %s", reason)) }() } // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub() also closes the channel sub.Events and makes a new one. func (sub *Subscription) Unsub() { + sub.unsub(errors.New("Unsub() called")) +} + +func (sub *Subscription) unsub(err error) { // cancel the context (if it's not canceled already) - sub.cancel() + sub.cancel(err) // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation) if sub.live.CompareAndSwap(true, false) { @@ -169,8 +175,9 @@ func (sub *Subscription) Fire() error { sub.live.Store(true) if err := <-sub.Relay.Write(reqb); err != nil { - sub.cancel() - return fmt.Errorf("failed to write: %w", err) + err := fmt.Errorf("failed to write: %w", err) + sub.cancel(err) + return err } return nil
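
Usage note (not from the diff above): a minimal sketch of how the reworked per-relay batching API — DirectedFilter plus BatchedSubManyEose, which now shares one duplicate-id index across all relays — and the new SimplePool.Close(reason) might be called. The relay URLs, pubkey, and label here are placeholders, not values taken from the patch.

package main

import (
	"context"
	"fmt"

	"github.com/nbd-wtf/go-nostr"
)

func main() {
	ctx := context.Background()
	pool := nostr.NewSimplePool(ctx)

	// one filter per relay; events seen on more than one relay are
	// deduplicated through the shared seenAlready index added in this patch
	dfs := []nostr.DirectedFilter{
		{
			Relay:  "wss://relay.example.com", // placeholder relay URL
			Filter: nostr.Filter{Kinds: []int{3}, Authors: []string{"<hex-pubkey>"}},
		},
		{
			Relay:  "wss://other.example.com", // placeholder relay URL
			Filter: nostr.Filter{Kinds: []int{3}, Authors: []string{"<hex-pubkey>"}},
		},
	}

	// the channel closes after every relay has sent EOSE (or its subscription died)
	for ie := range pool.BatchedSubManyEose(ctx, dfs, nostr.WithLabel("example")) {
		fmt.Println("got event", ie.ID)
	}

	// Close propagates the reason as the cancel cause of the pool context
	pool.Close("done with the example")
}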