fix sending on close channel with yet another mutex.
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -21,6 +22,7 @@ type connection struct {
|
|||||||
writeQueue chan writeRequest
|
writeQueue chan writeRequest
|
||||||
closed *atomic.Bool
|
closed *atomic.Bool
|
||||||
closedNotify chan struct{}
|
closedNotify chan struct{}
|
||||||
|
closeMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type writeRequest struct {
|
type writeRequest struct {
|
||||||
@@ -136,7 +138,9 @@ func (c *connection) doClose(code ws.StatusCode, reason string) {
|
|||||||
wasClosed := c.closed.Swap(true)
|
wasClosed := c.closed.Swap(true)
|
||||||
if !wasClosed {
|
if !wasClosed {
|
||||||
c.conn.Close(code, reason)
|
c.conn.Close(code, reason)
|
||||||
|
c.closeMutex.Lock()
|
||||||
close(c.closedNotify)
|
close(c.closedNotify)
|
||||||
close(c.writeQueue)
|
close(c.writeQueue)
|
||||||
|
c.closeMutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
pool.go
2
pool.go
@@ -471,7 +471,7 @@ func (pool *Pool) subMany(
|
|||||||
subscribe:
|
subscribe:
|
||||||
sub, err = relay.Subscribe(ctx, filter, opts)
|
sub, err = relay.Subscribe(ctx, filter, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debugLogf("[pool] subscription to %s died: %s -- will retry\n", nm, err)
|
debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err)
|
||||||
goto reconnect
|
goto reconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
19
relay.go
19
relay.go
@@ -216,22 +216,33 @@ func (r *Relay) handleMessage(message string) {
|
|||||||
|
|
||||||
// Write queues an arbitrary message to be sent to the relay.
|
// Write queues an arbitrary message to be sent to the relay.
|
||||||
func (r *Relay) Write(msg []byte) {
|
func (r *Relay) Write(msg []byte) {
|
||||||
|
r.connection.closeMutex.Lock()
|
||||||
|
defer r.connection.closeMutex.Unlock()
|
||||||
select {
|
select {
|
||||||
case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}:
|
|
||||||
case <-r.connection.closedNotify:
|
case <-r.connection.closedNotify:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
select {
|
||||||
case <-r.connectionContext.Done():
|
case <-r.connectionContext.Done():
|
||||||
|
case r.connection.writeQueue <- writeRequest{msg: msg, answer: nil}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteWithError is like Write, but returns an error if the write fails (and the connection gets closed).
|
// WriteWithError is like Write, but returns an error if the write fails (and the connection gets closed).
|
||||||
func (r *Relay) WriteWithError(msg []byte) error {
|
func (r *Relay) WriteWithError(msg []byte) error {
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
|
r.connection.closeMutex.Lock()
|
||||||
|
defer r.connection.closeMutex.Unlock()
|
||||||
select {
|
select {
|
||||||
case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}:
|
|
||||||
case <-r.connectionContext.Done():
|
|
||||||
return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext))
|
|
||||||
case <-r.connection.closedNotify:
|
case <-r.connection.closedNotify:
|
||||||
return fmt.Errorf("failed to write to %s: <closed>", r.URL)
|
return fmt.Errorf("failed to write to %s: <closed>", r.URL)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-r.connectionContext.Done():
|
||||||
|
return fmt.Errorf("failed to write to %s: %w", r.URL, context.Cause(r.connectionContext))
|
||||||
|
case r.connection.writeQueue <- writeRequest{msg: msg, answer: ch}:
|
||||||
}
|
}
|
||||||
return <-ch
|
return <-ch
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user