From b5e7e446fbb1a0936a673235d5aa61fd0daa128b Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 20 Feb 2021 17:59:47 -0300 Subject: [PATCH] a UniqueEvents channel for each subscription. --- relaypool/relaypool.go | 1 - relaypool/subscription.go | 30 +++++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/relaypool/relaypool.go b/relaypool/relaypool.go index a850afd..33d721f 100644 --- a/relaypool/relaypool.go +++ b/relaypool/relaypool.go @@ -153,7 +153,6 @@ func (r *RelayPool) Add(url string, policy *Policy) error { } } } - } }() diff --git a/relaypool/subscription.go b/relaypool/subscription.go index aa06487..f277898 100644 --- a/relaypool/subscription.go +++ b/relaypool/subscription.go @@ -12,6 +12,14 @@ type Subscription struct { filter *filter.EventFilter Events chan EventMessage + + started bool + UniqueEvents chan event.Event +} + +type EventMessage struct { + Event event.Event + Relay string } func (subscription Subscription) Unsub() { @@ -21,6 +29,8 @@ func (subscription Subscription) Unsub() { subscription.channel, }) } + + close(subscription.Events) } func (subscription Subscription) Sub(filter *filter.EventFilter) { @@ -35,6 +45,21 @@ func (subscription Subscription) Sub(filter *filter.EventFilter) { subscription.filter, }) } + + if subscription.started { + go subscription.startHandlingUnique() + } +} + +func (subscription Subscription) startHandlingUnique() { + seen := make(map[string]struct{}) + for em := range subscription.Events { + if _, ok := seen[em.Event.ID]; ok { + continue + } + seen[em.Event.ID] = struct{}{} + subscription.UniqueEvents <- em.Event + } } func (subscription Subscription) removeRelay(relay string) { @@ -55,8 +80,3 @@ func (subscription Subscription) addRelay(relay string, ws *websocket.Conn) { subscription.filter, }) } - -type EventMessage struct { - Event event.Event - Relay string -}