stop exposing the connection, add more debugLogf calls, and properly fail subscriptions when a relay dies.

This commit is contained in:
fiatjaf
2025-07-13 14:21:15 -03:00
parent 9f8b810882
commit 6592319774
3 changed files with 44 additions and 27 deletions

View File

@@ -13,8 +13,10 @@ import (
ws "github.com/coder/websocket" ws "github.com/coder/websocket"
) )
var ErrDisconnected = errors.New("<disconnected>")
// Connection represents a websocket connection to a Nostr relay. // Connection represents a websocket connection to a Nostr relay.
type Connection struct { type connection struct {
conn *ws.Conn conn *ws.Conn
writeQueue chan writeRequest writeQueue chan writeRequest
closed *atomic.Bool closed *atomic.Bool
@@ -26,14 +28,15 @@ type writeRequest struct {
answer chan error answer chan error
} }
// NewConnection creates a new websocket connection to a Nostr relay. func newConnection(
func NewConnection(
ctx context.Context, ctx context.Context,
url string, url string,
handleMessage func(string), handleMessage func(string),
requestHeader http.Header, requestHeader http.Header,
tlsConfig *tls.Config, tlsConfig *tls.Config,
) (*Connection, error) { ) (*connection, error) {
debugLogf("{%s} connecting!\n", url)
dialCtx := ctx dialCtx := ctx
if _, ok := dialCtx.Deadline(); !ok { if _, ok := dialCtx.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds // if no timeout is set, force it to 7 seconds
@@ -55,7 +58,7 @@ func NewConnection(
writeQueue := make(chan writeRequest) writeQueue := make(chan writeRequest)
readQueue := make(chan string) readQueue := make(chan string)
conn := &Connection{ conn := &connection{
conn: c, conn: c,
writeQueue: writeQueue, writeQueue: writeQueue,
closed: &atomic.Bool{}, closed: &atomic.Bool{},
@@ -67,6 +70,9 @@ func NewConnection(
select { select {
case <-ctx.Done(): case <-ctx.Done():
conn.doClose(ws.StatusNormalClosure, "") conn.doClose(ws.StatusNormalClosure, "")
debugLogf("{%s} closing!, context done: '%s'\n", url, context.Cause(ctx))
return
case <-conn.closedNotify:
return return
case <-ticker.C: case <-ticker.C:
ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*800, errors.New("ping took too long")) ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*800, errors.New("ping took too long"))
@@ -74,18 +80,20 @@ func NewConnection(
cancel() cancel()
if err != nil { if err != nil {
conn.doClose(ws.StatusAbnormalClosure, "ping took too long") conn.doClose(ws.StatusAbnormalClosure, "ping took too long")
debugLogf("{%s} closing!, ping failed: '%s'\n", url, err)
return return
} }
case wr := <-writeQueue: case wr := <-writeQueue:
debugLogf("{%s} sending %v\n", url, string(wr.msg)) debugLogf("{%s} sending '%v'\n", url, string(wr.msg))
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("write took too long")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("write took too long"))
err := c.Write(ctx, ws.MessageText, wr.msg) err := c.Write(ctx, ws.MessageText, wr.msg)
cancel() cancel()
if err != nil { if err != nil {
conn.doClose(ws.StatusAbnormalClosure, "write took too long") conn.doClose(ws.StatusAbnormalClosure, "write failed")
if wr.answer != nil { if wr.answer != nil {
wr.answer <- err wr.answer <- err
} }
debugLogf("{%s} closing!, write failed: '%s'\n", url, err)
return return
} }
if wr.answer != nil { if wr.answer != nil {
@@ -107,10 +115,12 @@ func NewConnection(
_, reader, err := c.Reader(ctx) _, reader, err := c.Reader(ctx)
if err != nil { if err != nil {
debugLogf("{%s} closing!, reader failure: '%s'\n", url, err)
conn.doClose(ws.StatusAbnormalClosure, "failed to get reader") conn.doClose(ws.StatusAbnormalClosure, "failed to get reader")
return return
} }
if _, err := io.Copy(buf, reader); err != nil { if _, err := io.Copy(buf, reader); err != nil {
debugLogf("{%s} closing!, read failure: '%s'\n", url, err)
conn.doClose(ws.StatusAbnormalClosure, "failed to read") conn.doClose(ws.StatusAbnormalClosure, "failed to read")
return return
} }
@@ -122,7 +132,7 @@ func NewConnection(
return conn, nil return conn, nil
} }
func (c *Connection) doClose(code ws.StatusCode, reason string) { func (c *connection) doClose(code ws.StatusCode, reason string) {
wasClosed := c.closed.Swap(true) wasClosed := c.closed.Swap(true)
if !wasClosed { if !wasClosed {
c.conn.Close(code, reason) c.conn.Close(code, reason)

20
pool.go
View File

@@ -313,7 +313,7 @@ func (pool *Pool) FetchManyReplaceable(
relay, err := pool.EnsureRelay(nm) relay, err := pool.EnsureRelay(nm)
if err != nil { if err != nil {
debugLogf("error connecting to %s with %v: %s", nm, filter, err) debugLogf("[pool] error connecting to %s with %v: %s", nm, filter, err)
return return
} }
@@ -322,7 +322,7 @@ func (pool *Pool) FetchManyReplaceable(
subscribe: subscribe:
sub, err := relay.Subscribe(ctx, filter, opts) sub, err := relay.Subscribe(ctx, filter, opts)
if err != nil { if err != nil {
debugLogf("error subscribing to %s with %v: %s", relay, filter, err) debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
return return
} }
@@ -341,7 +341,7 @@ func (pool *Pool) FetchManyReplaceable(
goto subscribe goto subscribe
} }
} }
debugLogf("CLOSED from %s: '%s'\n", nm, reason) debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason)
return return
case evt, more := <-sub.Events: case evt, more := <-sub.Events:
if !more { if !more {
@@ -449,11 +449,12 @@ func (pool *Pool) subMany(
if err != nil { if err != nil {
// if we never connected to this just fail // if we never connected to this just fail
if firstConnection { if firstConnection {
debugLogf("[pool] connection to %s failed, won't retry as it was the first attempt\n", nm)
return return
} }
// otherwise (if we were connected and got disconnected) keep trying to reconnect // otherwise (if we were connected and got disconnected) keep trying to reconnect
debugLogf("%s reconnecting because connection failed\n", nm) debugLogf("[pool] connection to %s failed, will retry\n", nm)
goto reconnect goto reconnect
} }
firstConnection = false firstConnection = false
@@ -462,7 +463,7 @@ func (pool *Pool) subMany(
subscribe: subscribe:
sub, err = relay.Subscribe(ctx, filter, opts) sub, err = relay.Subscribe(ctx, filter, opts)
if err != nil { if err != nil {
debugLogf("%s reconnecting because subscription died: %s\n", nm, err) debugLogf("[pool] subscription to %s died: %s -- will retry\n", nm, err)
goto reconnect goto reconnect
} }
@@ -486,7 +487,7 @@ func (pool *Pool) subMany(
// so we will update the filters here to include only events seen from now on // so we will update the filters here to include only events seen from now on
// and try to reconnect until we succeed // and try to reconnect until we succeed
filter.Since = Now() filter.Since = Now()
debugLogf("%s reconnecting because sub.Events is broken\n", nm) debugLogf("[pool] retrying %s because sub.Events is broken\n", nm)
goto reconnect goto reconnect
} }
@@ -530,6 +531,7 @@ func (pool *Pool) subMany(
reconnect: reconnect:
// we will go back to the beginning of the loop and try to connect again and again // we will go back to the beginning of the loop and try to connect again and again
// until the context is canceled // until the context is canceled
debugLogf("[pool] retrying %s in %s\n", nm, interval)
time.Sleep(interval) time.Sleep(interval)
interval = interval * 17 / 10 // the next time we try we will wait longer interval = interval * 17 / 10 // the next time we try we will wait longer
} }
@@ -574,7 +576,7 @@ func (pool *Pool) subManyEose(
relay, err := pool.EnsureRelay(nm) relay, err := pool.EnsureRelay(nm)
if err != nil { if err != nil {
debugLogf("error connecting to %s with %v: %s", nm, filter, err) debugLogf("[pool] error connecting to %s with %v: %s", nm, filter, err)
return return
} }
@@ -583,7 +585,7 @@ func (pool *Pool) subManyEose(
subscribe: subscribe:
sub, err := relay.Subscribe(ctx, filter, opts) sub, err := relay.Subscribe(ctx, filter, opts)
if err != nil { if err != nil {
debugLogf("error subscribing to %s with %v: %s", relay, filter, err) debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
return return
} }
@@ -602,7 +604,7 @@ func (pool *Pool) subManyEose(
goto subscribe goto subscribe
} }
} }
debugLogf("CLOSED from %s: '%s'\n", nm, reason) debugLogf("[pool] CLOSED from %s: '%s'\n", nm, reason)
return return
case evt, more := <-sub.Events: case evt, more := <-sub.Events:
if !more { if !more {

View File

@@ -25,7 +25,7 @@ type Relay struct {
URL string URL string
requestHeader http.Header // e.g. for origin header requestHeader http.Header // e.g. for origin header
Connection *Connection connection *connection
Subscriptions *xsync.MapOf[int64, *Subscription] Subscriptions *xsync.MapOf[int64, *Subscription]
ConnectionError error ConnectionError error
@@ -94,7 +94,7 @@ func (r *Relay) String() string {
func (r *Relay) Context() context.Context { return r.connectionContext } func (r *Relay) Context() context.Context { return r.connectionContext }
// IsConnected returns true if the connection to this relay seems to be active. // IsConnected returns true if the connection to this relay seems to be active.
func (r *Relay) IsConnected() bool { return !r.Connection.closed.Load() } func (r *Relay) IsConnected() bool { return !r.connection.closed.Load() }
// Connect tries to establish a websocket connection to r.URL. // Connect tries to establish a websocket connection to r.URL.
// If the context expires before the connection is complete, an error is returned. // If the context expires before the connection is complete, an error is returned.
@@ -117,11 +117,11 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
return fmt.Errorf("invalid relay URL '%s'", r.URL) return fmt.Errorf("invalid relay URL '%s'", r.URL)
} }
conn, err := NewConnection(ctx, r.URL, r.handleMessage, r.requestHeader, tlsConfig) conn, err := newConnection(ctx, r.URL, r.handleMessage, r.requestHeader, tlsConfig)
if err != nil { if err != nil {
return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err) return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
} }
r.Connection = conn r.connection = conn
return nil return nil
} }
@@ -214,8 +214,8 @@ func (r *Relay) handleMessage(message string) {
// Write queues an arbitrary message to be sent to the relay. // Write queues an arbitrary message to be sent to the relay.
func (r *Relay) Write(msg []byte) { func (r *Relay) Write(msg []byte) {
select { select {
case r.Connection.writeQueue <- writeRequest{msg: msg, answer: nil}: case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}:
case <-r.Connection.closedNotify: case <-r.connection.closedNotify:
case <-r.connectionContext.Done(): case <-r.connectionContext.Done():
} }
} }
@@ -224,10 +224,10 @@ func (r *Relay) Write(msg []byte) {
func (r *Relay) WriteWithError(msg []byte) error { func (r *Relay) WriteWithError(msg []byte) error {
ch := make(chan error) ch := make(chan error)
select { select {
case r.Connection.writeQueue <- writeRequest{msg: msg, answer: ch}: case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}:
case <-r.connectionContext.Done(): case <-r.connectionContext.Done():
return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext)) return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext))
case <-r.Connection.closedNotify: case <-r.connection.closedNotify:
return fmt.Errorf("failed to write to %s: <closed>", r.URL) return fmt.Errorf("failed to write to %s: <closed>", r.URL)
} }
return <-ch return <-ch
@@ -315,7 +315,7 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error {
func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionOptions) (*Subscription, error) { func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionOptions) (*Subscription, error) {
sub := r.PrepareSubscription(ctx, filter, opts) sub := r.PrepareSubscription(ctx, filter, opts)
if r.Connection == nil { if r.connection == nil {
return nil, fmt.Errorf("not connected to %s", r.URL) return nil, fmt.Errorf("not connected to %s", r.URL)
} }
@@ -323,6 +323,11 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err) return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err)
} }
go func() {
<-r.connection.closedNotify
sub.unsub(ErrDisconnected)
}()
return sub, nil return sub, nil
} }
@@ -449,7 +454,7 @@ func (r *Relay) close(reason error) error {
r.connectionContextCancel(reason) r.connectionContextCancel(reason)
r.connectionContextCancel = nil r.connectionContextCancel = nil
if r.Connection == nil { if r.connection == nil {
return fmt.Errorf("relay not connected") return fmt.Errorf("relay not connected")
} }