diff --git a/connection.go b/connection.go index 7901a66..19d97fa 100644 --- a/connection.go +++ b/connection.go @@ -41,11 +41,10 @@ func (r *Relay) newConnection(ctx context.Context, tlsConfig *tls.Config) error ticker := time.NewTicker(29 * time.Second) // main websocket loop - writeQueue := make(chan writeRequest) readQueue := make(chan string) r.conn = c - r.writeQueue = writeQueue + r.writeQueue = make(chan writeRequest) r.closed = &atomic.Bool{} r.closedNotify = make(chan struct{}) @@ -67,7 +66,7 @@ func (r *Relay) newConnection(ctx context.Context, tlsConfig *tls.Config) error r.closeConnection(ws.StatusAbnormalClosure, "ping took too long") return } - case wr := <-writeQueue: + case wr := <-r.writeQueue: debugLogf("{%s} sending '%v'\n", r.URL, string(wr.msg)) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("write took too long")) err := c.Write(ctx, ws.MessageText, wr.msg) diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 89c3051..68ebf3d 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -192,10 +192,12 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { if _, theyHave := theirItems[item.ID]; theyHave { // if we have and they have, ignore delete(theirItems, item.ID) - } else if n.Haves != nil { - // if we have and they don't, notify client - if n.isClient { - n.Haves <- item.ID + } else { + if n.Haves != nil { + // if we have and they don't, notify client + if n.isClient { + n.Haves <- item.ID + } } } } diff --git a/nip77/nip77.go b/nip77/nip77.go index 91075e8..e858adc 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -34,7 +34,7 @@ func NegentropySync( // handle ids received on each direction, usually called with Sync() so the corresponding events are // fetched from the source and published to the target - handle func(ctx context.Context, wg *sync.WaitGroup, directions []Direction), + handle func(ctx context.Context, directions Direction), ) error { id := "nl-tmp" // for now we can't have more than one subscription in the same connection @@ -76,23 +76,6 @@ func NegentropySync( return err } - // setup sync flows: up, down or both - directions := make([]Direction, 0, 2) - if source != nil { - directions = append(directions, Direction{ - From: source, - To: relay, - Items: neg.Haves, - }) - } - if target != nil { - directions = append(directions, Direction{ - From: relay, - To: target, - Items: neg.HaveNots, - }) - } - // fill our local vector var usedSource nostr.Querier if source != nil { @@ -125,8 +108,25 @@ func NegentropySync( wg := sync.WaitGroup{} - // handle emitted events - wg.Go(func() { handle(ctx, &wg, directions) }) + // handle emitted events from either direction + if source != nil { + wg.Go(func() { + handle(ctx, Direction{ + From: source, + To: relay, + Items: neg.Haves, + }) + }) + } + if target != nil { + wg.Go(func() { + handle(ctx, Direction{ + From: relay, + To: target, + Items: neg.HaveNots, + }) + }) + } go func() { wg.Wait() @@ -141,41 +141,27 @@ func NegentropySync( return nil } -func SyncEventsFromIDs(ctx context.Context, wg *sync.WaitGroup, directions []Direction) { - pool := sync.Pool{ - New: func() any { return make([]nostr.ID, 0, 50) }, +func SyncEventsFromIDs(ctx context.Context, dir Direction) { + // this is only necessary because relays are too ratelimiting + batch := make([]nostr.ID, 0, 50) + + seen := make(map[nostr.ID]struct{}) + for item := range dir.Items { + if _, ok := seen[item]; ok { + continue + } + seen[item] = struct{}{} + + batch = append(batch, item) + if len(batch) == 50 { + for evt := range dir.From.QueryEvents(nostr.Filter{IDs: batch}) { + dir.To.Publish(ctx, evt) + } + batch = batch[:0] + } } - for _, dir := range directions { - wg.Go(func() { - seen := make(map[nostr.ID]struct{}) - - doSync := func(ids []nostr.ID) { - defer pool.Put(ids) - - if len(ids) == 0 { - return - } - for evt := range dir.From.QueryEvents(nostr.Filter{IDs: ids}) { - dir.To.Publish(ctx, evt) - } - } - - ids := pool.Get().([]nostr.ID) - for item := range dir.Items { - if _, ok := seen[item]; ok { - continue - } - seen[item] = struct{}{} - - ids = append(ids, item) - if len(ids) == 50 { - wg.Add(1) - wg.Go(func() { doSync(ids) }) - ids = pool.Get().([]nostr.ID) - } - } - wg.Go(func() { doSync(ids) }) - }) + for evt := range dir.From.QueryEvents(nostr.Filter{IDs: batch}) { + dir.To.Publish(ctx, evt) } } diff --git a/relay.go b/relay.go index f841744..d81c279 100644 --- a/relay.go +++ b/relay.go @@ -157,7 +157,7 @@ func (r *Relay) handleMessage(message string) { envelope, err := ParseMessage(message) if envelope == nil { if r.customHandler != nil && err == UnknownLabel { - r.customHandler(message) + go r.customHandler(message) } return } @@ -230,6 +230,7 @@ func (r *Relay) Write(msg []byte) { return default: } + select { case <-r.connectionContext.Done(): case r.writeQueue <- writeRequest{msg: msg, answer: nil}: