allow custom CheckDuplicate in pool subscriptions.
This commit is contained in:
6
pool.go
6
pool.go
@@ -246,6 +246,7 @@ func (pool *Pool) FetchMany(
|
|||||||
) chan RelayEvent {
|
) chan RelayEvent {
|
||||||
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
||||||
|
|
||||||
|
if opts.CheckDuplicate == nil {
|
||||||
opts.CheckDuplicate = func(id ID, relay string) bool {
|
opts.CheckDuplicate = func(id ID, relay string) bool {
|
||||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||||
if exists && pool.duplicateMiddleware != nil {
|
if exists && pool.duplicateMiddleware != nil {
|
||||||
@@ -253,6 +254,7 @@ func (pool *Pool) FetchMany(
|
|||||||
}
|
}
|
||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filter, opts)
|
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filter, opts)
|
||||||
}
|
}
|
||||||
@@ -393,6 +395,7 @@ func (pool *Pool) subMany(
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts.CheckDuplicate == nil {
|
||||||
opts.CheckDuplicate = func(id ID, relay string) bool {
|
opts.CheckDuplicate = func(id ID, relay string) bool {
|
||||||
_, exists := seenAlready.Load(id)
|
_, exists := seenAlready.Load(id)
|
||||||
if exists && pool.duplicateMiddleware != nil {
|
if exists && pool.duplicateMiddleware != nil {
|
||||||
@@ -400,6 +403,7 @@ func (pool *Pool) subMany(
|
|||||||
}
|
}
|
||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pending := xsync.NewCounter()
|
pending := xsync.NewCounter()
|
||||||
pending.Add(int64(len(urls)))
|
pending.Add(int64(len(urls)))
|
||||||
@@ -691,6 +695,7 @@ func (pool *Pool) BatchedSubManyEose(
|
|||||||
wg.Add(len(dfs))
|
wg.Add(len(dfs))
|
||||||
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
seenAlready := xsync.NewMapOf[ID, struct{}]()
|
||||||
|
|
||||||
|
if opts.CheckDuplicate == nil {
|
||||||
opts.CheckDuplicate = func(id ID, relay string) bool {
|
opts.CheckDuplicate = func(id ID, relay string) bool {
|
||||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||||
if exists && pool.duplicateMiddleware != nil {
|
if exists && pool.duplicateMiddleware != nil {
|
||||||
@@ -698,6 +703,7 @@ func (pool *Pool) BatchedSubManyEose(
|
|||||||
}
|
}
|
||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, df := range dfs {
|
for _, df := range dfs {
|
||||||
go func(df DirectedFilter) {
|
go func(df DirectedFilter) {
|
||||||
|
|||||||
Reference in New Issue
Block a user