ensure eose channel closes after events have been emitted.
This commit is contained in:
9
relay.go
9
relay.go
@@ -299,14 +299,7 @@ func (r *Relay) Connect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
case *EOSEEnvelope:
|
case *EOSEEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
|
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
|
||||||
// implementation adapted from the naïve/incorrect implementation of sync.Once
|
subscription.dispatchEose()
|
||||||
// (which is ok for this use case)
|
|
||||||
if subscription.eosed.CompareAndSwap(false, true) {
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Millisecond) // this basically ensures the EndOfStoredEvents call happens after the last EVENT
|
|
||||||
close(subscription.EndOfStoredEvents)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
case *CountEnvelope:
|
case *CountEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
|
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
@@ -32,6 +33,10 @@ type Subscription struct {
|
|||||||
live atomic.Bool
|
live atomic.Bool
|
||||||
eosed atomic.Bool
|
eosed atomic.Bool
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
|
// this keeps track of the events we've received before the EOSE that we must dispatch before
|
||||||
|
// closing the EndOfStoredEvents channel
|
||||||
|
storedwg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventMessage struct {
|
type EventMessage struct {
|
||||||
@@ -67,14 +72,20 @@ func (sub *Subscription) start() {
|
|||||||
case event := <-sub.events:
|
case event := <-sub.events:
|
||||||
// this is guarded such that it will only fire until the .Events channel is closed
|
// this is guarded such that it will only fire until the .Events channel is closed
|
||||||
go func() {
|
go func() {
|
||||||
|
if !sub.eosed.Load() {
|
||||||
|
sub.storedwg.Add(1)
|
||||||
|
defer sub.storedwg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
if sub.live.Load() {
|
if sub.live.Load() {
|
||||||
select {
|
select {
|
||||||
case sub.Events <- event:
|
case sub.Events <- event:
|
||||||
case <-sub.Context.Done():
|
case <-sub.Context.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mu.Unlock()
|
|
||||||
}()
|
}()
|
||||||
case <-sub.Context.Done():
|
case <-sub.Context.Done():
|
||||||
// the subscription ends once the context is canceled (if not already)
|
// the subscription ends once the context is canceled (if not already)
|
||||||
@@ -90,6 +101,16 @@ func (sub *Subscription) start() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sub *Subscription) dispatchEose() {
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
if sub.eosed.CompareAndSwap(false, true) {
|
||||||
|
go func() {
|
||||||
|
sub.storedwg.Wait()
|
||||||
|
close(sub.EndOfStoredEvents)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
|
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
|
||||||
// Unsub() also closes the channel sub.Events and makes a new one.
|
// Unsub() also closes the channel sub.Events and makes a new one.
|
||||||
func (sub *Subscription) Unsub() {
|
func (sub *Subscription) Unsub() {
|
||||||
|
|||||||
Reference in New Issue
Block a user