From 65923197742a037dd84d838e99132c8085ab26ea Mon Sep 17 00:00:00 2001
From: fiatjaf
Date: Sun, 13 Jul 2025 14:21:15 -0300
Subject: [PATCH] stop exposing connection, more debugLog calls and properly fail subscriptions when a relay dies.

- Unexport Connection/NewConnection (now connection/newConnection) and the
  Relay.Connection field (now Relay.connection).
- Log every connection attempt and every path that closes a connection
  (context done, ping failure, write failure, reader/read failure), and
  prefix the pool's debug messages with "[pool]".
- Make the ping/write loop return as soon as closedNotify fires.
- Close with the reason "write failed" instead of "write took too long"
  when a write returns an error.
- Relay.Subscribe now starts a goroutine that waits on closedNotify and
  calls sub.unsub(ErrDisconnected).
- Give ErrDisconnected a non-empty message and wrap it in the error that
  WriteWithError returns when the connection is already closed, so callers
  can match it with errors.Is.

---
NOTE(review): the goroutine started in Relay.Subscribe only receives from
closedNotify, so it stays parked for as long as the connection is open,
even after the subscription itself has ended -- TODO confirm this is
acceptable for long-lived relays with many short subscriptions.
NOTE(review): the blob ids on the "index" lines are the ones from the
original patch and were not recomputed.

 connection.go | 28 +++++++++++++++++++---------
 pool.go       | 20 +++++++++++---------
 relay.go      | 27 ++++++++++++++++-----------
 3 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/connection.go b/connection.go
index 646789b..37247a0 100644
--- a/connection.go
+++ b/connection.go
@@ -13,8 +13,10 @@ import (
 	ws "github.com/coder/websocket"
 )
 
