a bunch of [32]byte conversions. still more needed.
This commit is contained in:
103
pool.go
103
pool.go
@@ -12,7 +12,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
|
||||
"fiatjaf.com/nostr/nip45/hyperloglog"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
)
|
||||
|
||||
@@ -29,8 +29,8 @@ type SimplePool struct {
|
||||
cancel context.CancelCauseFunc
|
||||
|
||||
eventMiddleware func(RelayEvent)
|
||||
duplicateMiddleware func(relay string, id string)
|
||||
queryMiddleware func(relay string, pubkey string, kind int)
|
||||
duplicateMiddleware func(relay string, id ID)
|
||||
queryMiddleware func(relay string, pubkey PubKey, kind uint16)
|
||||
|
||||
// custom things not often used
|
||||
penaltyBoxMu sync.Mutex
|
||||
@@ -139,7 +139,7 @@ func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
}
|
||||
|
||||
// WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
|
||||
type WithDuplicateMiddleware func(relay string, id string)
|
||||
type WithDuplicateMiddleware func(relay string, id ID)
|
||||
|
||||
func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.duplicateMiddleware = h
|
||||
@@ -147,7 +147,7 @@ func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
|
||||
// WithAuthorKindQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
|
||||
// in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
|
||||
type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int)
|
||||
type WithAuthorKindQueryMiddleware func(relay string, pubkey PubKey, kind uint16)
|
||||
|
||||
func (h WithAuthorKindQueryMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.queryMiddleware = h
|
||||
@@ -271,7 +271,7 @@ func (pool *SimplePool) SubscribeMany(
|
||||
filter Filter,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, Filters{filter}, nil, opts...)
|
||||
return pool.subMany(ctx, urls, filter, nil, opts...)
|
||||
}
|
||||
|
||||
// FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
|
||||
@@ -282,17 +282,7 @@ func (pool *SimplePool) FetchMany(
|
||||
filter Filter,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.SubManyEose(ctx, urls, Filters{filter}, opts...)
|
||||
}
|
||||
|
||||
// Deprecated: SubMany is deprecated: use SubscribeMany instead.
|
||||
func (pool *SimplePool) SubMany(
|
||||
ctx context.Context,
|
||||
urls []string,
|
||||
filters Filters,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, filters, nil, opts...)
|
||||
return pool.SubManyEose(ctx, urls, filter, opts...)
|
||||
}
|
||||
|
||||
// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
|
||||
@@ -304,11 +294,11 @@ func (pool *SimplePool) SubscribeManyNotifyEOSE(
|
||||
eoseChan chan struct{},
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
|
||||
return pool.subMany(ctx, urls, filter, eoseChan, opts...)
|
||||
}
|
||||
|
||||
type ReplaceableKey struct {
|
||||
PubKey string
|
||||
PubKey PubKey
|
||||
D string
|
||||
}
|
||||
|
||||
@@ -363,7 +353,7 @@ func (pool *SimplePool) FetchManyReplaceable(
|
||||
hasAuthed := false
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, Filters{filter}, opts...)
|
||||
sub, err := relay.Subscribe(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
|
||||
return
|
||||
@@ -414,14 +404,14 @@ func (pool *SimplePool) FetchManyReplaceable(
|
||||
func (pool *SimplePool) subMany(
|
||||
ctx context.Context,
|
||||
urls []string,
|
||||
filters Filters,
|
||||
filter Filter,
|
||||
eoseChan chan struct{},
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
_ = cancel // do this so `go vet` will stop complaining
|
||||
events := make(chan RelayEvent)
|
||||
seenAlready := xsync.NewMapOf[string, Timestamp]()
|
||||
seenAlready := xsync.NewMapOf[ID, Timestamp]()
|
||||
ticker := time.NewTicker(seenAlreadyDropTick)
|
||||
|
||||
eoseWg := sync.WaitGroup{}
|
||||
@@ -471,12 +461,10 @@ func (pool *SimplePool) subMany(
|
||||
var sub *Subscription
|
||||
|
||||
if mh := pool.queryMiddleware; mh != nil {
|
||||
for _, filter := range filters {
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds {
|
||||
for _, author := range filter.Authors {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds {
|
||||
for _, author := range filter.Authors {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -497,13 +485,15 @@ func (pool *SimplePool) subMany(
|
||||
hasAuthed = false
|
||||
|
||||
subscribe:
|
||||
sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool {
|
||||
_, exists := seenAlready.Load(id)
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
}))...)
|
||||
sub, err = relay.Subscribe(ctx, filter, append(opts,
|
||||
WithCheckDuplicate(func(id ID, relay string) bool {
|
||||
_, exists := seenAlready.Load(id)
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
}),
|
||||
)...)
|
||||
if err != nil {
|
||||
debugLogf("%s reconnecting because subscription died\n", nm)
|
||||
goto reconnect
|
||||
@@ -529,9 +519,7 @@ func (pool *SimplePool) subMany(
|
||||
// so we will update the filters here to include only events seem from now on
|
||||
// and try to reconnect until we succeed
|
||||
now := Now()
|
||||
for i := range filters {
|
||||
filters[i].Since = &now
|
||||
}
|
||||
filter.Since = &now
|
||||
debugLogf("%s reconnecting because sub.Events is broken\n", nm)
|
||||
goto reconnect
|
||||
}
|
||||
@@ -591,25 +579,26 @@ func (pool *SimplePool) subMany(
|
||||
func (pool *SimplePool) SubManyEose(
|
||||
ctx context.Context,
|
||||
urls []string,
|
||||
filters Filters,
|
||||
filter Filter,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
seenAlready := xsync.NewMapOf[string, struct{}]()
|
||||
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters,
|
||||
WithCheckDuplicate(func(id, relay string) bool {
|
||||
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
||||
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filter,
|
||||
WithCheckDuplicate(func(id ID, relay string) bool {
|
||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
}),
|
||||
opts...)
|
||||
opts...,
|
||||
)
|
||||
}
|
||||
|
||||
func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
|
||||
ctx context.Context,
|
||||
urls []string,
|
||||
filters Filters,
|
||||
filter Filter,
|
||||
wcd WithCheckDuplicate,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
@@ -633,12 +622,10 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
|
||||
defer wg.Done()
|
||||
|
||||
if mh := pool.queryMiddleware; mh != nil {
|
||||
for _, filter := range filters {
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds {
|
||||
for _, author := range filter.Authors {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds {
|
||||
for _, author := range filter.Authors {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -646,16 +633,16 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
|
||||
|
||||
relay, err := pool.EnsureRelay(nm)
|
||||
if err != nil {
|
||||
debugLogf("error connecting to %s with %v: %s", nm, filters, err)
|
||||
debugLogf("error connecting to %s with %v: %s", nm, filter, err)
|
||||
return
|
||||
}
|
||||
|
||||
hasAuthed := false
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, filters, opts...)
|
||||
sub, err := relay.Subscribe(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
|
||||
debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -719,7 +706,7 @@ func (pool *SimplePool) CountMany(
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ce, err := relay.countInternal(ctx, Filters{filter}, opts...)
|
||||
ce, err := relay.countInternal(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -742,7 +729,7 @@ func (pool *SimplePool) QuerySingle(
|
||||
opts ...SubscriptionOption,
|
||||
) *RelayEvent {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}, opts...) {
|
||||
for ievt := range pool.SubManyEose(ctx, urls, filter, opts...) {
|
||||
cancel(errors.New("got the first event and ended successfully"))
|
||||
return &ievt
|
||||
}
|
||||
@@ -759,14 +746,14 @@ func (pool *SimplePool) BatchedSubManyEose(
|
||||
res := make(chan RelayEvent)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(dfs))
|
||||
seenAlready := xsync.NewMapOf[string, struct{}]()
|
||||
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
||||
|
||||
for _, df := range dfs {
|
||||
go func(df DirectedFilter) {
|
||||
for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx,
|
||||
[]string{df.Relay},
|
||||
Filters{df.Filter},
|
||||
WithCheckDuplicate(func(id, relay string) bool {
|
||||
df.Filter,
|
||||
WithCheckDuplicate(func(id ID, relay string) bool {
|
||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
|
||||
Reference in New Issue
Block a user