Files
nostrlib/subscription.go
fiatjaf 7289da9c72 improve/refactor websocket connections hoping this will fix the undetected disconnections we're seeing.
this commit also removes all the sonic envelope parsing and reintroduces filters in REQ as a slice instead of as a singleton.

why? well, the sonic stuff wasn't really that fast, it was a little bit but only got fast enough once I introduced unsafe conversions between []byte and string and did weird unsafe reuse of []byte in order to save the values of tags, which would definitely cause issues in the future if the caller wasn't aware of it (and even if they were, like myself).

and the filters stuff is because we abandoned the idea of changing NIP-01 to only accept one filter per REQ.
2025-07-10 22:58:37 -03:00

182 lines
5.0 KiB
Go

package nostr
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
)
// Subscription represents a subscription to a relay.
//
// It holds the filter that is sent to the relay, the channels on which results
// are delivered to the caller and the state needed to end the subscription.
type Subscription struct {
// counter is the key used to remove this subscription from Relay.Subscriptions
counter int64
// id is the subscription id sent to the relay in the REQ/COUNT and CLOSE envelopes
id string
// Relay is the relay this subscription writes to
Relay *Relay
// Filter is the filter sent to the relay when the subscription is fired
Filter Filter
// for this to be treated as a COUNT and not a REQ this must be set
countResult chan CountEnvelope
// the Events channel emits all EVENTs that come in a Subscription
// will be closed when the subscription ends
Events chan Event
// mu is held while sending to Events and while closing it, so that we never
// send to Events after it has been closed
mu sync.Mutex
// the EndOfStoredEvents channel receives a value (it is not closed) when an EOSE comes
// for that subscription, after the events received before the EOSE have been dispatched
EndOfStoredEvents chan struct{}
// the ClosedReason channel emits the reason when a CLOSED message is received
ClosedReason chan string
// Context will be .Done() when the subscription ends
Context context.Context
// if it is not nil, checkDuplicate will be called for every event received
// if it returns true that event will not be processed further.
checkDuplicate func(id ID, relay string) bool
// if it is not nil, checkDuplicateReplaceable will be called for every event received
// if it returns true that event will not be processed further.
checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
match func(Event) bool // switched to Filter.MatchesIgnoringTimestampConstraints once the EOSE is dispatched; NOTE(review): presumably Filter.Matches before that -- the initial assignment is not visible here
// live is set to true by Fire and back to false when the subscription is unsubscribed
// or a CLOSED is received; events are only sent to Events while it is true
live atomic.Bool
// eosed is set to true the first time an EOSE is dispatched
eosed atomic.Bool
// cancel cancels Context, recording the given error as the cause
cancel context.CancelCauseFunc
// this keeps track of the events we've received before the EOSE that we must dispatch before
// signaling on the EndOfStoredEvents channel
storedwg sync.WaitGroup
}
// SubscriptionOptions holds the optional settings for a subscription.
// All SubscriptionOptions fields are optional.
// NOTE(review): the code that consumes these options is not visible in this part of the file.
type SubscriptionOptions struct {
// Label puts a label on the subscription (it is prepended to the automatic id) that is sent to relays.
Label string
// ID sets the ID absolutely on a subscription. Do not touch unless you know what you're doing.
ID string
// CheckDuplicate is a function that, when present, is run on events before they're parsed.
// If it returns true the event will be discarded and not processed further.
CheckDuplicate func(id ID, relay string) bool
// CheckDuplicateReplaceable is like CheckDuplicate, but runs on replaceable/addressable events.
CheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
}
// start blocks until the subscription's context is done and then tears the
// subscription down: it unsubscribes and closes the Events channel.
func (sub *Subscription) start() {
	<-sub.Context.Done()

	// end the subscription as well, in case the context was canceled from the
	// outside; this is what flips sub.live to false if it was still true
	sub.unsub(errors.New("context done on start()"))

	// dispatchEvent sends to Events while holding this same mutex, so closing the
	// channel under the lock means no send can be in flight when it is closed
	sub.mu.Lock()
	defer sub.mu.Unlock()
	close(sub.Events)
}
// GetID returns the id of this subscription, i.e. the id used in the envelopes
// sent to the relay for it.
func (sub *Subscription) GetID() string {
	return sub.id
}
// dispatchEvent delivers evt to the Events channel from a new goroutine.
//
// Events dispatched before the EOSE are registered in storedwg so that
// dispatchEose can wait for them before signaling EndOfStoredEvents.
func (sub *Subscription) dispatchEvent(evt Event) {
	// decided once, before spawning the goroutine: does this event count as "stored"?
	beforeEose := !sub.eosed.Load()
	if beforeEose {
		sub.storedwg.Add(1)
	}

	go func() {
		// the lock serializes deliveries and keeps start() from closing Events
		// while a send is pending
		sub.mu.Lock()
		defer sub.mu.Unlock()

		if sub.live.Load() {
			// give up on the delivery if the subscription ends before anyone reads
			select {
			case sub.Events <- evt:
			case <-sub.Context.Done():
			}
		}

		// whether it was delivered or dropped, the event is now accounted for
		if beforeEose {
			sub.storedwg.Done()
		}
	}()
}
// dispatchEose handles an EOSE for this subscription. Only the first call has
// any effect: it switches the matcher to the variant that ignores timestamp
// constraints and, once every event dispatched before the EOSE has been handled,
// sends a value on EndOfStoredEvents.
func (sub *Subscription) dispatchEose() {
	// later EOSEs for the same subscription are ignored
	if !sub.eosed.CompareAndSwap(false, true) {
		return
	}

	sub.match = sub.Filter.MatchesIgnoringTimestampConstraints

	go func() {
		// wait for the events that were received before the EOSE
		sub.storedwg.Wait()
		sub.EndOfStoredEvents <- struct{}{}
	}()
}
// handleClosed handles the CLOSED message from a relay: in a separate goroutine
// it emits the reason on ClosedReason and then ends the subscription.
func (sub *Subscription) handleClosed(reason string) {
	finish := func() {
		sub.ClosedReason <- reason

		// the relay has already closed this subscription, so mark it as not live
		// to keep unsub from sending an unnecessary CLOSE
		sub.live.Store(false)
		sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
	}
	go finish()
}
// Unsub ends the subscription: it cancels the subscription's context, sends a
// "CLOSE" to the relay as in NIP-01 if the subscription was still live, and
// removes the subscription from the relay's subscription map.
//
// The Events channel is not closed here; start() closes it after the context
// is done.
func (sub *Subscription) Unsub() {
	cause := errors.New("Unsub() called")
	sub.unsub(cause)
}
// unsub is the internal implementation of Unsub. err is recorded as the cause
// with which the subscription's context is canceled.
func (sub *Subscription) unsub(err error) {
	// canceling an already canceled context does nothing
	sub.cancel(err)

	// only whoever flips live from true to false sends the CLOSE, so it goes out
	// at most once per time the subscription was live
	if wasLive := sub.live.CompareAndSwap(true, false); wasLive {
		sub.Close()
	}

	// forget about this subscription on the relay side of our bookkeeping
	sub.Relay.Subscriptions.Delete(sub.counter)
}
// Close just sends a CLOSE message for this subscription, and only if the relay
// is connected. It does not cancel the context nor touch any other state of the
// subscription. You probably want Unsub() instead.
func (sub *Subscription) Close() {
	if !sub.Relay.IsConnected() {
		return
	}

	envelope := CloseEnvelope(sub.id)
	// best effort: Close has no way of reporting a failure, so the marshaling
	// error is discarded
	payload, _ := envelope.MarshalJSON()
	sub.Relay.Write(payload)
}
// Sub sets sub.Filter to the given filter and then calls sub.Fire().
//
// The context argument is ignored. The error returned by Fire is discarded
// here; when writing to the relay fails, Fire has already canceled the
// subscription's context with that error.
func (sub *Subscription) Sub(_ context.Context, filter Filter) {
	sub.Filter = filter
	_ = sub.Fire()
}
// Fire sends the "REQ" command to the relay, or the "COUNT" command when
// countResult is set, using the subscription's id and Filter.
//
// It returns an error when the envelope cannot be serialized or when writing to
// the relay fails. In both cases the subscription's context is canceled with
// the returned error as its cause.
func (sub *Subscription) Fire() error {
	var (
		reqb []byte
		err  error
	)
	if sub.countResult == nil {
		reqb, err = ReqEnvelope{sub.id, []Filter{sub.Filter}}.MarshalJSON()
	} else {
		reqb, err = CountEnvelope{sub.id, sub.Filter, nil, nil}.MarshalJSON()
	}
	if err != nil {
		// nothing is written to the relay and live is left untouched: there is no
		// point in sending a payload we failed to build
		err = fmt.Errorf("failed to serialize: %w", err)
		sub.cancel(err)
		return err
	}

	// mark the subscription as live before writing, so events that arrive right
	// after the write are delivered
	sub.live.Store(true)
	if err := sub.Relay.WriteWithError(reqb); err != nil {
		err := fmt.Errorf("failed to write: %w", err)
		sub.cancel(err)
		return err
	}

	return nil
}