sdk: eliminate all the data races go was complaining about.

This commit is contained in:
fiatjaf
2026-01-27 23:25:34 -03:00
parent ca3730e508
commit d87066c9b9
9 changed files with 142 additions and 90 deletions

View File

@@ -31,14 +31,15 @@ func (db *HintDB) Save(pubkey nostr.PubKey, relay string, key hints.HintKey, ts
ts = now ts = now
} }
db.Lock()
defer db.Unlock()
relayIndex := slices.Index(db.RelayBySerial, relay) relayIndex := slices.Index(db.RelayBySerial, relay)
if relayIndex == -1 { if relayIndex == -1 {
relayIndex = len(db.RelayBySerial) relayIndex = len(db.RelayBySerial)
db.RelayBySerial = append(db.RelayBySerial, relay) db.RelayBySerial = append(db.RelayBySerial, relay)
} }
db.Lock()
defer db.Unlock()
// fmt.Println(" ", relay, "index", relayIndex, "--", "adding", hints.HintKey(key).String(), ts) // fmt.Println(" ", relay, "index", relayIndex, "--", "adding", hints.HintKey(key).String(), ts)
entries, _ := db.OrderedRelaysByPubKey[pubkey] entries, _ := db.OrderedRelaysByPubKey[pubkey]

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"slices" "slices"
"sync" "sync"
"sync/atomic"
"time" "time"
"fiatjaf.com/nostr" "fiatjaf.com/nostr"
@@ -23,7 +24,7 @@ type TagItemWithValue[V comparable] interface {
var ( var (
genericListMutexes = [60]sync.Mutex{} genericListMutexes = [60]sync.Mutex{}
valueWasJustCached = [60]bool{} valueWasJustCached = [60]atomic.Bool{}
) )
func fetchGenericList[V comparable, I TagItemWithValue[V]]( func fetchGenericList[V comparable, I TagItemWithValue[V]](
@@ -42,10 +43,9 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]](
lockIdx := (nostr.Kind(n) + actualKind) % 60 lockIdx := (nostr.Kind(n) + actualKind) % 60
genericListMutexes[lockIdx].Lock() genericListMutexes[lockIdx].Lock()
if valueWasJustCached[lockIdx] { if valueWasJustCached[lockIdx].CompareAndSwap(true, false) {
// this ensures the cache has had time to commit the values // this ensures the cache has had time to commit the values
// so we don't repeat a fetch immediately after the other // so we don't repeat a fetch immediately after the other
valueWasJustCached[lockIdx] = false
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
} }
@@ -83,7 +83,7 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]](
// and finally save this to cache // and finally save this to cache
cache.SetWithTTL(pubkey, v, time.Hour*6) cache.SetWithTTL(pubkey, v, time.Hour*6)
valueWasJustCached[lockIdx] = true valueWasJustCached[lockIdx].Store(true)
return v, true return v, true
} }
@@ -99,7 +99,7 @@ func fetchGenericList[V comparable, I TagItemWithValue[V]](
// save cache even if we didn't get anything // save cache even if we didn't get anything
cache.SetWithTTL(pubkey, v, time.Hour*6) cache.SetWithTTL(pubkey, v, time.Hour*6)
valueWasJustCached[lockIdx] = true valueWasJustCached[lockIdx].Store(true)
return v, false return v, false
} }

View File

