WithCheckDuplicate(), let's see if this works.
This commit is contained in:
14
pool.go
14
pool.go
@@ -287,15 +287,14 @@ func (pool *SimplePool) SubMany(
|
|||||||
hasAuthed = false
|
hasAuthed = false
|
||||||
|
|
||||||
subscribe:
|
subscribe:
|
||||||
sub = relay.PrepareSubscription(ctx, filters, opts...)
|
sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool {
|
||||||
sub.CheckDuplicate = func(id, relay string) bool {
|
|
||||||
_, exists := seenAlready.Load(id)
|
_, exists := seenAlready.Load(id)
|
||||||
if exists && pool.duplicateMiddleware != nil {
|
if exists && pool.duplicateMiddleware != nil {
|
||||||
pool.duplicateMiddleware(relay, id)
|
pool.duplicateMiddleware(relay, id)
|
||||||
}
|
}
|
||||||
return exists
|
return exists
|
||||||
}
|
}))...)
|
||||||
if err := sub.Fire(); err != nil {
|
if err != nil {
|
||||||
goto reconnect
|
goto reconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -418,15 +417,14 @@ func (pool *SimplePool) SubManyEose(
|
|||||||
hasAuthed := false
|
hasAuthed := false
|
||||||
|
|
||||||
subscribe:
|
subscribe:
|
||||||
sub := relay.PrepareSubscription(ctx, filters, opts...)
|
sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool {
|
||||||
sub.CheckDuplicate = func(id, relay string) bool {
|
|
||||||
_, exists := seenAlready.Load(id)
|
_, exists := seenAlready.Load(id)
|
||||||
if exists && pool.duplicateMiddleware != nil {
|
if exists && pool.duplicateMiddleware != nil {
|
||||||
pool.duplicateMiddleware(relay, id)
|
pool.duplicateMiddleware(relay, id)
|
||||||
}
|
}
|
||||||
return exists
|
return exists
|
||||||
}
|
}))...)
|
||||||
if err := sub.Fire(); err != nil {
|
if err != nil {
|
||||||
debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
|
debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
6
relay.go
6
relay.go
@@ -225,8 +225,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
|
|||||||
// as we skip handling duplicate events
|
// as we skip handling duplicate events
|
||||||
subid := extractSubID(message)
|
subid := extractSubID(message)
|
||||||
subscription, ok := r.Subscriptions.Load(subIdToSerial(subid))
|
subscription, ok := r.Subscriptions.Load(subIdToSerial(subid))
|
||||||
if ok && subscription.CheckDuplicate != nil {
|
if ok && subscription.checkDuplicate != nil {
|
||||||
if !subscription.CheckDuplicate(extractEventID(message[10+len(subid):]), r.URL) {
|
if !subscription.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -426,6 +426,8 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
|
|||||||
switch o := opt.(type) {
|
switch o := opt.(type) {
|
||||||
case WithLabel:
|
case WithLabel:
|
||||||
label = string(o)
|
label = string(o)
|
||||||
|
case WithCheckDuplicate:
|
||||||
|
sub.checkDuplicate = o
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
"github.com/nbd-wtf/go-nostr/nip19"
|
"github.com/nbd-wtf/go-nostr/nip19"
|
||||||
@@ -125,30 +124,8 @@ attempts:
|
|||||||
// actually fetch the event here
|
// actually fetch the event here
|
||||||
countdown := 6.0
|
countdown := 6.0
|
||||||
subManyCtx := ctx
|
subManyCtx := ctx
|
||||||
subMany := sys.Pool.SubManyEose
|
|
||||||
if attempt.slowWithRelays {
|
|
||||||
subMany = sys.Pool.SubManyEoseNonUnique
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt.slowWithRelays {
|
for ie := range sys.Pool.SubManyEose(
|
||||||
// keep track of where we have actually found the event so we can show that
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
subManyCtx, cancel = context.WithTimeout(ctx, time.Second*6)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
if countdown <= 0 {
|
|
||||||
cancel()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
countdown -= 0.1
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
for ie := range subMany(
|
|
||||||
subManyCtx,
|
subManyCtx,
|
||||||
attempt.relays,
|
attempt.relays,
|
||||||
nostr.Filters{filter},
|
nostr.Filters{filter},
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ type Subscription struct {
|
|||||||
|
|
||||||
// if it is not nil, CheckDuplicate will be called for every event received
|
// if it is not nil, CheckDuplicate will be called for every event received
|
||||||
// if it returns true that event will not be processed further.
|
// if it returns true that event will not be processed further.
|
||||||
CheckDuplicate func(id string, relay string) bool
|
checkDuplicate func(id string, relay string) bool
|
||||||
|
|
||||||
match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
|
match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
|
||||||
live atomic.Bool
|
live atomic.Bool
|
||||||
@@ -62,7 +62,15 @@ type WithLabel string
|
|||||||
|
|
||||||
func (_ WithLabel) IsSubscriptionOption() {}
|
func (_ WithLabel) IsSubscriptionOption() {}
|
||||||
|
|
||||||
var _ SubscriptionOption = (WithLabel)("")
|
// WithCheckDuplicate sets checkDuplicate on the subscription
|
||||||
|
type WithCheckDuplicate func(relay, id string) bool
|
||||||
|
|
||||||
|
func (_ WithCheckDuplicate) IsSubscriptionOption() {}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ SubscriptionOption = (WithLabel)("")
|
||||||
|
_ SubscriptionOption = (WithCheckDuplicate)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
func (sub *Subscription) start() {
|
func (sub *Subscription) start() {
|
||||||
<-sub.Context.Done()
|
<-sub.Context.Done()
|
||||||
|
|||||||
Reference in New Issue
Block a user