From 31a590ee4f44c36693a1bc7b1e9f3c3610ae0121 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 23 Aug 2025 09:16:12 -0300 Subject: [PATCH] fix sending on close channel with yet another mutex. --- connection.go | 4 ++++ pool.go | 2 +- relay.go | 19 +++++++++++++++---- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/connection.go b/connection.go index 37247a0..fd38b84 100644 --- a/connection.go +++ b/connection.go @@ -7,6 +7,7 @@ import ( "errors" "io" "net/http" + "sync" "sync/atomic" "time" @@ -21,6 +22,7 @@ type connection struct { writeQueue chan writeRequest closed *atomic.Bool closedNotify chan struct{} + closeMutex sync.Mutex } type writeRequest struct { @@ -136,7 +138,9 @@ func (c *connection) doClose(code ws.StatusCode, reason string) { wasClosed := c.closed.Swap(true) if !wasClosed { c.conn.Close(code, reason) + c.closeMutex.Lock() close(c.closedNotify) close(c.writeQueue) + c.closeMutex.Unlock() } } diff --git a/pool.go b/pool.go index 4a08939..e8e5dce 100644 --- a/pool.go +++ b/pool.go @@ -471,7 +471,7 @@ func (pool *Pool) subMany( subscribe: sub, err = relay.Subscribe(ctx, filter, opts) if err != nil { - debugLogf("[pool] subscription to %s died: %s -- will retry\n", nm, err) + debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err) goto reconnect } diff --git a/relay.go b/relay.go index 13ad3e8..6b67f8c 100644 --- a/relay.go +++ b/relay.go @@ -216,22 +216,33 @@ func (r *Relay) handleMessage(message string) { // Write queues an arbitrary message to be sent to the relay. func (r *Relay) Write(msg []byte) { + r.connection.closeMutex.Lock() + defer r.connection.closeMutex.Unlock() select { - case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}: case <-r.connection.closedNotify: + return + default: + } + select { case <-r.connectionContext.Done(): + case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}: } } // WriteWithError is like Write, but returns an error if the write fails (and the connection gets closed). func (r *Relay) WriteWithError(msg []byte) error { ch := make(chan error) + r.connection.closeMutex.Lock() + defer r.connection.closeMutex.Unlock() select { - case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}: - case <-r.connectionContext.Done(): - return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext)) case <-r.connection.closedNotify: return fmt.Errorf("failed to write to %s: ", r.URL) + default: + } + select { + case <-r.connectionContext.Done(): + return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext)) + case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}: } return <-ch }