From d87066c9b9d2cefe086b02925e03158df037811b Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 27 Jan 2026 23:25:34 -0300 Subject: [PATCH] sdk: eliminate all the data races go was complaining about. --- sdk/hints/memoryh/db.go | 5 +- sdk/list.go | 10 ++-- sdk/lists_event.go | 16 ++++-- sdk/lists_profile.go | 24 +++++--- sdk/lists_relay.go | 24 +++++--- sdk/lists_topics.go | 16 ++++-- sdk/outbox.go | 9 ++- sdk/set.go | 7 +-- sdk/system.go | 121 +++++++++++++++++++++++++--------------- 9 files changed, 142 insertions(+), 90 deletions(-) diff --git a/sdk/hints/memoryh/db.go b/sdk/hints/memoryh/db.go index cb73576..cfd58a3 100644 --- a/sdk/hints/memoryh/db.go +++ b/sdk/hints/memoryh/db.go @@ -31,14 +31,15 @@ func (db *HintDB) Save(pubkey nostr.PubKey, relay string, key hints.HintKey, ts ts = now } + db.Lock() + defer db.Unlock() + relayIndex := slices.Index(db.RelayBySerial, relay) if relayIndex == -1 { relayIndex = len(db.RelayBySerial) db.RelayBySerial = append(db.RelayBySerial, relay) } - db.Lock() - defer db.Unlock() // fmt.Println(" ", relay, "index", relayIndex, "--", "adding", hints.HintKey(key).String(), ts) entries, _ := db.OrderedRelaysByPubKey[pubkey] diff --git a/sdk/list.go b/sdk/list.go index 0f4ac02..084f489 100644 --- a/sdk/list.go +++ b/sdk/list.go @@ -4,6 +4,7 @@ import ( "context" "slices" "sync" + "sync/atomic" "time" "fiatjaf.com/nostr" @@ -23,7 +24,7 @@ type TagItemWithValue[V comparable] interface { var ( genericListMutexes = [60]sync.Mutex{} - valueWasJustCached = [60]bool{} + valueWasJustCached = [60]atomic.Bool{} ) func fetchGenericList[V comparable, I TagItemWithValue[V]]( @@ -42,10 +43,9 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]]( lockIdx := (nostr.Kind(n) + actualKind) % 60 genericListMutexes[lockIdx].Lock() - if valueWasJustCached[lockIdx] { + if valueWasJustCached[lockIdx].CompareAndSwap(true, false) { // this ensures the cache has had time to commit the values // so we don't repeat a fetch immediately after the other - valueWasJustCached[lockIdx] = false time.Sleep(time.Millisecond * 10) } @@ -83,7 +83,7 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]]( // and finally save this to cache cache.SetWithTTL(pubkey, v, time.Hour*6) - valueWasJustCached[lockIdx] = true + valueWasJustCached[lockIdx].Store(true) return v, true } @@ -99,7 +99,7 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]]( // save cache even if we didn't get anything cache.SetWithTTL(pubkey, v, time.Hour*6) - valueWasJustCached[lockIdx] = true + valueWasJustCached[lockIdx].Store(true) return v, false } diff --git a/sdk/lists_event.go b/sdk/lists_event.go index a04a90c..3393d72 100644 --- a/sdk/lists_event.go +++ b/sdk/lists_event.go @@ -12,18 +12,22 @@ type EventRef struct{ nostr.Pointer } func (e EventRef) Value() string { return e.Pointer.AsTagReference() } func (sys *System) FetchBookmarkList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] { - if sys.BookmarkListCache == nil { - sys.BookmarkListCache = cache_memory.New[GenericList[string, EventRef]](1000) - } + sys.bookmarkListCacheOnce.Do(func() { + if sys.BookmarkListCache == nil { + sys.BookmarkListCache = cache_memory.New[GenericList[string, EventRef]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10003, kind_10003, parseEventRef, sys.BookmarkListCache) return ml } func (sys *System) FetchPinList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] { - if sys.PinListCache == nil { - sys.PinListCache = cache_memory.New[GenericList[string, EventRef]](1000) - } + sys.pinListCacheOnce.Do(func() { + if sys.PinListCache == nil { + sys.PinListCache = cache_memory.New[GenericList[string, EventRef]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10001, kind_10001, parseEventRef, sys.PinListCache) return ml diff --git a/sdk/lists_profile.go b/sdk/lists_profile.go index 3da5650..2381ee2 100644 --- a/sdk/lists_profile.go +++ b/sdk/lists_profile.go @@ -18,27 +18,33 @@ type ProfileRef struct { func (f ProfileRef) Value() nostr.PubKey { return f.Pubkey } func (sys *System) FetchFollowList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] { - if sys.FollowListCache == nil { - sys.FollowListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) - } + sys.followListCacheOnce.Do(func() { + if sys.FollowListCache == nil { + sys.FollowListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) + } + }) fl, _ := fetchGenericList(sys, ctx, pubkey, 3, kind_3, parseProfileRef, sys.FollowListCache) return fl } func (sys *System) FetchMuteList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] { - if sys.MuteListCache == nil { - sys.MuteListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) - } + sys.muteListCacheOnce.Do(func() { + if sys.MuteListCache == nil { + sys.MuteListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10000, kind_10000, parseProfileRef, sys.MuteListCache) return ml } func (sys *System) FetchFollowSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[nostr.PubKey, ProfileRef] { - if sys.FollowSetsCache == nil { - sys.FollowSetsCache = cache_memory.New[GenericSets[nostr.PubKey, ProfileRef]](1000) - } + sys.followSetsCacheOnce.Do(func() { + if sys.FollowSetsCache == nil { + sys.FollowSetsCache = cache_memory.New[GenericSets[nostr.PubKey, ProfileRef]](1000) + } + }) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30000, kind_30000, parseProfileRef, sys.FollowSetsCache) return ml diff --git a/sdk/lists_relay.go b/sdk/lists_relay.go index be00849..42fe17f 100644 --- a/sdk/lists_relay.go +++ b/sdk/lists_relay.go @@ -25,27 +25,33 @@ func (sys *System) FetchRelayList(ctx context.Context, pubkey nostr.PubKey) Gene } func (sys *System) FetchBlockedRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] { - if sys.BlockedRelayListCache == nil { - sys.BlockedRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) - } + sys.blockedRelayListCacheOnce.Do(func() { + if sys.BlockedRelayListCache == nil { + sys.BlockedRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10006, kind_10006, parseRelayURL, sys.BlockedRelayListCache) return ml } func (sys *System) FetchSearchRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] { - if sys.SearchRelayListCache == nil { - sys.SearchRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) - } + sys.searchRelayListCacheOnce.Do(func() { + if sys.SearchRelayListCache == nil { + sys.SearchRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10007, kind_10007, parseRelayURL, sys.SearchRelayListCache) return ml } func (sys *System) FetchRelaySets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, RelayURL] { - if sys.RelaySetsCache == nil { - sys.RelaySetsCache = cache_memory.New[GenericSets[string, RelayURL]](1000) - } + sys.relaySetsCacheOnce.Do(func() { + if sys.RelaySetsCache == nil { + sys.RelaySetsCache = cache_memory.New[GenericSets[string, RelayURL]](1000) + } + }) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30002, kind_30002, parseRelayURL, sys.RelaySetsCache) return ml diff --git a/sdk/lists_topics.go b/sdk/lists_topics.go index f72135e..028d9e6 100644 --- a/sdk/lists_topics.go +++ b/sdk/lists_topics.go @@ -12,18 +12,22 @@ type Topic string func (r Topic) Value() string { return string(r) } func (sys *System) FetchTopicList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, Topic] { - if sys.TopicListCache == nil { - sys.TopicListCache = cache_memory.New[GenericList[string, Topic]](1000) - } + sys.topicListCacheOnce.Do(func() { + if sys.TopicListCache == nil { + sys.TopicListCache = cache_memory.New[GenericList[string, Topic]](1000) + } + }) ml, _ := fetchGenericList(sys, ctx, pubkey, 10015, kind_10015, parseTopicString, sys.TopicListCache) return ml } func (sys *System) FetchTopicSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, Topic] { - if sys.TopicSetsCache == nil { - sys.TopicSetsCache = cache_memory.New[GenericSets[string, Topic]](1000) - } + sys.topicSetsCacheOnce.Do(func() { + if sys.TopicSetsCache == nil { + sys.TopicSetsCache = cache_memory.New[GenericSets[string, Topic]](1000) + } + }) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30015, kind_30015, parseTopicString, sys.TopicSetsCache) return ml diff --git a/sdk/outbox.go b/sdk/outbox.go index 933db45..02f43ab 100644 --- a/sdk/outbox.go +++ b/sdk/outbox.go @@ -2,12 +2,13 @@ package sdk import ( "context" + "sync/atomic" "time" "fiatjaf.com/nostr" ) -var outboxShortTermCache = [256]ostcEntry{} +var outboxShortTermCache = [256]atomic.Pointer[ostcEntry]{} type ostcEntry struct { pubkey nostr.PubKey @@ -22,7 +23,9 @@ type ostcEntry struct { func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey nostr.PubKey, n int) []string { ostcIndex := pubkey[7] now := time.Now() - if entry := outboxShortTermCache[ostcIndex]; entry.pubkey == pubkey && entry.when.Add(time.Minute*2).After(now) { + if entry := outboxShortTermCache[ostcIndex].Load(); entry != nil && + entry.pubkey == pubkey && + entry.when.Add(time.Minute*2).After(now) { return entry.relays } @@ -38,7 +41,7 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey nostr.PubKey, n // we will have a reference to a thing that the caller to this function may change at will) relaysCopy := make([]string, len(relays)) copy(relaysCopy, relays) - outboxShortTermCache[ostcIndex] = ostcEntry{pubkey, relaysCopy, now} + outboxShortTermCache[ostcIndex].Store(&ostcEntry{pubkey, relaysCopy, now}) if len(relays) > n { relays = relays[0:n] diff --git a/sdk/set.go b/sdk/set.go index 4d4e09a..550e448 100644 --- a/sdk/set.go +++ b/sdk/set.go @@ -31,10 +31,9 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]]( lockIdx := (nostr.Kind(n) + actualKind) % 60 genericListMutexes[lockIdx].Lock() - if valueWasJustCached[lockIdx] { + if valueWasJustCached[lockIdx].CompareAndSwap(true, false) { // this ensures the cache has had time to commit the values // so we don't repeat a fetch immediately after the other - valueWasJustCached[lockIdx] = false time.Sleep(time.Millisecond * 10) } @@ -74,7 +73,7 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]]( // and finally save this to cache cache.SetWithTTL(pubkey, v, time.Hour*6) - valueWasJustCached[lockIdx] = true + valueWasJustCached[lockIdx].Store(true) return v, true } @@ -93,7 +92,7 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]]( // save cache even if we didn't get anything cache.SetWithTTL(pubkey, v, time.Hour*6) - valueWasJustCached[lockIdx] = true + valueWasJustCached[lockIdx].Store(true) return v, false } diff --git a/sdk/system.go b/sdk/system.go index 30e848c..f100f5f 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -1,7 +1,9 @@ package sdk import ( - "math/rand/v2" + "math/rand" + "sync" + "sync/atomic" "fiatjaf.com/nostr" "fiatjaf.com/nostr/eventstore" @@ -28,32 +30,47 @@ import ( // default they're set to in-memory stores, but ideally persisteable // implementations should be given (some alternatives are provided in subpackages). type System struct { - KVStore kvstore.KVStore - MetadataCache cache.Cache32[ProfileMetadata] - RelayListCache cache.Cache32[GenericList[string, Relay]] - FollowListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] - MuteListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] - BookmarkListCache cache.Cache32[GenericList[string, EventRef]] - PinListCache cache.Cache32[GenericList[string, EventRef]] - BlockedRelayListCache cache.Cache32[GenericList[string, RelayURL]] - SearchRelayListCache cache.Cache32[GenericList[string, RelayURL]] - TopicListCache cache.Cache32[GenericList[string, Topic]] - RelaySetsCache cache.Cache32[GenericSets[string, RelayURL]] - FollowSetsCache cache.Cache32[GenericSets[nostr.PubKey, ProfileRef]] - TopicSetsCache cache.Cache32[GenericSets[string, Topic]] - ZapProviderCache cache.Cache32[nostr.PubKey] - MintKeysCache cache.Cache32[map[uint64]*btcec.PublicKey] - NutZapInfoCache cache.Cache32[NutZapInfo] - Hints hints.HintsDB - Pool *nostr.Pool - RelayListRelays *RelayStream - FollowListRelays *RelayStream - MetadataRelays *RelayStream - FallbackRelays *RelayStream - JustIDRelays *RelayStream - UserSearchRelays *RelayStream - NoteSearchRelays *RelayStream - Store eventstore.Store + KVStore kvstore.KVStore + metadataCacheOnce sync.Once + MetadataCache cache.Cache32[ProfileMetadata] + relayListCacheOnce sync.Once + RelayListCache cache.Cache32[GenericList[string, Relay]] + followListCacheOnce sync.Once + FollowListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] + muteListCacheOnce sync.Once + MuteListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] + bookmarkListCacheOnce sync.Once + BookmarkListCache cache.Cache32[GenericList[string, EventRef]] + pinListCacheOnce sync.Once + PinListCache cache.Cache32[GenericList[string, EventRef]] + blockedRelayListCacheOnce sync.Once + BlockedRelayListCache cache.Cache32[GenericList[string, RelayURL]] + searchRelayListCacheOnce sync.Once + SearchRelayListCache cache.Cache32[GenericList[string, RelayURL]] + topicListCacheOnce sync.Once + TopicListCache cache.Cache32[GenericList[string, Topic]] + relaySetsCacheOnce sync.Once + RelaySetsCache cache.Cache32[GenericSets[string, RelayURL]] + followSetsCacheOnce sync.Once + FollowSetsCache cache.Cache32[GenericSets[nostr.PubKey, ProfileRef]] + topicSetsCacheOnce sync.Once + TopicSetsCache cache.Cache32[GenericSets[string, Topic]] + zapProviderCacheOnce sync.Once + ZapProviderCache cache.Cache32[nostr.PubKey] + mintKeysCacheOnce sync.Once + MintKeysCache cache.Cache32[map[uint64]*btcec.PublicKey] + nutZapInfoCacheOnce sync.Once + NutZapInfoCache cache.Cache32[NutZapInfo] + Hints hints.HintsDB + Pool *nostr.Pool + RelayListRelays *RelayStream + FollowListRelays *RelayStream + MetadataRelays *RelayStream + FallbackRelays *RelayStream + JustIDRelays *RelayStream + UserSearchRelays *RelayStream + NoteSearchRelays *RelayStream + Store eventstore.Store Publisher wrappers.StorePublisher @@ -69,18 +86,20 @@ type SystemModifier func(sys *System) // It's used to distribute requests across multiple relays. type RelayStream struct { URLs []string - serial int + serial atomic.Int32 } // NewRelayStream creates a new RelayStream with the provided URLs. func NewRelayStream(urls ...string) *RelayStream { - return &RelayStream{URLs: urls, serial: rand.Int()} + rs := &RelayStream{URLs: urls} + rs.serial.Add(rand.Int31n(int32(len(urls)))) + return rs } // Next returns the next URL in the rotation. func (rs *RelayStream) Next() string { - rs.serial++ - return rs.URLs[rs.serial%len(rs.URLs)] + v := rs.serial.Add(1) + return rs.URLs[int(v)%len(rs.URLs)] } // NewSystem creates a new System with default configuration, @@ -129,21 +148,31 @@ func NewSystem() *System { PenaltyBox: true, }) - if sys.MetadataCache == nil { - sys.MetadataCache = cache_memory.New[ProfileMetadata](8000) - } - if sys.RelayListCache == nil { - sys.RelayListCache = cache_memory.New[GenericList[string, Relay]](8000) - } - if sys.ZapProviderCache == nil { - sys.ZapProviderCache = cache_memory.New[nostr.PubKey](8000) - } - if sys.MintKeysCache == nil { - sys.MintKeysCache = cache_memory.New[map[uint64]*btcec.PublicKey](8000) - } - if sys.NutZapInfoCache == nil { - sys.NutZapInfoCache = cache_memory.New[NutZapInfo](8000) - } + sys.metadataCacheOnce.Do(func() { + if sys.MetadataCache == nil { + sys.MetadataCache = cache_memory.New[ProfileMetadata](8000) + } + }) + sys.relayListCacheOnce.Do(func() { + if sys.RelayListCache == nil { + sys.RelayListCache = cache_memory.New[GenericList[string, Relay]](8000) + } + }) + sys.zapProviderCacheOnce.Do(func() { + if sys.ZapProviderCache == nil { + sys.ZapProviderCache = cache_memory.New[nostr.PubKey](8000) + } + }) + sys.mintKeysCacheOnce.Do(func() { + if sys.MintKeysCache == nil { + sys.MintKeysCache = cache_memory.New[map[uint64]*btcec.PublicKey](8000) + } + }) + sys.nutZapInfoCacheOnce.Do(func() { + if sys.NutZapInfoCache == nil { + sys.NutZapInfoCache = cache_memory.New[NutZapInfo](8000) + } + }) if sys.Store == nil { sys.Store = &nullstore.NullStore{}