diff --git a/relaypool.go b/relaypool.go index 4c76ecf..2a79c8c 100644 --- a/relaypool.go +++ b/relaypool.go @@ -60,33 +60,41 @@ func NewRelayPool() *RelayPool { // Add adds a new relay to the pool, if policy is nil, it will be a simple // read+write policy. -func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error { +func (r *RelayPool) Add(url string, policy RelayPoolPolicy) chan error { if policy == nil { policy = SimplePolicy{Read: true, Write: true} } - relay, err := RelayConnect(url) - if err != nil { - return err - } + cherr := make(chan error) - r.Policies.Store(relay.URL, policy) - r.Relays.Store(relay.URL, relay) + go func() { + relay, err := RelayConnect(url) + if err != nil { + cherr <- fmt.Errorf("failed to connect to %s: %w", url, err) + return + } - r.subscriptions.Range(func(id string, filters Filters) bool { - sub := relay.subscribe(id, filters) - eventStream, _ := r.eventStreams.Load(id) + r.Policies.Store(relay.URL, policy) + r.Relays.Store(relay.URL, relay) - go func(sub *Subscription) { - for evt := range sub.Events { - eventStream <- EventMessage{Relay: relay.URL, Event: evt} - } - }(sub) + r.subscriptions.Range(func(id string, filters Filters) bool { + sub := relay.subscribe(id, filters) + eventStream, _ := r.eventStreams.Load(id) - return true - }) + go func(sub *Subscription) { + for evt := range sub.Events { + eventStream <- EventMessage{Relay: relay.URL, Event: evt} + } + }(sub) - return nil + return true + }) + + cherr <- nil + close(cherr) + }() + + return cherr } // Remove removes a relay from the pool. @@ -101,7 +109,7 @@ func (r *RelayPool) Remove(url string) { } } -func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) { +func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { random := make([]byte, 7) rand.Read(random) id := hex.EncodeToString(random) @@ -109,8 +117,6 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) r.subscriptions.Store(id, filters) eventStream := make(chan EventMessage) r.eventStreams.Store(id, eventStream) - uniqueEvents := make(chan Event) - emittedAlready := s.MapOf[string, struct{}]{} r.Relays.Range(func(_ string, relay *Relay) bool { sub := relay.subscribe(id, filters) @@ -118,16 +124,28 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) go func(sub *Subscription) { for evt := range sub.Events { eventStream <- EventMessage{Relay: relay.URL, Event: evt} - if _, ok := emittedAlready.LoadOrStore(evt.ID, struct{}{}); !ok { - uniqueEvents <- evt - } } }(sub) return true }) - return id, eventStream, uniqueEvents + return id, eventStream +} + +func Unique(all chan EventMessage) chan Event { + uniqueEvents := make(chan Event) + emittedAlready := s.MapOf[string, struct{}]{} + + go func() { + for eventMessage := range all { + if _, ok := emittedAlready.LoadOrStore(eventMessage.Event.ID, struct{}{}); !ok { + uniqueEvents <- eventMessage.Event + } + } + }() + + return uniqueEvents } func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {