it never ends.

This commit is contained in:
fiatjaf
2025-04-16 02:59:47 -03:00
parent cb0dd45a32
commit 5b8954461f
53 changed files with 396 additions and 673 deletions

View File

@@ -0,0 +1,35 @@
package vector
import (
"bytes"
"cmp"
"fiatjaf.com/nostr/nip77/negentropy"
)
func itemCompare(a, b negentropy.Item) int {
if a.Timestamp == b.Timestamp {
return bytes.Compare(a.ID[:], b.ID[:])
}
return cmp.Compare(a.Timestamp, b.Timestamp)
}
// binary search with custom function
func searchItemWithBound(items []negentropy.Item, bound negentropy.Bound) int {
n := len(items)
// Define x[-1] < target and x[n] >= target.
// Invariant: x[i-1] < target, x[j] >= target.
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
if items[h].Timestamp < bound.Timestamp ||
(items[h].Timestamp == bound.Timestamp && bytes.Compare(items[h].ID[:], bound.IDPrefix) == -1) {
i = h + 1 // preserves x[i-1] < target
} else {
j = h // preserves x[j] >= target
}
}
// i == j, x[i-1] < target, and x[j] (= x[i]) >= target => answer is i.
return i
}

View File

@@ -1,7 +1,6 @@
package vector
import (
"fmt"
"iter"
"slices"
@@ -24,10 +23,6 @@ func New() *Vector {
}
func (v *Vector) Insert(createdAt nostr.Timestamp, id nostr.ID) {
if len(id) != 64 {
panic(fmt.Errorf("bad id size for added item: expected %d bytes, got %d", 32, len(id)/2))
}
item := negentropy.Item{Timestamp: createdAt, ID: id}
v.items = append(v.items, item)
}
@@ -39,12 +34,12 @@ func (v *Vector) Seal() {
panic("trying to seal an already sealed vector")
}
v.sealed = true
slices.SortFunc(v.items, negentropy.ItemCompare)
slices.SortFunc(v.items, itemCompare)
}
func (v *Vector) GetBound(idx int) negentropy.Bound {
if idx < len(v.items) {
return negentropy.Bound{Item: v.items[idx]}
return negentropy.Bound{Timestamp: v.items[idx].Timestamp, IDPrefix: v.items[idx].ID[:]}
}
return negentropy.InfiniteBound
}
@@ -60,7 +55,7 @@ func (v *Vector) Range(begin, end int) iter.Seq2[int, negentropy.Item] {
}
func (v *Vector) FindLowerBound(begin, end int, bound negentropy.Bound) int {
idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, negentropy.ItemCompare)
idx := searchItemWithBound(v.items[begin:end], bound)
return begin + idx
}

View File

@@ -1,8 +1,6 @@
package negentropy
import (
"bytes"
"cmp"
"fmt"
"fiatjaf.com/nostr"
@@ -36,13 +34,6 @@ type Item struct {
ID nostr.ID
}
func ItemCompare(a, b Item) int {
if a.Timestamp == b.Timestamp {
return bytes.Compare(a.ID[:], b.ID[:])
}
return cmp.Compare(a.Timestamp, b.Timestamp)
}
func (i Item) String() string { return fmt.Sprintf("Item<%d:%x>", i.Timestamp, i.ID[:]) }
type Bound struct {

View File

@@ -13,8 +13,8 @@ import (
type direction struct {
label string
items chan nostr.ID
source nostr.RelayStore
target nostr.RelayStore
source nostr.QuerierPublisher
target nostr.QuerierPublisher
}
type Direction int
@@ -27,21 +27,21 @@ const (
func NegentropySync(
ctx context.Context,
store nostr.RelayStore,
store nostr.QuerierPublisher,
url string,
filter nostr.Filter,
dir Direction,
) error {
id := "go-nostr-tmp" // for now we can't have more than one subscription in the same connection
data, err := store.QuerySync(ctx, filter)
if err != nil {
return fmt.Errorf("failed to query our local store: %w", err)
}
id := "nl-tmp" // for now we can't have more than one subscription in the same connection
vec := vector.New()
neg := negentropy.New(vec, 1024*1024)
for _, evt := range data {
ch, err := store.QueryEvents(ctx, filter)
if err != nil {
return err
}
for evt := range ch {
vec.Insert(evt.CreatedAt, evt.ID)
}
vec.Seal()
@@ -49,31 +49,33 @@ func NegentropySync(
result := make(chan error)
var r *nostr.Relay
r, err = nostr.RelayConnect(ctx, url, nostr.WithCustomHandler(func(data string) {
envelope := ParseNegMessage(data)
if envelope == nil {
return
}
switch env := envelope.(type) {
case *OpenEnvelope, *CloseEnvelope:
result <- fmt.Errorf("unexpected %s received from relay", env.Label())
return
case *ErrorEnvelope:
result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason)
return
case *MessageEnvelope:
nextmsg, err := neg.Reconcile(env.Message)
if err != nil {
result <- fmt.Errorf("failed to reconcile: %w", err)
r, err = nostr.RelayConnect(ctx, url, nostr.RelayOptions{
CustomHandler: func(data string) {
envelope := ParseNegMessage(data)
if envelope == nil {
return
}
switch env := envelope.(type) {
case *OpenEnvelope, *CloseEnvelope:
result <- fmt.Errorf("unexpected %s received from relay", env.Label())
return
case *ErrorEnvelope:
result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason)
return
case *MessageEnvelope:
nextmsg, err := neg.Reconcile(env.Message)
if err != nil {
result <- fmt.Errorf("failed to reconcile: %w", err)
return
}
if nextmsg != "" {
msgb, _ := MessageEnvelope{id, nextmsg}.MarshalJSON()
r.Write(msgb)
if nextmsg != "" {
msgb, _ := MessageEnvelope{id, nextmsg}.MarshalJSON()
r.Write(msgb)
}
}
}
}))
},
})
if err != nil {
return err
}
@@ -122,7 +124,7 @@ func NegentropySync(
return
}
for evt := range evtch {
dir.target.Publish(ctx, *evt)
dir.target.Publish(ctx, evt)
}
}