From 668d6fc95694140f5893361f6b21504df4fe9e13 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 1 Dec 2025 18:08:55 -0300 Subject: [PATCH] nip77: remove third-party syncing from last commit and expose stuff so it can be implemented elsewhere (nak) directly. --- nip77/negentropy/encoding.go | 38 ++--- nip77/negentropy/negentropy.go | 49 +++--- nip77/negentropy/thirdparty.go | 268 --------------------------------- 3 files changed, 48 insertions(+), 307 deletions(-) delete mode 100644 nip77/negentropy/thirdparty.go diff --git a/nip77/negentropy/encoding.go b/nip77/negentropy/encoding.go index 1eee4eb..c501af9 100644 --- a/nip77/negentropy/encoding.go +++ b/nip77/negentropy/encoding.go @@ -7,8 +7,8 @@ import ( "fiatjaf.com/nostr" ) -func (n *Negentropy) readTimestamp(reader *bytes.Reader) (nostr.Timestamp, error) { - delta, err := readVarInt(reader) +func (br *BoundReader) ReadTimestamp(reader *bytes.Reader) (nostr.Timestamp, error) { + delta, err := ReadVarInt(reader) if err != nil { return 0, err } @@ -16,7 +16,7 @@ func (n *Negentropy) readTimestamp(reader *bytes.Reader) (nostr.Timestamp, error if delta == 0 { // zeroes are infinite timestamp := maxTimestamp - n.lastTimestampIn = timestamp + br.lastTimestampIn = timestamp return timestamp, nil } @@ -24,21 +24,21 @@ func (n *Negentropy) readTimestamp(reader *bytes.Reader) (nostr.Timestamp, error delta-- // we add the previously cached timestamp to get the current - timestamp := n.lastTimestampIn + nostr.Timestamp(delta) + timestamp := br.lastTimestampIn + nostr.Timestamp(delta) // cache this so we can apply it to the delta next time - n.lastTimestampIn = timestamp + br.lastTimestampIn = timestamp return timestamp, nil } -func (n *Negentropy) readBound(reader *bytes.Reader) (Bound, error) { - timestamp, err := n.readTimestamp(reader) +func (br *BoundReader) ReadBound(reader *bytes.Reader) (Bound, error) { + timestamp, err := br.ReadTimestamp(reader) if err != nil { return Bound{}, fmt.Errorf("failed to decode bound timestamp: %w", err) } - length, err := readVarInt(reader) + length, err := ReadVarInt(reader) if err != nil { return Bound{}, fmt.Errorf("failed to decode bound length: %w", err) } @@ -51,28 +51,28 @@ func (n *Negentropy) readBound(reader *bytes.Reader) (Bound, error) { return Bound{timestamp, pfb}, nil } -func (n *Negentropy) writeTimestamp(w *bytes.Buffer, timestamp nostr.Timestamp) { +func (bw *BoundWriter) WriteTimestamp(w *bytes.Buffer, timestamp nostr.Timestamp) { if timestamp == maxTimestamp { // zeroes are infinite - n.lastTimestampOut = maxTimestamp // cache this (see below) - writeVarInt(w, 0) + bw.lastTimestampOut = maxTimestamp // cache this (see below) + WriteVarInt(w, 0) return } // we will only encode the difference between this timestamp and the previous - delta := timestamp - n.lastTimestampOut + delta := timestamp - bw.lastTimestampOut // we cache this here as the next timestamp we encode will be just a delta from this - n.lastTimestampOut = timestamp + bw.lastTimestampOut = timestamp // add 1 to prevent zeroes from being read as infinites - writeVarInt(w, int(delta+1)) + WriteVarInt(w, int(delta+1)) return } -func (n *Negentropy) writeBound(w *bytes.Buffer, bound Bound) { - n.writeTimestamp(w, bound.Timestamp) - writeVarInt(w, len(bound.IDPrefix)) +func (bw *BoundWriter) WriteBound(w *bytes.Buffer, bound Bound) { + bw.WriteTimestamp(w, bound.Timestamp) + WriteVarInt(w, len(bound.IDPrefix)) w.Write(bound.IDPrefix) } @@ -93,7 +93,7 @@ func getMinimalBound(prev, curr Item) Bound { return Bound{curr.Timestamp, curr.ID[:(sharedPrefixBytes + 1)]} } -func readVarInt(reader *bytes.Reader) (int, error) { +func ReadVarInt(reader *bytes.Reader) (int, error) { var res int = 0 for { @@ -111,7 +111,7 @@ func readVarInt(reader *bytes.Reader) (int, error) { return res, nil } -func writeVarInt(w *bytes.Buffer, n int) { +func WriteVarInt(w *bytes.Buffer, n int) { if n == 0 { w.WriteByte(0) return diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 7c8b707..83e07a5 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -11,7 +11,7 @@ import ( ) const ( - protocolVersion byte = 0x61 // version 1 + ProtocolVersion byte = 0x61 // version 1 maxTimestamp = nostr.Timestamp(math.MaxInt64) buckets = 16 ) @@ -19,17 +19,26 @@ const ( var InfiniteBound = Bound{Timestamp: maxTimestamp} type Negentropy struct { - storage Storage - initialized bool - frameSizeLimit int - isClient bool - lastTimestampIn nostr.Timestamp - lastTimestampOut nostr.Timestamp + storage Storage + initialized bool + frameSizeLimit int + isClient bool + + BoundWriter + BoundReader Haves chan nostr.ID HaveNots chan nostr.ID } +type BoundReader struct { + lastTimestampIn nostr.Timestamp +} + +type BoundWriter struct { + lastTimestampOut nostr.Timestamp +} + func New(storage Storage, frameSizeLimit int, up, down bool) *Negentropy { if frameSizeLimit == 0 { frameSizeLimit = math.MaxInt @@ -68,7 +77,7 @@ func (n *Negentropy) Start() string { n.isClient = true output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*64)) - output.WriteByte(protocolVersion) + output.WriteByte(ProtocolVersion) n.SplitRange(0, n.storage.Size(), InfiniteBound, output) return nostr.HexEncodeToString(output.Bytes()) @@ -105,13 +114,13 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message fullOutput := bytes.NewBuffer(make([]byte, 0, 5000)) - fullOutput.WriteByte(protocolVersion) + fullOutput.WriteByte(ProtocolVersion) pv, err := reader.ReadByte() if err != nil { return nil, fmt.Errorf("failed to read pv: %w", err) } - if pv != protocolVersion { + if pv != ProtocolVersion { if n.isClient { return nil, fmt.Errorf("unsupported negentropy protocol version %v", pv) } @@ -133,16 +142,16 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { // end skip range, if necessary, so we can start a new bound that isn't a skip if skipping { skipping = false - n.writeBound(partialOutput, prevBound) + n.WriteBound(partialOutput, prevBound) partialOutput.WriteByte(byte(SkipMode)) } } - currBound, err := n.readBound(reader) + currBound, err := n.ReadBound(reader) if err != nil { return nil, fmt.Errorf("failed to decode bound: %w", err) } - modeVal, err := readVarInt(reader) + modeVal, err := ReadVarInt(reader) if err != nil { return nil, fmt.Errorf("failed to decode mode: %w", err) } @@ -171,7 +180,7 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { } case IdListMode: - numIds, err := readVarInt(reader) + numIds, err := ReadVarInt(reader) if err != nil { return nil, fmt.Errorf("failed to decode number of ids: %w", err) } @@ -235,9 +244,9 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { responses++ } - n.writeBound(partialOutput, endBound) + n.WriteBound(partialOutput, endBound) partialOutput.WriteByte(byte(IdListMode)) - writeVarInt(partialOutput, responses) + WriteVarInt(partialOutput, responses) partialOutput.Write(responseIds) io.Copy(fullOutput, partialOutput) @@ -251,7 +260,7 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() { // frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size()) - n.writeBound(fullOutput, InfiniteBound) + n.WriteBound(fullOutput, InfiniteBound) fullOutput.WriteByte(byte(FingerprintMode)) fullOutput.Write(remainingFingerprint[:]) @@ -273,9 +282,9 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *byte if numElems < buckets*2 { // we just send the full ids here - n.writeBound(output, upperBound) + n.WriteBound(output, upperBound) output.WriteByte(byte(IdListMode)) - writeVarInt(output, numElems) + WriteVarInt(output, numElems) for _, item := range n.storage.Range(lower, upper) { output.Write(item.ID[:]) @@ -311,7 +320,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *byte nextBound = minBound } - n.writeBound(output, nextBound) + n.WriteBound(output, nextBound) output.WriteByte(byte(FingerprintMode)) output.Write(ourFingerprint[:]) } diff --git a/nip77/negentropy/thirdparty.go b/nip77/negentropy/thirdparty.go deleted file mode 100644 index 92ecedb..0000000 --- a/nip77/negentropy/thirdparty.go +++ /dev/null @@ -1,268 +0,0 @@ -package negentropy - -import ( - "bytes" - "fmt" - - "fiatjaf.com/nostr" -) - -type ThirdPartyNegentropy struct { - PeerA NegentropyThirdPartyRemote - PeerB NegentropyThirdPartyRemote - Filter nostr.Filter - - Deltas chan Delta -} - -type Delta struct { - ID nostr.ID - Have NegentropyThirdPartyRemote - HaveNot NegentropyThirdPartyRemote -} - -type boundKey string - -func (b Bound) key() boundKey { - return boundKey(fmt.Sprintf("%d:%x", b.Timestamp, b.IDPrefix)) -} - -type NegentropyThirdPartyRemote interface { - SendInitialMessage(filter nostr.Filter, msg string) error - SendMessage(msg string) error - SendClose() error - Receive() (string, error) -} - -func NewThirdPartyNegentropy(peerA, peerB NegentropyThirdPartyRemote, filter nostr.Filter) *ThirdPartyNegentropy { - return &ThirdPartyNegentropy{ - PeerA: peerA, - PeerB: peerB, - Filter: filter, - Deltas: make(chan Delta, 100), - } -} - -func (n *ThirdPartyNegentropy) Start() error { - peerAIds := make(map[nostr.ID]struct{}) - peerBIds := make(map[nostr.ID]struct{}) - peerASkippedBounds := make(map[boundKey]struct{}) - peerBSkippedBounds := make(map[boundKey]struct{}) - - // send an empty message to A to start things up - initialMsg := createInitialMessage() - err := n.PeerA.SendInitialMessage(n.Filter, initialMsg) - if err != nil { - return err - } - - hasSentInitialMessageToB := false - - for { - // receive message from A - msgA, err := n.PeerA.Receive() - if err != nil { - return err - } - msgAb, _ := nostr.HexDecodeString(msgA) - if len(msgAb) == 1 { - break - } - - msgToB, err := parseMessageBuildNext( - msgA, - peerBSkippedBounds, - func(id nostr.ID) { - if _, exists := peerBIds[id]; exists { - delete(peerBIds, id) - } else { - peerAIds[id] = struct{}{} - } - }, - func(boundKey boundKey) { - peerASkippedBounds[boundKey] = struct{}{} - }, - ) - if err != nil { - return err - } - - // emit deltas from B after receiving message from A - for id := range peerBIds { - n.Deltas <- Delta{ID: id, Have: n.PeerB, HaveNot: n.PeerA} - delete(peerBIds, id) - } - - if len(msgToB) == 2 { - // exit condition (no more messages to send) - break - } - - // send message to B - if hasSentInitialMessageToB { - err = n.PeerB.SendMessage(msgToB) - } else { - err = n.PeerB.SendInitialMessage(n.Filter, msgToB) - hasSentInitialMessageToB = true - } - if err != nil { - return err - } - - // receive message from B - msgB, err := n.PeerB.Receive() - if err != nil { - return err - } - msgBb, _ := nostr.HexDecodeString(msgB) - if len(msgBb) == 1 { - break - } - - msgToA, err := parseMessageBuildNext( - msgB, - peerASkippedBounds, - func(id nostr.ID) { - if _, exists := peerAIds[id]; exists { - delete(peerAIds, id) - } else { - peerBIds[id] = struct{}{} - } - }, - func(boundKey boundKey) { - peerBSkippedBounds[boundKey] = struct{}{} - }, - ) - if err != nil { - return err - } - - // emit deltas from A after receiving message from B - for id := range peerAIds { - n.Deltas <- Delta{ID: id, Have: n.PeerA, HaveNot: n.PeerB} - delete(peerAIds, id) - } - - if len(msgToA) == 2 { - // exit condition (no more messages to send) - break - } - - // send message to A - err = n.PeerA.SendMessage(msgToA) - if err != nil { - return err - } - } - - // emit remaining deltas before exit - for id := range peerAIds { - n.Deltas <- Delta{ID: id, Have: n.PeerA, HaveNot: n.PeerB} - } - for id := range peerBIds { - n.Deltas <- Delta{ID: id, Have: n.PeerB, HaveNot: n.PeerA} - } - - n.PeerA.SendClose() - n.PeerB.SendClose() - close(n.Deltas) - - return nil -} - -func createInitialMessage() string { - output := bytes.NewBuffer(make([]byte, 0, 64)) - output.WriteByte(protocolVersion) - writeVarInt(output, 0) // timestamp for infinite - writeVarInt(output, 0) // prefix len - output.WriteByte(byte(IdListMode)) - writeVarInt(output, 0) // num ids - return nostr.HexEncodeToString(output.Bytes()) -} - -func parseMessageBuildNext( - msg string, - skippedBounds map[boundKey]struct{}, - idCallback func(id nostr.ID), - skipCallback func(boundKey boundKey), -) (next string, err error) { - msgb, err := nostr.HexDecodeString(msg) - if err != nil { - return "", err - } - - dummy := &Negentropy{} - nextMsg := bytes.NewBuffer(make([]byte, 0, len(msgb))) - dummy32BytePlaceholder := [32]byte{} - - reader := bytes.NewReader(msgb) - pv, err := reader.ReadByte() - if err != nil { - return "", err - } - if pv != protocolVersion { - return "", fmt.Errorf("unsupported protocol version %v", pv) - } - - nextMsg.WriteByte(pv) - - for reader.Len() > 0 { - bound, err := dummy.readBound(reader) - if err != nil { - return "", err - } - - modeVal, err := readVarInt(reader) - if err != nil { - return "", err - } - mode := Mode(modeVal) - - if _, skipped := skippedBounds[bound.key()]; !skipped { - dummy.writeBound(nextMsg, bound) - writeVarInt(nextMsg, modeVal) - } - - switch mode { - case SkipMode: - skipCallback(bound.key()) - case FingerprintMode: - _, err = reader.Read(dummy32BytePlaceholder[:]) - if err != nil { - return "", err - } - - if _, skipped := skippedBounds[bound.key()]; !skipped { - nextMsg.Write(dummy32BytePlaceholder[:]) - } - case IdListMode: - skipCallback(bound.key()) - - numIds, err := readVarInt(reader) - if err != nil { - return "", err - } - - if _, skipped := skippedBounds[bound.key()]; !skipped { - writeVarInt(nextMsg, numIds) - } - - for range numIds { - _, err = reader.Read(dummy32BytePlaceholder[:]) - if err != nil { - return "", err - } - - idCallback(dummy32BytePlaceholder) - - if _, skipped := skippedBounds[bound.key()]; !skipped { - nextMsg.Write(dummy32BytePlaceholder[:]) - } - } - default: - return "", fmt.Errorf("unknown mode %v", mode) - } - } - - return nostr.HexEncodeToString(nextMsg.Bytes()), nil -}