reduce allocations at subscription.GetID() because why not.
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
package nostr
|
package nostr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
@@ -92,3 +94,8 @@ func arePointerValuesEqual[V comparable](a *V, b *V) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func subIdToSerial(subId string) int64 {
|
||||||
|
serialId, _ := strconv.ParseInt(subId[0:strings.Index(subId, ":")], 10, 64)
|
||||||
|
return serialId
|
||||||
|
}
|
||||||
|
|||||||
31
relay.go
31
relay.go
@@ -18,7 +18,7 @@ import (
|
|||||||
|
|
||||||
type Status int
|
type Status int
|
||||||
|
|
||||||
var subscriptionIDCounter atomic.Int32
|
var subscriptionIDCounter atomic.Int64
|
||||||
|
|
||||||
type Relay struct {
|
type Relay struct {
|
||||||
closeMutex sync.Mutex
|
closeMutex sync.Mutex
|
||||||
@@ -27,7 +27,7 @@ type Relay struct {
|
|||||||
RequestHeader http.Header // e.g. for origin header
|
RequestHeader http.Header // e.g. for origin header
|
||||||
|
|
||||||
Connection *Connection
|
Connection *Connection
|
||||||
Subscriptions *xsync.MapOf[string, *Subscription]
|
Subscriptions *xsync.MapOf[int64, *Subscription]
|
||||||
|
|
||||||
ConnectionError error
|
ConnectionError error
|
||||||
connectionContext context.Context // will be canceled when the connection closes
|
connectionContext context.Context // will be canceled when the connection closes
|
||||||
@@ -57,7 +57,7 @@ func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay {
|
|||||||
URL: NormalizeURL(url),
|
URL: NormalizeURL(url),
|
||||||
connectionContext: ctx,
|
connectionContext: ctx,
|
||||||
connectionContextCancel: cancel,
|
connectionContextCancel: cancel,
|
||||||
Subscriptions: xsync.NewMapOf[string, *Subscription](),
|
Subscriptions: xsync.NewMapOf[int64, *Subscription](),
|
||||||
okCallbacks: xsync.NewMapOf[string, func(bool, string)](),
|
okCallbacks: xsync.NewMapOf[string, func(bool, string)](),
|
||||||
writeQueue: make(chan writeRequest),
|
writeQueue: make(chan writeRequest),
|
||||||
subscriptionChannelCloseQueue: make(chan *Subscription),
|
subscriptionChannelCloseQueue: make(chan *Subscription),
|
||||||
@@ -171,10 +171,9 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
|
|||||||
r.Connection = nil
|
r.Connection = nil
|
||||||
|
|
||||||
// close all subscriptions
|
// close all subscriptions
|
||||||
r.Subscriptions.Range(func(_ string, sub *Subscription) bool {
|
for _, sub := range r.Subscriptions.Range {
|
||||||
go sub.Unsub()
|
sub.Unsub()
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// queue all write operations here so we don't do mutex spaghetti
|
// queue all write operations here so we don't do mutex spaghetti
|
||||||
@@ -241,7 +240,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
|
|||||||
if env.SubscriptionID == nil {
|
if env.SubscriptionID == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if subscription, ok := r.Subscriptions.Load(*env.SubscriptionID); !ok {
|
|
||||||
|
if subscription, ok := r.Subscriptions.Load(subIdToSerial(*env.SubscriptionID)); !ok {
|
||||||
// InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID)
|
// InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
@@ -263,15 +263,15 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
|
|||||||
subscription.dispatchEvent(&env.Event)
|
subscription.dispatchEvent(&env.Event)
|
||||||
}
|
}
|
||||||
case *EOSEEnvelope:
|
case *EOSEEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
|
if subscription, ok := r.Subscriptions.Load(subIdToSerial(string(*env))); ok {
|
||||||
subscription.dispatchEose()
|
subscription.dispatchEose()
|
||||||
}
|
}
|
||||||
case *ClosedEnvelope:
|
case *ClosedEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok {
|
if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok {
|
||||||
subscription.dispatchClosed(env.Reason)
|
subscription.dispatchClosed(env.Reason)
|
||||||
}
|
}
|
||||||
case *CountEnvelope:
|
case *CountEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
|
if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
|
||||||
subscription.countResult <- *env.Count
|
subscription.countResult <- *env.Count
|
||||||
}
|
}
|
||||||
case *OKEnvelope:
|
case *OKEnvelope:
|
||||||
@@ -400,7 +400,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
|
|||||||
Relay: r,
|
Relay: r,
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
counter: int(current),
|
counter: current,
|
||||||
Events: make(chan *Event),
|
Events: make(chan *Event),
|
||||||
EndOfStoredEvents: make(chan struct{}, 1),
|
EndOfStoredEvents: make(chan struct{}, 1),
|
||||||
ClosedReason: make(chan string, 1),
|
ClosedReason: make(chan string, 1),
|
||||||
@@ -415,8 +415,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
id := sub.GetID()
|
r.Subscriptions.Store(int64(sub.counter), sub)
|
||||||
r.Subscriptions.Store(id, sub)
|
|
||||||
|
|
||||||
// start handling events, eose, unsub etc:
|
// start handling events, eose, unsub etc:
|
||||||
go sub.start()
|
go sub.start()
|
||||||
@@ -514,3 +513,7 @@ func (r *Relay) Close() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var subIdPool = sync.Pool{
|
||||||
|
New: func() any { return make([]byte, 0, 15) },
|
||||||
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
label string
|
label string
|
||||||
counter int
|
counter int64
|
||||||
|
|
||||||
Relay *Relay
|
Relay *Relay
|
||||||
Filters Filters
|
Filters Filters
|
||||||
@@ -65,7 +65,12 @@ var _ SubscriptionOption = (WithLabel)("")
|
|||||||
// GetID return the Nostr subscription ID as given to the Relay
|
// GetID return the Nostr subscription ID as given to the Relay
|
||||||
// it is a concatenation of the label and a serial number.
|
// it is a concatenation of the label and a serial number.
|
||||||
func (sub *Subscription) GetID() string {
|
func (sub *Subscription) GetID() string {
|
||||||
return sub.label + ":" + strconv.Itoa(sub.counter)
|
buf := subIdPool.Get().([]byte)
|
||||||
|
buf = strconv.AppendInt(buf, sub.counter, 10)
|
||||||
|
buf = append(buf, ':')
|
||||||
|
buf = append(buf, sub.label...)
|
||||||
|
defer subIdPool.Put(buf)
|
||||||
|
return string(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sub *Subscription) start() {
|
func (sub *Subscription) start() {
|
||||||
@@ -133,7 +138,7 @@ func (sub *Subscription) Unsub() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// remove subscription from our map
|
// remove subscription from our map
|
||||||
sub.Relay.Subscriptions.Delete(sub.GetID())
|
sub.Relay.Subscriptions.Delete(sub.counter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close just sends a CLOSE message. You probably want Unsub() instead.
|
// Close just sends a CLOSE message. You probably want Unsub() instead.
|
||||||
|
|||||||
Reference in New Issue
Block a user