From 98dbe7b9e99aceee137784818f4587775209e8ce Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Wed, 12 Nov 2025 16:27:51 -0300 Subject: [PATCH] improve and make nip77 api nicer so it could be better used in nak. --- eventstore/cmd/eventstore/neg.go | 2 +- khatru/handlers.go | 2 +- nip77/example/example.go | 2 +- nip77/idsonly.go | 70 ------------------ nip77/negentropy/negentropy.go | 2 - nip77/nip77.go | 122 +++++++++++++++---------------- 6 files changed, 63 insertions(+), 137 deletions(-) delete mode 100644 nip77/idsonly.go diff --git a/eventstore/cmd/eventstore/neg.go b/eventstore/cmd/eventstore/neg.go index 6ec5f85..c67b5ce 100644 --- a/eventstore/cmd/eventstore/neg.go +++ b/eventstore/cmd/eventstore/neg.go @@ -43,7 +43,7 @@ var neg = &cli.Command{ // create negentropy object and initialize it with events vec := vector.New() - neg := negentropy.New(vec, frameSizeLimit) + neg := negentropy.New(vec, frameSizeLimit, true, true) for evt := range db.QueryEvents(filter, math.MaxInt) { vec.Insert(evt.CreatedAt, evt.ID) } diff --git a/khatru/handlers.go b/khatru/handlers.go index 9e69040..50e3055 100644 --- a/khatru/handlers.go +++ b/khatru/handlers.go @@ -365,7 +365,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } // reconcile to get the next message and return it - neg := negentropy.New(vec, 1024*1024) + neg := negentropy.New(vec, 1024*1024, false, false) out, err := neg.Reconcile(env.Message) if err != nil { ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: err.Error()}) diff --git a/nip77/example/example.go b/nip77/example/example.go index 582a568..06217c4 100644 --- a/nip77/example/example.go +++ b/nip77/example/example.go @@ -47,7 +47,7 @@ func main() { } err := nip77.NegentropySync(ctx, - local, "ws://localhost:7777", nostr.Filter{}, nip77.Both) + "ws://localhost:7777", nostr.Filter{}, local, local, nip77.SyncEventsFromIDs) if err != nil { panic(err) } diff --git a/nip77/idsonly.go b/nip77/idsonly.go deleted file mode 100644 index fb247c0..0000000 --- a/nip77/idsonly.go +++ /dev/null @@ -1,70 +0,0 @@ -package nip77 - -import ( - "context" - "fmt" - - "fiatjaf.com/nostr" - "fiatjaf.com/nostr/nip77/negentropy" - "fiatjaf.com/nostr/nip77/negentropy/storage/empty" -) - -func FetchIDsOnly( - ctx context.Context, - url string, - filter nostr.Filter, -) (<-chan nostr.ID, error) { - id := "nl-tmp" // for now we can't have more than one subscription in the same connection - - neg := negentropy.New(empty.Empty{}, 1024*1024) - result := make(chan error) - - var r *nostr.Relay - r, err := nostr.RelayConnect(ctx, url, nostr.RelayOptions{CustomHandler: func(data string) { - envelope := ParseNegMessage(data) - if envelope == nil { - return - } - switch env := envelope.(type) { - case *OpenEnvelope, *CloseEnvelope: - result <- fmt.Errorf("unexpected %s received from relay", env.Label()) - return - case *ErrorEnvelope: - result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) - return - case *MessageEnvelope: - nextmsg, err := neg.Reconcile(env.Message) - if err != nil { - result <- fmt.Errorf("failed to reconcile: %w", err) - return - } - - if nextmsg != "" { - msgb, _ := MessageEnvelope{id, nextmsg}.MarshalJSON() - r.Write(msgb) - } - } - }}) - if err != nil { - return nil, err - } - - msg := neg.Start() - open, _ := OpenEnvelope{id, filter, msg}.MarshalJSON() - err = r.WriteWithError(open) - if err != nil { - return nil, fmt.Errorf("failed to write to relay: %w", err) - } - - ch := make(chan nostr.ID) - go func() { - for id := range neg.HaveNots { - ch <- id - } - clse, _ := CloseEnvelope{id}.MarshalJSON() - r.Write(clse) - close(ch) - }() - - return ch, nil -} diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 3a34736..89c3051 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -203,8 +203,6 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { if n.isClient && n.HaveNots != nil { // notify client of what they have and we don't for id := range theirItems { - fmt.Println(" their:", id) - // skip empty strings here because those were marked to be excluded as such in the previous step n.HaveNots <- id } diff --git a/nip77/nip77.go b/nip77/nip77.go index 3333593..91075e8 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -10,17 +10,17 @@ import ( "fiatjaf.com/nostr/nip77/negentropy/storage/vector" ) -type direction struct { - label string // for debugging only - from nostr.Querier - to nostr.Publisher - items chan nostr.ID +type Direction struct { + From nostr.Querier + To nostr.Publisher + Items chan nostr.ID } func NegentropySync( ctx context.Context, - filter nostr.Filter, + relayUrl string, + filter nostr.Filter, // where our local events will be read from. // if it is nil the sync will be unidirectional: download-only. @@ -31,6 +31,10 @@ func NegentropySync( // it can also be a nostr.QuerierPublisher in case source isn't provided // and you need a download-only sync that respects local data. target nostr.Publisher, + + // handle ids received on each direction, usually called with Sync() so the corresponding events are + // fetched from the source and published to the target + handle func(ctx context.Context, wg *sync.WaitGroup, directions []Direction), ) error { id := "nl-tmp" // for now we can't have more than one subscription in the same connection @@ -73,19 +77,19 @@ func NegentropySync( } // setup sync flows: up, down or both - directions := make([]direction, 0, 2) + directions := make([]Direction, 0, 2) if source != nil { - directions = append(directions, direction{ - from: source, - to: relay, - items: neg.Haves, + directions = append(directions, Direction{ + From: source, + To: relay, + Items: neg.Haves, }) } if target != nil { - directions = append(directions, direction{ - from: relay, - to: target, - items: neg.HaveNots, + directions = append(directions, Direction{ + From: relay, + To: target, + Items: neg.HaveNots, }) } @@ -120,54 +124,9 @@ func NegentropySync( }() wg := sync.WaitGroup{} - pool := sync.Pool{ - New: func() any { return make([]nostr.ID, 0, 50) }, - } - for _, dir := range directions { - wg.Add(1) - go func(dir direction) { - fmt.Println("> dir", dir.label) - - defer wg.Done() - - seen := make(map[nostr.ID]struct{}) - - doSync := func(ids []nostr.ID) { - defer wg.Done() - defer pool.Put(ids) - - if len(ids) == 0 { - return - } - for evt := range dir.from.QueryEvents(nostr.Filter{IDs: ids}) { - dir.to.Publish(ctx, evt) - } - } - - ids := pool.Get().([]nostr.ID) - for item := range dir.items { - fmt.Println(">>>", item) - if _, ok := seen[item]; ok { - continue - } - seen[item] = struct{}{} - - fmt.Println(">>>>>", 0) - ids = append(ids, item) - if len(ids) == 50 { - wg.Add(1) - go doSync(ids) - fmt.Println(">>>>>", 1) - ids = pool.Get().([]nostr.ID) - } - fmt.Println(">>>>>", 2) - } - fmt.Println("> ?") - wg.Add(1) - doSync(ids) - }(dir) - } + // handle emitted events + wg.Go(func() { handle(ctx, &wg, directions) }) go func() { wg.Wait() @@ -181,3 +140,42 @@ func NegentropySync( return nil } + +func SyncEventsFromIDs(ctx context.Context, wg *sync.WaitGroup, directions []Direction) { + pool := sync.Pool{ + New: func() any { return make([]nostr.ID, 0, 50) }, + } + + for _, dir := range directions { + wg.Go(func() { + seen := make(map[nostr.ID]struct{}) + + doSync := func(ids []nostr.ID) { + defer pool.Put(ids) + + if len(ids) == 0 { + return + } + for evt := range dir.From.QueryEvents(nostr.Filter{IDs: ids}) { + dir.To.Publish(ctx, evt) + } + } + + ids := pool.Get().([]nostr.ID) + for item := range dir.Items { + if _, ok := seen[item]; ok { + continue + } + seen[item] = struct{}{} + + ids = append(ids, item) + if len(ids) == 50 { + wg.Add(1) + wg.Go(func() { doSync(ids) }) + ids = pool.Get().([]nostr.ID) + } + } + wg.Go(func() { doSync(ids) }) + }) + } +}