diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 31966da..3a34736 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -31,19 +31,26 @@ type Negentropy struct { HaveNots chan nostr.ID } -func New(storage Storage, frameSizeLimit int) *Negentropy { +func New(storage Storage, frameSizeLimit int, up, down bool) *Negentropy { if frameSizeLimit == 0 { frameSizeLimit = math.MaxInt } else if frameSizeLimit < 4096 { panic(fmt.Errorf("frameSizeLimit can't be smaller than 4096, was %d", frameSizeLimit)) } - return &Negentropy{ + n := &Negentropy{ storage: storage, frameSizeLimit: frameSizeLimit, - Haves: make(chan nostr.ID, buckets*4), - HaveNots: make(chan nostr.ID, buckets*4), } + + if up { + n.Haves = make(chan nostr.ID, buckets*4) + } + if down { + n.HaveNots = make(chan nostr.ID, buckets*4) + } + + return n } func (n *Negentropy) String() string { @@ -83,8 +90,12 @@ func (n *Negentropy) Reconcile(msg string) (string, error) { } if len(output) == 1 && n.isClient { - close(n.Haves) - close(n.HaveNots) + if n.Haves != nil { + close(n.Haves) + } + if n.HaveNots != nil { + close(n.HaveNots) + } return "", nil } @@ -178,22 +189,22 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { // what we have for _, item := range n.storage.Range(lower, upper) { - id := item.ID - - if _, theyHave := theirItems[id]; theyHave { + if _, theyHave := theirItems[item.ID]; theyHave { // if we have and they have, ignore - delete(theirItems, id) - } else { + delete(theirItems, item.ID) + } else if n.Haves != nil { // if we have and they don't, notify client if n.isClient { - n.Haves <- id + n.Haves <- item.ID } } } - if n.isClient { + if n.isClient && n.HaveNots != nil { // notify client of what they have and we don't for id := range theirItems { + fmt.Println(" their:", id) + // skip empty strings here because those were marked to be excluded as such in the previous step n.HaveNots <- id } diff --git a/nip77/nip77.go b/nip77/nip77.go index 4184ab4..3333593 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -11,42 +11,37 @@ import ( ) type direction struct { - label string - items chan nostr.ID - source nostr.QuerierPublisher - target nostr.QuerierPublisher + label string // for debugging only + from nostr.Querier + to nostr.Publisher + items chan nostr.ID } -type Direction int - -const ( - Up = 0 - Down = 1 - Both = 2 -) - func NegentropySync( ctx context.Context, - store nostr.QuerierPublisher, - url string, filter nostr.Filter, - dir Direction, + relayUrl string, + + // where our local events will be read from. + // if it is nil the sync will be unidirectional: download-only. + source nostr.Querier, + + // where new events received from the relay will be written to. + // if it is nil the sync will be unidirectional: upload-only. + // it can also be a nostr.QuerierPublisher in case source isn't provided + // and you need a download-only sync that respects local data. + target nostr.Publisher, ) error { id := "nl-tmp" // for now we can't have more than one subscription in the same connection vec := vector.New() - neg := negentropy.New(vec, 1024*1024) - - for evt := range store.QueryEvents(filter) { - vec.Insert(evt.CreatedAt, evt.ID) - } - vec.Seal() - - result := make(chan error) + neg := negentropy.New(vec, 1024*1024, source != nil, target != nil) + // connect to relay var err error - var r *nostr.Relay - r, err = nostr.RelayConnect(ctx, url, nostr.RelayOptions{ + errch := make(chan error) + var relay *nostr.Relay + relay, err = nostr.RelayConnect(ctx, relayUrl, nostr.RelayOptions{ CustomHandler: func(data string) { envelope := ParseNegMessage(data) if envelope == nil { @@ -54,21 +49,21 @@ func NegentropySync( } switch env := envelope.(type) { case *OpenEnvelope, *CloseEnvelope: - result <- fmt.Errorf("unexpected %s received from relay", env.Label()) + errch <- fmt.Errorf("unexpected %s received from relay", env.Label()) return case *ErrorEnvelope: - result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) + errch <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) return case *MessageEnvelope: nextmsg, err := neg.Reconcile(env.Message) if err != nil { - result <- fmt.Errorf("failed to reconcile: %w", err) + errch <- fmt.Errorf("failed to reconcile: %w", err) return } if nextmsg != "" { msgb, _ := MessageEnvelope{id, nextmsg}.MarshalJSON() - r.Write(msgb) + relay.Write(msgb) } } }, @@ -77,16 +72,51 @@ func NegentropySync( return err } + // setup sync flows: up, down or both + directions := make([]direction, 0, 2) + if source != nil { + directions = append(directions, direction{ + from: source, + to: relay, + items: neg.Haves, + }) + } + if target != nil { + directions = append(directions, direction{ + from: relay, + to: target, + items: neg.HaveNots, + }) + } + + // fill our local vector + var usedSource nostr.Querier + if source != nil { + for evt := range source.QueryEvents(filter) { + vec.Insert(evt.CreatedAt, evt.ID) + } + usedSource = source + } + if target != nil { + if targetSource, ok := target.(nostr.Querier); ok && targetSource != usedSource { + for evt := range source.QueryEvents(filter) { + vec.Insert(evt.CreatedAt, evt.ID) + } + } + } + vec.Seal() + + // kickstart the process msg := neg.Start() open, _ := OpenEnvelope{id, filter, msg}.MarshalJSON() - err = r.WriteWithError(open) + err = relay.WriteWithError(open) if err != nil { return fmt.Errorf("failed to write to relay: %w", err) } defer func() { clse, _ := CloseEnvelope{id}.MarshalJSON() - r.Write(clse) + relay.Write(clse) }() wg := sync.WaitGroup{} @@ -94,16 +124,11 @@ func NegentropySync( New: func() any { return make([]nostr.ID, 0, 50) }, } - // Define sync directions - directions := [][]direction{ - {{"up", neg.Haves, store, r}}, - {{"down", neg.HaveNots, r, store}}, - {{"up", neg.Haves, store, r}, {"down", neg.HaveNots, r, store}}, - } - - for _, dir := range directions[dir] { + for _, dir := range directions { wg.Add(1) go func(dir direction) { + fmt.Println("> dir", dir.label) + defer wg.Done() seen := make(map[nostr.ID]struct{}) @@ -115,25 +140,30 @@ func NegentropySync( if len(ids) == 0 { return } - for evt := range dir.source.QueryEvents(nostr.Filter{IDs: ids}) { - dir.target.Publish(ctx, evt) + for evt := range dir.from.QueryEvents(nostr.Filter{IDs: ids}) { + dir.to.Publish(ctx, evt) } } ids := pool.Get().([]nostr.ID) for item := range dir.items { + fmt.Println(">>>", item) if _, ok := seen[item]; ok { continue } seen[item] = struct{}{} + fmt.Println(">>>>>", 0) ids = append(ids, item) if len(ids) == 50 { wg.Add(1) go doSync(ids) + fmt.Println(">>>>>", 1) ids = pool.Get().([]nostr.ID) } + fmt.Println(">>>>>", 2) } + fmt.Println("> ?") wg.Add(1) doSync(ids) }(dir) @@ -141,10 +171,10 @@ func NegentropySync( go func() { wg.Wait() - result <- nil + errch <- nil }() - err = <-result + err = <-errch if err != nil { return err }