From f992c6e7ead489040acc6344df419b54b5afadcd Mon Sep 17 00:00:00 2001
From: fiatjaf
Date: Mon, 20 Jan 2025 12:59:27 -0300
Subject: [PATCH] sdk: pubkey feeds, live and past pages.

---
 sdk/feeds.go                | 157 ++++++++++++++++++++++++++++++++++++
 sdk/kvstore/memory/store.go |   4 +-
 sdk/loader_helpers.go       |   6 --
 sdk/metadata.go             |  25 ------
 sdk/outbox.go               |  55 -------------
 5 files changed, 159 insertions(+), 88 deletions(-)
 create mode 100644 sdk/feeds.go

diff --git a/sdk/feeds.go b/sdk/feeds.go
new file mode 100644
index 0000000..0a5537b
--- /dev/null
+++ b/sdk/feeds.go
@@ -0,0 +1,157 @@
+package sdk
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"slices"
+	"sync"
+
+	"github.com/nbd-wtf/go-nostr"
+)
+
+const (
+	pubkeyStreamLatestPrefix = byte('L')
+	pubkeyStreamOldestPrefix = byte('O')
+)
+
+func makePubkeyStreamKey(prefix byte, pubkey string) []byte {
+	key := make([]byte, 1+8)
+	key[0] = prefix
+	hex.Decode(key[1:], []byte(pubkey[0:16]))
+	return key
+}
+
+// StreamLiveFeed starts listening for new events from the given pubkeys,
+// taking into account their outbox relays. It returns a channel that emits events
+// continuously. The events are fetched from the time of the last seen event for
+// each pubkey (stored in KVStore) onwards.
+func (sys *System) StreamLiveFeed(
+	ctx context.Context,
+	pubkeys []string,
+	kinds []int,
+) (<-chan *nostr.Event, error) {
+	events := make(chan *nostr.Event)
+
+	// start a subscription for each relay group
+	for _, pubkey := range pubkeys {
+		relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
+		if len(relays) == 0 {
+			continue
+		}
+
+		latestKey := makePubkeyStreamKey(pubkeyStreamLatestPrefix, pubkey)
+		latest := nostr.Timestamp(0)
+		oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
+		oldest := nostr.Timestamp(0)
+
+		serial := 0
+
+		var since *nostr.Timestamp
+		if data, _ := sys.KVStore.Get(latestKey); data != nil {
+			latest = decodeTimestamp(data)
+			since = &latest
+		}
+
+		filter := nostr.Filter{
+			Authors: []string{pubkey},
+			Since:   since,
+			Kinds:   kinds,
+		}
+
+		go func() {
+			sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})
+			for evt := range sub {
+				sys.StoreRelay.Publish(ctx, *evt.Event)
+				if latest < evt.CreatedAt {
+					latest = evt.CreatedAt
+					serial++
+					if serial%10 == 0 {
+						sys.KVStore.Set(latestKey, encodeTimestamp(latest))
+					}
+				} else if oldest > evt.CreatedAt {
+					oldest = evt.CreatedAt
+					sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
+				}
+				events <- evt.Event
+			}
+		}()
+	}
+
+	return events, nil
+}
+
+// FetchFeedPage fetches historical events from the given pubkeys in descending order starting from the
+// given until timestamp. The limit argument is just a hint of how much content you want for the entire list,
+// it isn't guaranteed that this quantity of events will be returned -- it could be more or less.
+//
+// It relies on KVStore's latestKey and oldestKey in order to determine if we should go to relays to ask
+// for events or if we should just return what we have stored locally.
+func (sys *System) FetchFeedPage(
+	ctx context.Context,
+	pubkeys []string,
+	kinds []int,
+	until nostr.Timestamp,
+	totalLimit int,
+) ([]*nostr.Event, error) {
+	limitPerKey := PerQueryLimitInBatch(totalLimit, len(pubkeys))
+	events := make([]*nostr.Event, 0, len(pubkeys)*limitPerKey)
+
+	wg, mu := sync.WaitGroup{}, sync.Mutex{}
+	wg.Add(len(pubkeys))
+
+	for _, pubkey := range pubkeys {
+		oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
+
+		filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds}
+		if data, _ := sys.KVStore.Get(oldestKey); data != nil {
+			oldest := decodeTimestamp(data)
+			filter.Since = &oldest
+		}
+
+		if filter.Since != nil && *filter.Since < until {
+			// eligible for a local query
+			filter.Until = &until
+			res, err := sys.StoreRelay.QuerySync(ctx, filter)
+			if err != nil {
+				return nil, fmt.Errorf("query failure at '%s': %w", pubkey, err)
+			}
+
+			mu.Lock(); events = append(events, res...); mu.Unlock()
+			if len(res) >= limitPerKey {
+				// we got enough from the local store
+				wg.Done()
+				continue
+			}
+		}
+
+		// if we didn't query the local store or we didn't get enough, then we will fetch from relays
+		filter.Until = filter.Since
+		filter.Since = nil
+
+		relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
+		if len(relays) == 0 {
+			wg.Done()
+			continue
+		}
+
+		go func() {
+			sub := sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter})
+			var oldest nostr.Timestamp
+			for ie := range sub {
+				sys.StoreRelay.Publish(ctx, *ie.Event)
+				if oldest == 0 || ie.Event.CreatedAt < oldest { oldest = ie.Event.CreatedAt }
+				mu.Lock(); events = append(events, ie.Event); mu.Unlock()
+			}
+			wg.Done()
+			if oldest != 0 && (filter.Until == nil || oldest < *filter.Until) {
+				sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
+			}
+		}()
+	}
+
+	wg.Wait()
+	slices.SortFunc(events, nostr.CompareEventPtrReverse)
+
+	return events, nil
+}
diff --git a/sdk/kvstore/memory/store.go b/sdk/kvstore/memory/store.go
index fca4b76..fef5737 100644
--- a/sdk/kvstore/memory/store.go
+++ b/sdk/kvstore/memory/store.go
@@ -24,7 +24,7 @@ func (s *Store) Get(key []byte) ([]byte, error) {
 	defer s.RUnlock()
 
if val, ok := s.data[string(key)]; ok { - // Return a copy to prevent modification of stored data + // return a copy to prevent modification of stored data cp := make([]byte, len(val)) copy(cp, val) return cp, nil @@ -36,7 +36,7 @@ func (s *Store) Set(key []byte, value []byte) error { s.Lock() defer s.Unlock() - // Store a copy to prevent modification of stored data + // store a copy to prevent modification of stored data cp := make([]byte, len(value)) copy(cp, value) s.data[string(key)] = cp diff --git a/sdk/loader_helpers.go b/sdk/loader_helpers.go index 8837c3c..1766163 100644 --- a/sdk/loader_helpers.go +++ b/sdk/loader_helpers.go @@ -28,9 +28,3 @@ func encodeTimestamp(t nostr.Timestamp) []byte { func decodeTimestamp(b []byte) nostr.Timestamp { return nostr.Timestamp(binary.BigEndian.Uint32(b)) } - -// shouldRefreshFromNetwork checks if we should try fetching from network -func shouldRefreshFromNetwork(lastFetchData []byte) bool { - lastFetch := decodeTimestamp(lastFetchData) - return nostr.Now()-lastFetch > 7*24*60*60 -} diff --git a/sdk/metadata.go b/sdk/metadata.go index 6f9ecf8..c371aef 100644 --- a/sdk/metadata.go +++ b/sdk/metadata.go @@ -3,7 +3,6 @@ package sdk import ( "context" "fmt" - "sync" "time" "github.com/nbd-wtf/go-nostr" @@ -140,30 +139,6 @@ func (sys *System) FetchProfileMetadata(ctx context.Context, pubkey string) (pm return pm } -// FetchUserEvents fetches events from each users' outbox relays, grouping queries when possible. 
-func (sys *System) FetchUserEvents(ctx context.Context, filter nostr.Filter) (map[string][]*nostr.Event, error) { - filters, err := sys.ExpandQueriesByAuthorAndRelays(ctx, filter) - if err != nil { - return nil, fmt.Errorf("failed to expand queries: %w", err) - } - - results := make(map[string][]*nostr.Event) - wg := sync.WaitGroup{} - wg.Add(len(filters)) - for relayURL, filter := range filters { - go func(relayURL string, filter nostr.Filter) { - defer wg.Done() - filter.Limit = filter.Limit * len(filter.Authors) // hack - for ie := range sys.Pool.SubManyEose(ctx, []string{relayURL}, nostr.Filters{filter}, nostr.WithLabel("userevts")) { - results[ie.PubKey] = append(results[ie.PubKey], ie.Event) - } - }(relayURL, filter) - } - wg.Wait() - - return results, nil -} - func (sys *System) tryFetchMetadataFromNetwork(ctx context.Context, pubkey string) *ProfileMetadata { thunk0 := sys.replaceableLoaders[kind_0].Load(ctx, pubkey) evt, err := thunk0() diff --git a/sdk/outbox.go b/sdk/outbox.go index ee8387c..a06c60d 100644 --- a/sdk/outbox.go +++ b/sdk/outbox.go @@ -2,12 +2,8 @@ package sdk import ( "context" - "fmt" "strconv" - "sync" "time" - - "github.com/nbd-wtf/go-nostr" ) var outboxShortTermCache = [256]ostcEntry{} @@ -45,54 +41,3 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int) return relays } - -func (sys *System) ExpandQueriesByAuthorAndRelays( - ctx context.Context, - filter nostr.Filter, -) (map[string]nostr.Filter, error) { - n := len(filter.Authors) - if n == 0 { - return nil, fmt.Errorf("no authors in filter") - } - - relaysForPubkey := make(map[string][]string, n) - mu := sync.Mutex{} - - wg := sync.WaitGroup{} - wg.Add(n) - for _, pubkey := range filter.Authors { - go func(pubkey string) { - defer wg.Done() - relayURLs := sys.FetchOutboxRelays(ctx, pubkey, 3) - c := 0 - for _, r := range relayURLs { - relay, err := sys.Pool.EnsureRelay(r) - if err != nil { - continue - } - mu.Lock() - relaysForPubkey[pubkey] = 
append(relaysForPubkey[pubkey], relay.URL) - mu.Unlock() - c++ - if c == 3 { - return - } - } - }(pubkey) - } - wg.Wait() - - filterForRelay := make(map[string]nostr.Filter, n) // { [relay]: filter } - for pubkey, relays := range relaysForPubkey { - for _, relay := range relays { - flt, ok := filterForRelay[relay] - if !ok { - flt = filter.Clone() - filterForRelay[relay] = flt - } - flt.Authors = append(flt.Authors, pubkey) - } - } - - return filterForRelay, nil -}