@@ -12,18 +12,22 @@ type EventRef struct{ nostr.Pointer }
func (e EventRef) Value() string { return e.Pointer.AsTagReference() } func (e EventRef) Value() string { return e.Pointer.AsTagReference() }
func (sys *System) FetchBookmarkList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] { func (sys *System) FetchBookmarkList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] {
if sys.BookmarkListCache == nil { sys.bookmarkListCacheOnce.Do(func() {
sys.BookmarkListCache = cache_memory.New[GenericList[string, EventRef]](1000) if sys.BookmarkListCache == nil {
} sys.BookmarkListCache = cache_memory.New[GenericList[string, EventRef]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10003, kind_10003, parseEventRef, sys.BookmarkListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10003, kind_10003, parseEventRef, sys.BookmarkListCache)
return ml return ml
} }
func (sys *System) FetchPinList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] { func (sys *System) FetchPinList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, EventRef] {
if sys.PinListCache == nil { sys.pinListCacheOnce.Do(func() {
sys.PinListCache = cache_memory.New[GenericList[string, EventRef]](1000) if sys.PinListCache == nil {
} sys.PinListCache = cache_memory.New[GenericList[string, EventRef]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10001, kind_10001, parseEventRef, sys.PinListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10001, kind_10001, parseEventRef, sys.PinListCache)
return ml return ml

View File

@@ -18,27 +18,33 @@ type ProfileRef struct {
func (f ProfileRef) Value() nostr.PubKey { return f.Pubkey } func (f ProfileRef) Value() nostr.PubKey { return f.Pubkey }
func (sys *System) FetchFollowList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] { func (sys *System) FetchFollowList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] {
if sys.FollowListCache == nil { sys.followListCacheOnce.Do(func() {
sys.FollowListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) if sys.FollowListCache == nil {
} sys.FollowListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000)
}
})
fl, _ := fetchGenericList(sys, ctx, pubkey, 3, kind_3, parseProfileRef, sys.FollowListCache) fl, _ := fetchGenericList(sys, ctx, pubkey, 3, kind_3, parseProfileRef, sys.FollowListCache)
return fl return fl
} }
func (sys *System) FetchMuteList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] { func (sys *System) FetchMuteList(ctx context.Context, pubkey nostr.PubKey) GenericList[nostr.PubKey, ProfileRef] {
if sys.MuteListCache == nil { sys.muteListCacheOnce.Do(func() {
sys.MuteListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000) if sys.MuteListCache == nil {
} sys.MuteListCache = cache_memory.New[GenericList[nostr.PubKey, ProfileRef]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10000, kind_10000, parseProfileRef, sys.MuteListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10000, kind_10000, parseProfileRef, sys.MuteListCache)
return ml return ml
} }
func (sys *System) FetchFollowSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[nostr.PubKey, ProfileRef] { func (sys *System) FetchFollowSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[nostr.PubKey, ProfileRef] {
if sys.FollowSetsCache == nil { sys.followSetsCacheOnce.Do(func() {
sys.FollowSetsCache = cache_memory.New[GenericSets[nostr.PubKey, ProfileRef]](1000) if sys.FollowSetsCache == nil {
} sys.FollowSetsCache = cache_memory.New[GenericSets[nostr.PubKey, ProfileRef]](1000)
}
})
ml, _ := fetchGenericSets(sys, ctx, pubkey, 30000, kind_30000, parseProfileRef, sys.FollowSetsCache) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30000, kind_30000, parseProfileRef, sys.FollowSetsCache)
return ml return ml

View File

@@ -25,27 +25,33 @@ func (sys *System) FetchRelayList(ctx context.Context, pubkey nostr.PubKey) Gene
} }
func (sys *System) FetchBlockedRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] { func (sys *System) FetchBlockedRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] {
if sys.BlockedRelayListCache == nil { sys.blockedRelayListCacheOnce.Do(func() {
sys.BlockedRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) if sys.BlockedRelayListCache == nil {
} sys.BlockedRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10006, kind_10006, parseRelayURL, sys.BlockedRelayListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10006, kind_10006, parseRelayURL, sys.BlockedRelayListCache)
return ml return ml
} }
func (sys *System) FetchSearchRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] { func (sys *System) FetchSearchRelayList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, RelayURL] {
if sys.SearchRelayListCache == nil { sys.searchRelayListCacheOnce.Do(func() {
sys.SearchRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000) if sys.SearchRelayListCache == nil {
} sys.SearchRelayListCache = cache_memory.New[GenericList[string, RelayURL]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10007, kind_10007, parseRelayURL, sys.SearchRelayListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10007, kind_10007, parseRelayURL, sys.SearchRelayListCache)
return ml return ml
} }
func (sys *System) FetchRelaySets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, RelayURL] { func (sys *System) FetchRelaySets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, RelayURL] {
if sys.RelaySetsCache == nil { sys.relaySetsCacheOnce.Do(func() {
sys.RelaySetsCache = cache_memory.New[GenericSets[string, RelayURL]](1000) if sys.RelaySetsCache == nil {
} sys.RelaySetsCache = cache_memory.New[GenericSets[string, RelayURL]](1000)
}
})
ml, _ := fetchGenericSets(sys, ctx, pubkey, 30002, kind_30002, parseRelayURL, sys.RelaySetsCache) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30002, kind_30002, parseRelayURL, sys.RelaySetsCache)
return ml return ml