+var ErrDisconnected = errors.New("relay disconnected")
+
-// Connection represents a websocket connection to a Nostr relay.
-type Connection struct {
+// connection represents a websocket connection to a Nostr relay.
+type connection struct {
 	conn *ws.Conn
 	writeQueue chan writeRequest
 	closed *atomic.Bool
@@ -26,14 +28,15 @@ type writeRequest struct {
 	answer chan error
 }
 
-// NewConnection creates a new websocket connection to a Nostr relay.
-func NewConnection(
+func newConnection(
 	ctx context.Context,
 	url string,
 	handleMessage func(string),
 	requestHeader http.Header,
 	tlsConfig *tls.Config,
-) (*Connection, error) {
+) (*connection, error) {
+	debugLogf("{%s} connecting!\n", url)
+
 	dialCtx := ctx
 	if _, ok := dialCtx.Deadline(); !ok {
 		// if no timeout is set, force it to 7 seconds
@@ -55,7 +58,7 @@ func NewConnection(
 	writeQueue := make(chan writeRequest)
 	readQueue := make(chan string)
 
-	conn := &Connection{
+	conn := &connection{
 		conn: c,
 		writeQueue: writeQueue,
 		closed: &atomic.Bool{},
@@ -67,6 +70,9 @@ func NewConnection(
 			select {
 			case <-ctx.Done():
 				conn.doClose(ws.StatusNormalClosure, "")
+				debugLogf("{%s} closing!, context done: '%s'\n", url, context.Cause(ctx))
+				return
+			case <-conn.closedNotify:
 				return
 			case <-ticker.C:
 				ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*800, errors.New("ping took too long"))
@@ -74,18 +80,20 @@ func NewConnection(
 				cancel()
 				if err != nil {
 					conn.doClose(ws.StatusAbnormalClosure, "ping took too long")
+					debugLogf("{%s} closing!, ping failed: '%s'\n", url, err)
 					return
 				}
 			case wr := <-writeQueue:
-				debugLogf("{%s} sending %v\n", url, string(wr.msg))
+				debugLogf("{%s} sending '%v'\n", url, string(wr.msg))
 				ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("write took too long"))
 				err := c.Write(ctx, ws.MessageText, wr.msg)
 				cancel()
 				if err != nil {
-					conn.doClose(ws.StatusAbnormalClosure, "write took too long")
+					conn.doClose(ws.StatusAbnormalClosure, "write failed")
 					if wr.answer != nil {
 						wr.answer <- err
 					}
+					debugLogf("{%s} closing!, write failed: '%s'\n", url, err)
 					return
 				}
 				if wr.answer != nil {
@@ -107,10 +115,12 @@ func NewConnection(
 
 			_, reader, err := c.Reader(ctx)
 			if err != nil {
+				debugLogf("{%s} closing!, reader failure: '%s'\n", url, err)
 				conn.doClose(ws.StatusAbnormalClosure, "failed to get reader")
 				return
 			}
 			if _, err := io.Copy(buf, reader); err != nil {
+				debugLogf("{%s} closing!, read failure: '%s'\n", url, err)
 				conn.doClose(ws.StatusAbnormalClosure, "failed to read")
 				return
 			}
@@ -122,7 +132,7 @@ func NewConnection(
 	return conn, nil
 }
 
-func (c *Connection) doClose(code ws.StatusCode, reason string) {
+func (c *connection) doClose(code ws.StatusCode, reason string) {
 	wasClosed := c.closed.Swap(true)
 	if !wasClosed {
 		c.conn.Close(code, reason)
diff --git a/pool.go b/pool.go
index 764faa8..7b9a3e4 100644
--- a/pool.go
+++ b/pool.go
@@ -313,7 +313,7 @@ func (pool *Pool) FetchManyReplaceable(
 
 			relay, err := pool.EnsureRelay(nm)
 			if err != nil {
-				debugLogf("error connecting to %s with %v: %s", nm, filter, err)
+				debugLogf("[pool] error connecting to %s with %v: %s", nm, filter, err)
 				return
 			}
 
@@ -322,7 +322,7 @@ func (pool *Pool) FetchManyReplaceable(
 		subscribe:
 			sub, err := relay.Subscribe(ctx, filter, opts)
 			if err != nil {
-				debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
+				debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
 				return
 			}
 
@@ -341,7 +341,7 @@ func (pool *Pool) FetchManyReplaceable(
 							goto subscribe
 						}
 					}
-					debugLogf("CLOSED from %s: '%s'\n", nm, reason)
+					debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason)
 					return
 				case evt, more := <-sub.Events:
 					if !more {
@@ -449,11 +449,12 @@ func (pool *Pool) subMany(
 				if err != nil {
 					// if we never connected to this just fail
 					if firstConnection {
+						debugLogf("[pool] connection to %s failed, won't retry as it was the first attempt\n", nm)
 						return
 					}
 
 					// otherwise (if we were connected and got disconnected) keep trying to reconnect
-					debugLogf("%s reconnecting because connection failed\n", nm)
+					debugLogf("[pool] connection to %s failed, will retry\n", nm)
 					goto reconnect
 				}
 				firstConnection = false
@@ -462,7 +463,7 @@ func (pool *Pool) subMany(
 			subscribe:
 				sub, err = relay.Subscribe(ctx, filter, opts)
 				if err != nil {
-					debugLogf("%s reconnecting because subscription died: %s\n", nm, err)
+					debugLogf("[pool] subscription to %s died: %s -- will retry\n", nm, err)
 					goto reconnect
 				}
 
@@ -486,7 +487,7 @@ func (pool *Pool) subMany(
 							// so we will update the filters here to include only events seem from now on
 							// and try to reconnect until we succeed
 							filter.Since = Now()
-							debugLogf("%s reconnecting because sub.Events is broken\n", nm)
+							debugLogf("[pool] retrying %s because sub.Events is broken\n", nm)
 							goto reconnect
 						}
 
@@ -530,6 +531,7 @@ func (pool *Pool) subMany(
 			reconnect:
 				// we will go back to the beginning of the loop and try to connect again and again
 				// until the context is canceled
+				debugLogf("[pool] retrying %s in %s\n", nm, interval)
 				time.Sleep(interval)
 				interval = interval * 17 / 10 // the next time we try we will wait longer
 			}
@@ -574,7 +576,7 @@ func (pool *Pool) subManyEose(
 
 			relay, err := pool.EnsureRelay(nm)
 			if err != nil {
-				debugLogf("error connecting to %s with %v: %s", nm, filter, err)
+				debugLogf("[pool] error connecting to %s with %v: %s", nm, filter, err)
 				return
 			}
 
@@ -583,7 +585,7 @@ func (pool *Pool) subManyEose(
 		subscribe:
 			sub, err := relay.Subscribe(ctx, filter, opts)
 			if err != nil {
-				debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
+				debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
 				return
 			}
 
@@ -602,7 +604,7 @@ func (pool *Pool) subManyEose(
 							goto subscribe
 						}
 					}
-					debugLogf("CLOSED from %s: '%s'\n", nm, reason)
+					debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason)
 					return
 				case evt, more := <-sub.Events:
 					if !more {
diff --git a/relay.go b/relay.go
index 9e12e42..1c50a88 100644
--- a/relay.go
+++ b/relay.go
@@ -25,7 +25,7 @@ type Relay struct {
 	URL string
 	requestHeader http.Header // e.g. for origin header
 
-	Connection *Connection
+	connection *connection
 	Subscriptions *xsync.MapOf[int64, *Subscription]
 
 	ConnectionError error
@@ -94,7 +94,7 @@ func (r *Relay) String() string {
 func (r *Relay) Context() context.Context { return r.connectionContext }
 
 // IsConnected returns true if the connection to this relay seems to be active.
-func (r *Relay) IsConnected() bool { return !r.Connection.closed.Load() }
+func (r *Relay) IsConnected() bool { return !r.connection.closed.Load() }
 
 // Connect tries to establish a websocket connection to r.URL.
 // If the context expires before the connection is complete, an error is returned.
@@ -117,11 +117,11 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
 		return fmt.Errorf("invalid relay URL '%s'", r.URL)
 	}
 
-	conn, err := NewConnection(ctx, r.URL, r.handleMessage, r.requestHeader, tlsConfig)
+	conn, err := newConnection(ctx, r.URL, r.handleMessage, r.requestHeader, tlsConfig)
 	if err != nil {
 		return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
 	}
-	r.Connection = conn
+	r.connection = conn
 
 	return nil
 }
@@ -214,8 +214,8 @@ func (r *Relay) handleMessage(message string) {
 // Write queues an arbitrary message to be sent to the relay.
 func (r *Relay) Write(msg []byte) {
 	select {
-	case r.Connection.writeQueue <- writeRequest{msg: msg, answer: nil}:
-	case <-r.Connection.closedNotify:
+	case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}:
+	case <-r.connection.closedNotify:
 	case <-r.connectionContext.Done():
 	}
 }
@@ -224,10 +224,10 @@ func (r *Relay) Write(msg []byte) {
 func (r *Relay) WriteWithError(msg []byte) error {
 	ch := make(chan error)
 	select {
-	case r.Connection.writeQueue <- writeRequest{msg: msg, answer: ch}:
+	case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}:
 	case <-r.connectionContext.Done():
 		return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext))
-	case <-r.Connection.closedNotify:
-		return fmt.Errorf("failed to write to %s: ", r.URL)
+	case <-r.connection.closedNotify:
+		return fmt.Errorf("failed to write to %s: %w", r.URL, ErrDisconnected)
 	}
 	return <-ch
@@ -315,7 +315,7 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error {
 func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionOptions) (*Subscription, error) {
 	sub := r.PrepareSubscription(ctx, filter, opts)
 
-	if r.Connection == nil {
+	if r.connection == nil {
 		return nil, fmt.Errorf("not connected to %s", r.URL)
 	}
 
@@ -323,6 +323,11 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
 		return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err)
 	}
 
+	go func() {
+		<-r.connection.closedNotify
+		sub.unsub(ErrDisconnected)
+	}()
+
 	return sub, nil
 }
 
@@ -449,7 +454,7 @@ func (r *Relay) close(reason error) error {
 	r.connectionContextCancel(reason)
 	r.connectionContextCancel = nil
 
-	if r.Connection == nil {
+	if r.connection == nil {
 		return fmt.Errorf("relay not connected")
 	}
 