more conversions.
This commit is contained in:
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
"github.com/mailru/easyjson"
|
||||
jwriter "github.com/mailru/easyjson/jwriter"
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostr"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/fiatjaf/eventstore/slicestore"
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostrlib/nip77"
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/nip77"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
package nip77
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type idlistpool struct {
|
||||
initialsize int
|
||||
pool [][]string
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func newidlistpool(initialsize int) *idlistpool {
|
||||
ilp := idlistpool{
|
||||
initialsize: initialsize,
|
||||
pool: make([][]string, 1, 2),
|
||||
}
|
||||
|
||||
ilp.pool[0] = make([]string, 0, initialsize)
|
||||
|
||||
return &ilp
|
||||
}
|
||||
|
||||
func (ilp *idlistpool) grab() []string {
|
||||
ilp.Lock()
|
||||
defer ilp.Unlock()
|
||||
|
||||
l := len(ilp.pool)
|
||||
if l > 0 {
|
||||
idlist := ilp.pool[l-1]
|
||||
ilp.pool = ilp.pool[0 : l-1]
|
||||
return idlist
|
||||
}
|
||||
idlist := make([]string, 0, ilp.initialsize)
|
||||
return idlist
|
||||
}
|
||||
|
||||
func (ilp *idlistpool) giveback(idlist []string) {
|
||||
idlist = idlist[:0]
|
||||
ilp.pool = append(ilp.pool, idlist)
|
||||
}
|
||||
@@ -4,16 +4,16 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/empty"
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy/storage/empty"
|
||||
)
|
||||
|
||||
func FetchIDsOnly(
|
||||
ctx context.Context,
|
||||
url string,
|
||||
filter nostr.Filter,
|
||||
) (<-chan string, error) {
|
||||
) (<-chan nostr.ID, error) {
|
||||
id := "go-nostr-tmp" // for now we can't have more than one subscription in the same connection
|
||||
|
||||
neg := negentropy.New(empty.Empty{}, 1024*1024)
|
||||
@@ -56,7 +56,7 @@ func FetchIDsOnly(
|
||||
return nil, fmt.Errorf("failed to write to relay: %w", err)
|
||||
}
|
||||
|
||||
ch := make(chan string)
|
||||
ch := make(chan nostr.ID)
|
||||
go func() {
|
||||
for id := range neg.HaveNots {
|
||||
ch <- id
|
||||
|
||||
@@ -3,7 +3,7 @@ package negentropy
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
func (n *Negentropy) readTimestamp(reader *StringHexReader) (nostr.Timestamp, error) {
|
||||
@@ -42,12 +42,12 @@ func (n *Negentropy) readBound(reader *StringHexReader) (Bound, error) {
|
||||
return Bound{}, fmt.Errorf("failed to decode bound length: %w", err)
|
||||
}
|
||||
|
||||
id, err := reader.ReadString(length * 2)
|
||||
if err != nil {
|
||||
pfb := make([]byte, length)
|
||||
if err := reader.ReadHexBytes(pfb); err != nil {
|
||||
return Bound{}, fmt.Errorf("failed to read bound id: %w", err)
|
||||
}
|
||||
|
||||
return Bound{Item{timestamp, id}}, nil
|
||||
return Bound{timestamp, pfb}, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) writeTimestamp(w *StringHexWriter, timestamp nostr.Timestamp) {
|
||||
@@ -71,26 +71,25 @@ func (n *Negentropy) writeTimestamp(w *StringHexWriter, timestamp nostr.Timestam
|
||||
|
||||
func (n *Negentropy) writeBound(w *StringHexWriter, bound Bound) {
|
||||
n.writeTimestamp(w, bound.Timestamp)
|
||||
writeVarInt(w, len(bound.ID)/2)
|
||||
w.WriteHex(bound.Item.ID)
|
||||
writeVarInt(w, len(bound.IDPrefix))
|
||||
w.WriteBytes(bound.IDPrefix)
|
||||
}
|
||||
|
||||
func getMinimalBound(prev, curr Item) Bound {
|
||||
if curr.Timestamp != prev.Timestamp {
|
||||
return Bound{Item{curr.Timestamp, ""}}
|
||||
return Bound{curr.Timestamp, nil}
|
||||
}
|
||||
|
||||
sharedPrefixBytes := 0
|
||||
|
||||
for i := 0; i < 32; i += 2 {
|
||||
if curr.ID[i:i+2] != prev.ID[i:i+2] {
|
||||
for i := 0; i < 31; i++ {
|
||||
if curr.ID[i] != prev.ID[i] {
|
||||
break
|
||||
}
|
||||
sharedPrefixBytes++
|
||||
}
|
||||
|
||||
// sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical.
|
||||
return Bound{Item{curr.Timestamp, curr.ID[:(sharedPrefixBytes+1)*2]}}
|
||||
return Bound{curr.Timestamp, curr.ID[:(sharedPrefixBytes + 1)]}
|
||||
}
|
||||
|
||||
func readVarInt(reader *StringHexReader) (int, error) {
|
||||
|
||||
@@ -9,9 +9,9 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy/storage/vector"
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy/storage/vector"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package negentropy
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -15,7 +16,7 @@ const (
|
||||
buckets = 16
|
||||
)
|
||||
|
||||
var InfiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}}
|
||||
var InfiniteBound = Bound{Timestamp: maxTimestamp}
|
||||
|
||||
type Negentropy struct {
|
||||
storage Storage
|
||||
@@ -25,8 +26,8 @@ type Negentropy struct {
|
||||
lastTimestampIn nostr.Timestamp
|
||||
lastTimestampOut nostr.Timestamp
|
||||
|
||||
Haves chan string
|
||||
HaveNots chan string
|
||||
Haves chan nostr.ID
|
||||
HaveNots chan nostr.ID
|
||||
}
|
||||
|
||||
func New(storage Storage, frameSizeLimit int) *Negentropy {
|
||||
@@ -39,8 +40,8 @@ func New(storage Storage, frameSizeLimit int) *Negentropy {
|
||||
return &Negentropy{
|
||||
storage: storage,
|
||||
frameSizeLimit: frameSizeLimit,
|
||||
Haves: make(chan string, buckets*4),
|
||||
HaveNots: make(chan string, buckets*4),
|
||||
Haves: make(chan nostr.ID, buckets*4),
|
||||
HaveNots: make(chan nostr.ID, buckets*4),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,9 +159,10 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) {
|
||||
}
|
||||
|
||||
// what they have
|
||||
theirItems := make(map[string]struct{}, numIds)
|
||||
theirItems := make(map[nostr.ID]struct{}, numIds)
|
||||
for i := 0; i < numIds; i++ {
|
||||
if id, err := reader.ReadString(64); err != nil {
|
||||
var id [32]byte
|
||||
if err := reader.ReadHexBytes(id[:]); err != nil {
|
||||
return "", fmt.Errorf("failed to read id (#%d/%d) in list: %w", i, numIds, err)
|
||||
} else {
|
||||
theirItems[id] = struct{}{}
|
||||
@@ -203,11 +205,11 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) {
|
||||
|
||||
for index, item := range n.storage.Range(lower, upper) {
|
||||
if n.frameSizeLimit-200 < fullOutput.Len()/2+responseIds.Len()/2 {
|
||||
endBound = Bound{item}
|
||||
endBound = Bound{item.Timestamp, item.ID[:]}
|
||||
upper = index
|
||||
break
|
||||
}
|
||||
responseIds.WriteString(item.ID)
|
||||
responseIds.WriteString(hex.EncodeToString(item.ID[:]))
|
||||
responses++
|
||||
}
|
||||
|
||||
@@ -254,7 +256,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *Stri
|
||||
writeVarInt(output, numElems)
|
||||
|
||||
for _, item := range n.storage.Range(lower, upper) {
|
||||
output.WriteHex(item.ID)
|
||||
output.WriteBytes(item.ID[:])
|
||||
}
|
||||
} else {
|
||||
itemsPerBucket := numElems / buckets
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
)
|
||||
|
||||
type Accumulator struct {
|
||||
|
||||
@@ -3,8 +3,8 @@ package empty
|
||||
import (
|
||||
"iter"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy/storage"
|
||||
)
|
||||
|
||||
var acc storage.Accumulator
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package vector
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"iter"
|
||||
"slices"
|
||||
@@ -24,7 +23,7 @@ func New() *Vector {
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Vector) Insert(createdAt nostr.Timestamp, id string) {
|
||||
func (v *Vector) Insert(createdAt nostr.Timestamp, id nostr.ID) {
|
||||
if len(id) != 64 {
|
||||
panic(fmt.Errorf("bad id size for added item: expected %d bytes, got %d", 32, len(id)/2))
|
||||
}
|
||||
@@ -68,10 +67,8 @@ func (v *Vector) FindLowerBound(begin, end int, bound negentropy.Bound) int {
|
||||
func (v *Vector) Fingerprint(begin, end int) string {
|
||||
v.acc.Reset()
|
||||
|
||||
tmp := make([]byte, 32)
|
||||
for _, item := range v.Range(begin, end) {
|
||||
hex.Decode(tmp, []byte(item.ID))
|
||||
v.acc.AddBytes(tmp)
|
||||
v.acc.AddBytes(item.ID[:])
|
||||
}
|
||||
|
||||
return v.acc.GetFingerprint(end - begin)
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package negentropy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
const FingerprintSize = 16
|
||||
@@ -33,23 +33,26 @@ func (v Mode) String() string {
|
||||
|
||||
type Item struct {
|
||||
Timestamp nostr.Timestamp
|
||||
ID string
|
||||
ID nostr.ID
|
||||
}
|
||||
|
||||
func ItemCompare(a, b Item) int {
|
||||
if a.Timestamp == b.Timestamp {
|
||||
return strings.Compare(a.ID, b.ID)
|
||||
return bytes.Compare(a.ID[:], b.ID[:])
|
||||
}
|
||||
return cmp.Compare(a.Timestamp, b.Timestamp)
|
||||
}
|
||||
|
||||
func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i.ID) }
|
||||
func (i Item) String() string { return fmt.Sprintf("Item<%d:%x>", i.Timestamp, i.ID[:]) }
|
||||
|
||||
type Bound struct{ Item }
|
||||
type Bound struct {
|
||||
Timestamp nostr.Timestamp
|
||||
IDPrefix []byte
|
||||
}
|
||||
|
||||
func (b Bound) String() string {
|
||||
if b.Timestamp == InfiniteBound.Timestamp {
|
||||
return "Bound<infinite>"
|
||||
}
|
||||
return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID)
|
||||
return fmt.Sprintf("Bound<%d:%x>", b.Timestamp, b.IDPrefix)
|
||||
}
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy/storage/vector"
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy/storage/vector"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
||||
@@ -5,14 +5,14 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"fiatjaf.com/nostrlib"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy"
|
||||
"fiatjaf.com/nostrlib/nip77/negentropy/storage/vector"
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/nip77/negentropy"
|
||||
"fiatjaf.com/nostr/nip77/negentropy/storage/vector"
|
||||
)
|
||||
|
||||
type direction struct {
|
||||
label string
|
||||
items chan string
|
||||
items chan nostr.ID
|
||||
source nostr.RelayStore
|
||||
target nostr.RelayStore
|
||||
}
|
||||
@@ -91,7 +91,9 @@ func NegentropySync(
|
||||
}()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
pool := newidlistpool(50)
|
||||
pool := sync.Pool{
|
||||
New: func() any { return make([]nostr.ID, 0, 50) },
|
||||
}
|
||||
|
||||
// Define sync directions
|
||||
directions := [][]direction{
|
||||
@@ -105,11 +107,11 @@ func NegentropySync(
|
||||
go func(dir direction) {
|
||||
defer wg.Done()
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
seen := make(map[nostr.ID]struct{})
|
||||
|
||||
doSync := func(ids []string) {
|
||||
doSync := func(ids []nostr.ID) {
|
||||
defer wg.Done()
|
||||
defer pool.giveback(ids)
|
||||
defer pool.Put(ids)
|
||||
|
||||
if len(ids) == 0 {
|
||||
return
|
||||
@@ -124,7 +126,7 @@ func NegentropySync(
|
||||
}
|
||||
}
|
||||
|
||||
ids := pool.grab()
|
||||
ids := pool.Get().([]nostr.ID)
|
||||
for item := range dir.items {
|
||||
if _, ok := seen[item]; ok {
|
||||
continue
|
||||
@@ -135,7 +137,7 @@ func NegentropySync(
|
||||
if len(ids) == 50 {
|
||||
wg.Add(1)
|
||||
go doSync(ids)
|
||||
ids = pool.grab()
|
||||
ids = pool.Get().([]nostr.ID)
|
||||
}
|
||||
}
|
||||
wg.Add(1)
|
||||
|
||||
Reference in New Issue
Block a user