View File

@@ -12,18 +12,22 @@ type Topic string
func (r Topic) Value() string { return string(r) } func (r Topic) Value() string { return string(r) }
func (sys *System) FetchTopicList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, Topic] { func (sys *System) FetchTopicList(ctx context.Context, pubkey nostr.PubKey) GenericList[string, Topic] {
if sys.TopicListCache == nil { sys.topicListCacheOnce.Do(func() {
sys.TopicListCache = cache_memory.New[GenericList[string, Topic]](1000) if sys.TopicListCache == nil {
} sys.TopicListCache = cache_memory.New[GenericList[string, Topic]](1000)
}
})
ml, _ := fetchGenericList(sys, ctx, pubkey, 10015, kind_10015, parseTopicString, sys.TopicListCache) ml, _ := fetchGenericList(sys, ctx, pubkey, 10015, kind_10015, parseTopicString, sys.TopicListCache)
return ml return ml
} }
func (sys *System) FetchTopicSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, Topic] { func (sys *System) FetchTopicSets(ctx context.Context, pubkey nostr.PubKey) GenericSets[string, Topic] {
if sys.TopicSetsCache == nil { sys.topicSetsCacheOnce.Do(func() {
sys.TopicSetsCache = cache_memory.New[GenericSets[string, Topic]](1000) if sys.TopicSetsCache == nil {
} sys.TopicSetsCache = cache_memory.New[GenericSets[string, Topic]](1000)
}
})
ml, _ := fetchGenericSets(sys, ctx, pubkey, 30015, kind_30015, parseTopicString, sys.TopicSetsCache) ml, _ := fetchGenericSets(sys, ctx, pubkey, 30015, kind_30015, parseTopicString, sys.TopicSetsCache)
return ml return ml

View File

@@ -2,12 +2,13 @@ package sdk
import ( import (
"context" "context"
"sync/atomic"
"time" "time"
"fiatjaf.com/nostr" "fiatjaf.com/nostr"
) )
var outboxShortTermCache = [256]ostcEntry{} var outboxShortTermCache = [256]atomic.Pointer[ostcEntry]{}
type ostcEntry struct { type ostcEntry struct {
pubkey nostr.PubKey pubkey nostr.PubKey
@@ -22,7 +23,9 @@ type ostcEntry struct {
func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey nostr.PubKey, n int) []string { func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey nostr.PubKey, n int) []string {
ostcIndex := pubkey[7] ostcIndex := pubkey[7]
now := time.Now() now := time.Now()
if entry := outboxShortTermCache[ostcIndex]; entry.pubkey == pubkey && entry.when.Add(time.Minute*2).After(now) { if entry := outboxShortTermCache[ostcIndex].Load(); entry != nil &&
entry.pubkey == pubkey &&
entry.when.Add(time.Minute*2).After(now) {
return entry.relays return entry.relays
} }
@@ -38,7 +41,7 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey nostr.PubKey, n
// we will have a reference to a thing that the caller to this function may change at will) // we will have a reference to a thing that the caller to this function may change at will)
relaysCopy := make([]string, len(relays)) relaysCopy := make([]string, len(relays))
copy(relaysCopy, relays) copy(relaysCopy, relays)
outboxShortTermCache[ostcIndex] = ostcEntry{pubkey, relaysCopy, now} outboxShortTermCache[ostcIndex].Store(&ostcEntry{pubkey, relaysCopy, now})
if len(relays) > n { if len(relays) > n {
relays = relays[0:n] relays = relays[0:n]

View File

@@ -31,10 +31,9 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]](
lockIdx := (nostr.Kind(n) + actualKind) % 60 lockIdx := (nostr.Kind(n) + actualKind) % 60
genericListMutexes[lockIdx].Lock() genericListMutexes[lockIdx].Lock()
if valueWasJustCached[lockIdx] { if valueWasJustCached[lockIdx].CompareAndSwap(true, false) {
// this ensures the cache has had time to commit the values // this ensures the cache has had time to commit the values
// so we don't repeat a fetch immediately after the other // so we don't repeat a fetch immediately after the other
valueWasJustCached[lockIdx] = false
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
} }
@@ -74,7 +73,7 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]](
// and finally save this to cache // and finally save this to cache
cache.SetWithTTL(pubkey, v, time.Hour*6) cache.SetWithTTL(pubkey, v, time.Hour*6)
valueWasJustCached[lockIdx] = true valueWasJustCached[lockIdx].Store(true)
return v, true return v, true
} }
@@ -93,7 +92,7 @@ func fetchGenericSets[V comparable, I TagItemWithValue[V]](
// save cache even if we didn't get anything // save cache even if we didn't get anything
cache.SetWithTTL(pubkey, v, time.Hour*6) cache.SetWithTTL(pubkey, v, time.Hour*6)
valueWasJustCached[lockIdx] = true valueWasJustCached[lockIdx].Store(true)
return v, false return v, false
} }

View File

@@ -1,7 +1,9 @@
package sdk package sdk
import ( import (
"math/rand/v2" "math/rand"
"sync"
"sync/atomic"
"fiatjaf.com/nostr" "fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr/eventstore"
@@ -28,32 +30,47 @@ import (
// default they're set to in-memory stores, but ideally persisteable // default they're set to in-memory stores, but ideally persisteable
// implementations should be given (some alternatives are provided in subpackages). // implementations should be given (some alternatives are provided in subpackages).
type System struct { type System struct {
KVStore kvstore.KVStore KVStore kvstore.KVStore
MetadataCache cache.Cache32[ProfileMetadata] metadataCacheOnce sync.Once
RelayListCache cache.Cache32[GenericList[string, Relay]] MetadataCache cache.Cache32[ProfileMetadata]
FollowListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] relayListCacheOnce sync.Once
MuteListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]] RelayListCache cache.Cache32[GenericList[string, Relay]]
BookmarkListCache cache.Cache32[GenericList[string, EventRef]] followListCacheOnce sync.Once
PinListCache cache.Cache32[GenericList[string, EventRef]] FollowListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]]
BlockedRelayListCache cache.Cache32[GenericList[string, RelayURL]] muteListCacheOnce sync.Once
SearchRelayListCache cache.Cache32[GenericList[string, RelayURL]] MuteListCache cache.Cache32[GenericList[nostr.PubKey, ProfileRef]]
TopicListCache cache.Cache32[GenericList[string, Topic]] bookmarkListCacheOnce sync.Once
RelaySetsCache cache.Cache32[GenericSets[string, RelayURL]] BookmarkListCache cache.Cache32[GenericList[string, EventRef]]
FollowSetsCache cache.Cache32[GenericSets[nostr.PubKey, ProfileRef]] pinListCacheOnce sync.Once
TopicSetsCache cache.Cache32[GenericSets[string, Topic]] PinListCache cache.Cache32[GenericList[string, EventRef]]
ZapProviderCache cache.Cache32[nostr.PubKey] blockedRelayListCacheOnce sync.Once
MintKeysCache cache.Cache32[map[uint64]*btcec.PublicKey] BlockedRelayListCache cache.Cache32[GenericList[string, RelayURL]]
NutZapInfoCache cache.Cache32[NutZapInfo] searchRelayListCacheOnce sync.Once
Hints hints.HintsDB SearchRelayListCache cache.Cache32[GenericList[string, RelayURL]]
Pool *nostr.Pool topicListCacheOnce sync.Once
RelayListRelays *RelayStream TopicListCache cache.Cache32[GenericList[string, Topic]]
FollowListRelays *RelayStream relaySetsCacheOnce sync.Once
MetadataRelays *RelayStream RelaySetsCache cache.Cache32[GenericSets[string, RelayURL]]
FallbackRelays *RelayStream followSetsCacheOnce sync.Once
JustIDRelays *RelayStream FollowSetsCache cache.Cache32[GenericSets[nostr.PubKey, ProfileRef]]
UserSearchRelays *RelayStream topicSetsCacheOnce sync.Once
NoteSearchRelays *RelayStream TopicSetsCache cache.Cache32[GenericSets[string, Topic]]
Store eventstore.Store zapProviderCacheOnce sync.Once
ZapProviderCache cache.Cache32[nostr.PubKey]
mintKeysCacheOnce sync.Once
MintKeysCache cache.Cache32[map[uint64]*btcec.PublicKey]
nutZapInfoCacheOnce sync.Once
NutZapInfoCache cache.Cache32[NutZapInfo]
Hints hints.HintsDB
Pool *nostr.Pool
RelayListRelays *RelayStream
FollowListRelays *RelayStream
MetadataRelays *RelayStream
FallbackRelays *RelayStream
JustIDRelays *RelayStream
UserSearchRelays *RelayStream
NoteSearchRelays *RelayStream
Store eventstore.Store
Publisher wrappers.StorePublisher Publisher wrappers.StorePublisher
@@ -69,18 +86,20 @@ type SystemModifier func(sys *System)
// It's used to distribute requests across multiple relays. // It's used to distribute requests across multiple relays.
type RelayStream struct { type RelayStream struct {
URLs []string URLs []string
serial int serial atomic.Int32
} }
// NewRelayStream creates a new RelayStream with the provided URLs. // NewRelayStream creates a new RelayStream with the provided URLs.
func NewRelayStream(urls ...string) *RelayStream { func NewRelayStream(urls ...string) *RelayStream {
return &RelayStream{URLs: urls, serial: rand.Int()} rs := &RelayStream{URLs: urls}
rs.serial.Add(rand.Int31n(int32(len(urls))))
return rs
} }
// Next returns the next URL in the rotation. // Next returns the next URL in the rotation.
func (rs *RelayStream) Next() string { func (rs *RelayStream) Next() string {
rs.serial++ v := rs.serial.Add(1)
return rs.URLs[rs.serial%len(rs.URLs)] return rs.URLs[int(v)%len(rs.URLs)]
} }
// NewSystem creates a new System with default configuration, // NewSystem creates a new System with default configuration,
@@ -129,21 +148,31 @@ func NewSystem() *System {
PenaltyBox: true, PenaltyBox: true,
}) })
if sys.MetadataCache == nil { sys.metadataCacheOnce.Do(func() {
sys.MetadataCache = cache_memory.New[ProfileMetadata](8000) if sys.MetadataCache == nil {
} sys.MetadataCache = cache_memory.New[ProfileMetadata](8000)
if sys.RelayListCache == nil { }
sys.RelayListCache = cache_memory.New[GenericList[string, Relay]](8000) })
} sys.relayListCacheOnce.Do(func() {
if sys.ZapProviderCache == nil { if sys.RelayListCache == nil {
sys.ZapProviderCache = cache_memory.New[nostr.PubKey](8000) sys.RelayListCache = cache_memory.New[GenericList[string, Relay]](8000)
} }
if sys.MintKeysCache == nil { })
sys.MintKeysCache = cache_memory.New[map[uint64]*btcec.PublicKey](8000) sys.zapProviderCacheOnce.Do(func() {
} if sys.ZapProviderCache == nil {
if sys.NutZapInfoCache == nil { sys.ZapProviderCache = cache_memory.New[nostr.PubKey](8000)
sys.NutZapInfoCache = cache_memory.New[NutZapInfo](8000) }
} })
sys.mintKeysCacheOnce.Do(func() {
if sys.MintKeysCache == nil {
sys.MintKeysCache = cache_memory.New[map[uint64]*btcec.PublicKey](8000)
}
})
sys.nutZapInfoCacheOnce.Do(func() {
if sys.NutZapInfoCache == nil {
sys.NutZapInfoCache = cache_memory.New[NutZapInfo](8000)
}
})
if sys.Store == nil { if sys.Store == nil {
sys.Store = &nullstore.NullStore{} sys.Store = &nullstore.NullStore{}