From cd0d6440467337cd9d739d9ec90d5b4617c4e643 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Fri, 24 Nov 2023 17:29:29 +0900 Subject: [PATCH] If EOSE is not given, they should not be deleted. --- pool.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/pool.go b/pool.go index b1b6cf0..e6a9a6a 100644 --- a/pool.go +++ b/pool.go @@ -74,6 +74,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[Timestamp]() ticker := time.NewTicker(seenAlreadyDropTick) + eose := false pending := xsync.NewCounter() initial := len(urls) @@ -98,17 +99,21 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt } select { + case <-sub.EndOfStoredEvents: + eose = true case <-ticker.C: - del := map[string]struct{}{} - old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) - seenAlready.Range(func(key string, value Timestamp) bool { - if value < old { - del[evt.ID] = struct{}{} + if eose { + del := map[string]struct{}{} + old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) + seenAlready.Range(func(key string, value Timestamp) bool { + if value < old { + del[evt.ID] = struct{}{} + } + return true + }) + for k := range del { + seenAlready.Delete(k) } - return true - }) - for k := range del { - seenAlready.Delete(k) } case events <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done():