get rid of PublishStatus and always expect relays to answer with an OK for both AUTH and EvENT publishes.
This commit is contained in:
155
relay.go
155
relay.go
@@ -17,27 +17,8 @@ import (
|
|||||||
|
|
||||||
type Status int
|
type Status int
|
||||||
|
|
||||||
const (
|
|
||||||
PublishStatusSent Status = 0
|
|
||||||
PublishStatusFailed Status = -1
|
|
||||||
PublishStatusSucceeded Status = 1
|
|
||||||
)
|
|
||||||
|
|
||||||
var subscriptionIDCounter atomic.Int32
|
var subscriptionIDCounter atomic.Int32
|
||||||
|
|
||||||
func (s Status) String() string {
|
|
||||||
switch s {
|
|
||||||
case PublishStatusSent:
|
|
||||||
return "sent"
|
|
||||||
case PublishStatusFailed:
|
|
||||||
return "failed"
|
|
||||||
case PublishStatusSucceeded:
|
|
||||||
return "success"
|
|
||||||
}
|
|
||||||
|
|
||||||
return "unknown"
|
|
||||||
}
|
|
||||||
|
|
||||||
type Relay struct {
|
type Relay struct {
|
||||||
closeMutex sync.Mutex
|
closeMutex sync.Mutex
|
||||||
|
|
||||||
@@ -106,7 +87,6 @@ func RelayConnect(ctx context.Context, url string, opts ...RelayOption) (*Relay,
|
|||||||
|
|
||||||
// When instantiating relay connections, some options may be passed.
|
// When instantiating relay connections, some options may be passed.
|
||||||
// RelayOption is the type of the argument passed for that.
|
// RelayOption is the type of the argument passed for that.
|
||||||
// Some examples of this are WithNoticeHandler and WithAuthHandler.
|
|
||||||
type RelayOption interface {
|
type RelayOption interface {
|
||||||
IsRelayOption()
|
IsRelayOption()
|
||||||
}
|
}
|
||||||
@@ -300,70 +280,13 @@ func (r *Relay) Write(msg []byte) <-chan error {
|
|||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish sends an "EVENT" command to the relay r as in NIP-01.
|
// Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response.
|
||||||
// Status can be: success, failed, or sent (no response from relay before ctx times out).
|
func (r *Relay) Publish(ctx context.Context, event Event) error {
|
||||||
func (r *Relay) Publish(ctx context.Context, event Event) (Status, error) {
|
return r.publish(ctx, event.ID, &EventEnvelope{Event: event})
|
||||||
status := PublishStatusFailed
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// data races on status variable without this mutex
|
|
||||||
var mu sync.Mutex
|
|
||||||
|
|
||||||
if _, ok := ctx.Deadline(); !ok {
|
|
||||||
// if no timeout is set, force it to 7 seconds
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// make it cancellable so we can stop everything upon receiving an "OK"
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel = context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// listen for an OK callback
|
|
||||||
r.okCallbacks.Store(event.ID, func(ok bool, reason string) {
|
|
||||||
mu.Lock()
|
|
||||||
defer mu.Unlock()
|
|
||||||
if ok {
|
|
||||||
status = PublishStatusSucceeded
|
|
||||||
} else {
|
|
||||||
status = PublishStatusFailed
|
|
||||||
err = fmt.Errorf("msg: %s", reason)
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
})
|
|
||||||
defer r.okCallbacks.Delete(event.ID)
|
|
||||||
|
|
||||||
// publish event
|
|
||||||
envb, _ := EventEnvelope{Event: event}.MarshalJSON()
|
|
||||||
debugLogf("{%s} sending %v\n", r.URL, envb)
|
|
||||||
status = PublishStatusSent
|
|
||||||
if err := <-r.Write(envb); err != nil {
|
|
||||||
status = PublishStatusFailed
|
|
||||||
return status, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done(): // this will be called when we get an OK
|
|
||||||
// proceed to return status as it is
|
|
||||||
// e.g. if this happens because of the timeout then status will probably be "failed"
|
|
||||||
// but if it happens because okCallback was called then it might be "succeeded"
|
|
||||||
// do not return if okCallback is in process
|
|
||||||
return status, err
|
|
||||||
case <-r.connectionContext.Done():
|
|
||||||
// same as above, but when the relay loses connectivity entirely
|
|
||||||
return status, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auth sends an "AUTH" command client -> relay as in NIP-42.
|
// Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response.
|
||||||
// Status can be: success, failed, or sent (no response from relay before ctx times out).
|
func (r *Relay) Auth(ctx context.Context, sign func(event *Event) error) error {
|
||||||
func (r *Relay) Auth(ctx context.Context, sign func(event *Event) error) (Status, error) {
|
|
||||||
status := PublishStatusFailed
|
|
||||||
|
|
||||||
authEvent := Event{
|
authEvent := Event{
|
||||||
CreatedAt: Now(),
|
CreatedAt: Now(),
|
||||||
Kind: KindClientAuthentication,
|
Kind: KindClientAuthentication,
|
||||||
@@ -374,59 +297,53 @@ func (r *Relay) Auth(ctx context.Context, sign func(event *Event) error) (Status
|
|||||||
Content: "",
|
Content: "",
|
||||||
}
|
}
|
||||||
if err := sign(&authEvent); err != nil {
|
if err := sign(&authEvent); err != nil {
|
||||||
return status, fmt.Errorf("error signing auth event: %w", err)
|
return fmt.Errorf("error signing auth event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
return r.publish(ctx, authEvent.ID, &AuthEnvelope{Event: authEvent})
|
||||||
|
}
|
||||||
|
|
||||||
// data races on status variable without this mutex
|
// publish can be used both for EVENT and for AUTH
|
||||||
var mu sync.Mutex
|
func (r *Relay) publish(ctx context.Context, id string, env Envelope) error {
|
||||||
|
var err error
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
|
||||||
if _, ok := ctx.Deadline(); !ok {
|
if _, ok := ctx.Deadline(); !ok {
|
||||||
// if no timeout is set, force it to 3 seconds
|
// if no timeout is set, force it to 7 seconds
|
||||||
var cancel context.CancelFunc
|
ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK"))
|
||||||
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
|
defer cancel()
|
||||||
|
} else {
|
||||||
|
// otherwise make the context cancellable so we can stop everything upon receiving an "OK"
|
||||||
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// make it cancellable so we can stop everything upon receiving an "OK"
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel = context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// listen for an OK callback
|
// listen for an OK callback
|
||||||
r.okCallbacks.Store(authEvent.ID, func(ok bool, reason string) {
|
r.okCallbacks.Store(id, func(ok bool, reason string) {
|
||||||
mu.Lock()
|
if !ok {
|
||||||
if ok {
|
|
||||||
status = PublishStatusSucceeded
|
|
||||||
} else {
|
|
||||||
status = PublishStatusFailed
|
|
||||||
err = fmt.Errorf("msg: %s", reason)
|
err = fmt.Errorf("msg: %s", reason)
|
||||||
}
|
}
|
||||||
mu.Unlock()
|
|
||||||
cancel()
|
cancel()
|
||||||
})
|
})
|
||||||
defer r.okCallbacks.Delete(authEvent.ID)
|
defer r.okCallbacks.Delete(id)
|
||||||
|
|
||||||
// send AUTH
|
// publish event
|
||||||
authResponse, _ := AuthEnvelope{Event: authEvent}.MarshalJSON()
|
envb, _ := env.MarshalJSON()
|
||||||
debugLogf("{%s} sending %v\n", r.URL, authResponse)
|
debugLogf("{%s} sending %v\n", r.URL, envb)
|
||||||
if err := <-r.Write(authResponse); err != nil {
|
if err := <-r.Write(envb); err != nil {
|
||||||
// status will be "failed"
|
return err
|
||||||
return status, err
|
|
||||||
}
|
}
|
||||||
// use mu.Lock() just in case the okCallback got called, extremely unlikely.
|
|
||||||
mu.Lock()
|
|
||||||
status = PublishStatusSent
|
|
||||||
mu.Unlock()
|
|
||||||
|
|
||||||
// the context either times out, and the status is "sent"
|
for {
|
||||||
// or the okCallback is called and the status is set to "succeeded" or "failed"
|
select {
|
||||||
// NIP-42 does not mandate an "OK" reply to an "AUTH" message
|
case <-ctx.Done():
|
||||||
<-ctx.Done()
|
// this will be called when we get an OK or when the context has been canceled
|
||||||
mu.Lock()
|
return ctx.Err()
|
||||||
defer mu.Unlock()
|
case <-r.connectionContext.Done():
|
||||||
return status, err
|
// this is caused when we lose connectivity
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
|
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
|
||||||
|
|||||||
Reference in New Issue
Block a user