From 2750ae37511c242a0ffc1855707df3f9c0987e5a Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 4 Aug 2025 15:16:38 -0300 Subject: [PATCH] lmdb: new querying mechanism. --- eventstore/internal/helpers.go | 33 +-- eventstore/lmdb/count.go | 4 +- eventstore/lmdb/helpers.go | 88 +++++- eventstore/lmdb/query.go | 451 ++++++++----------------------- eventstore/lmdb/query_planner.go | 37 ++- eventstore/lmdb/replace.go | 15 +- eventstore/lmdb/utils_test.go | 27 ++ 7 files changed, 256 insertions(+), 399 deletions(-) create mode 100644 eventstore/lmdb/utils_test.go diff --git a/eventstore/internal/helpers.go b/eventstore/internal/helpers.go index d661c65..f167cf8 100644 --- a/eventstore/internal/helpers.go +++ b/eventstore/internal/helpers.go @@ -2,7 +2,6 @@ package internal import ( "bytes" - "cmp" "math" "slices" @@ -79,14 +78,9 @@ func CopyMapWithoutKey[K comparable, V any](originalMap map[K]V, key K) map[K]V return newMap } -type IterEvent struct { - nostr.Event - Q int -} - // MergeSortMultipleBatches takes the results of multiple iterators, which are already sorted, // and merges them into a single big sorted slice -func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []IterEvent { +func MergeSortMultiple(batches [][]nostr.Event, limit int, dst []nostr.Event) []nostr.Event { // clear up empty lists here while simultaneously computing the total count. // this helps because if there are a bunch of empty lists then this pre-clean // step will get us in the faster 'merge' branch otherwise we would go to the other. @@ -112,15 +106,15 @@ func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []Iter // or not (batches=25, limit=60) if math.Log(float64(len(batches)*2))+math.Log(float64(limit)) < 8 { if dst == nil { - dst = make([]IterEvent, limit) + dst = make([]nostr.Event, limit) } else if cap(dst) < limit { dst = slices.Grow(dst, limit-len(dst)) } dst = dst[0:limit] - return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, compareIterEvent) + return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, nostr.CompareEvent) } else { if dst == nil { - dst = make([]IterEvent, total) + dst = make([]nostr.Event, total) } else if cap(dst) < total { dst = slices.Grow(dst, total-len(dst)) } @@ -133,7 +127,7 @@ func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []Iter lastIndex += len(batch) } - slices.SortFunc(dst, compareIterEvent) + slices.SortFunc(dst, nostr.CompareEvent) for i, j := 0, total-1; i < j; i, j = i+1, j-1 { dst[i], dst[j] = dst[j], dst[i] @@ -164,20 +158,3 @@ func SwapDelete[A any](arr []A, i int) []A { arr[i] = arr[len(arr)-1] return arr[:len(arr)-1] } - -func compareIterEvent(a, b IterEvent) int { - if a.Event.ID == nostr.ZeroID { - if b.Event.ID == nostr.ZeroID { - return 0 - } else { - return -1 - } - } else if b.Event.ID == nostr.ZeroID { - return 1 - } - - if a.CreatedAt == b.CreatedAt { - return slices.Compare(a.ID[:], b.ID[:]) - } - return cmp.Compare(a.CreatedAt, b.CreatedAt) -} diff --git a/eventstore/lmdb/count.go b/eventstore/lmdb/count.go index d555caa..3ba8624 100644 --- a/eventstore/lmdb/count.go +++ b/eventstore/lmdb/count.go @@ -42,7 +42,7 @@ func (b *LMDBBackend) CountEvents(filter nostr.Filter) (uint32, error) { } // "id" indexes don't contain a timestamp - if q.timestampSize == 4 { + if q.dbi != b.indexId { createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) if createdAt < since { break @@ -131,7 +131,7 @@ func (b *LMDBBackend) CountEventsHLL(filter nostr.Filter, offset int) 
(uint32, * } // "id" indexes don't contain a timestamp - if q.timestampSize == 4 { + if q.dbi != b.indexId { createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) if createdAt < since { break diff --git a/eventstore/lmdb/helpers.go b/eventstore/lmdb/helpers.go index ad81416..d03768c 100644 --- a/eventstore/lmdb/helpers.go +++ b/eventstore/lmdb/helpers.go @@ -1,6 +1,7 @@ package lmdb import ( + "bytes" "crypto/md5" "encoding/binary" "encoding/hex" @@ -14,14 +15,59 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -// this iterator always goes backwards type iterator struct { + query query + + // iteration stuff cursor *lmdb.Cursor key []byte valIdx []byte err error + + // this keeps track of last timestamp value pulled from this + last uint32 + + // if we shouldn't fetch more from this + exhausted bool + + // results not yet emitted + idxs [][]byte + timestamps []uint32 } +func (it *iterator) pull(n int, since uint32) { + query := it.query + + for range n { + // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these + if it.err != nil || + len(it.key) != query.keySize || + !bytes.HasPrefix(it.key, query.prefix) { + // either iteration has errored or we reached the end of this prefix + // fmt.Println(" reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err) + it.exhausted = true + return + } + + createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) + if createdAt < since { + // fmt.Println(" reached since", createdAt, "<", since) + it.exhausted = true + return + } + + // got a key + it.idxs = append(it.idxs, it.key) + it.last = createdAt + + // advance the cursor for the next call + it.next() + } + + return +} + +// goes backwards func (it *iterator) seek(key []byte) { if _, _, errsr := it.cursor.Get(key, nil, lmdb.SetRange); errsr != nil { if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { @@ -38,11 +84,51 @@ func (it *iterator) seek(key []byte) { } } +// goes backwards func (it *iterator) next() { // move one back (we'll look into k and v and err in the next iteration) it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev) } +type iterators []iterator + +// quickselect reorders the slice just enough to make the top k elements be arranged at the end +// i.e. 
[1, 700, 25, 312, 44, 28] with k=3 becomes something like [28, 25, 1, 44, 312, 700] +// in this case it's hardcoded to use the 'last' field of the iterator +func (its iterators) quickselect(left int, right int, k int) { + if right == left { + return + } + + // partition + pivot := its[(right+left)/2].last + l := left + r := right + + for l <= r { + for its[l].last < pivot { + l++ + } + for its[r].last > pivot { + r-- + } + if l >= r { + break + } + its[l].last, its[r].last = its[r].last, its[l].last + r-- + l++ + } + mid := r + // ~ + + if k > mid { + its.quickselect(mid+1, right, k) + } else { + its.quickselect(left, mid, k) + } +} + type key struct { dbi lmdb.DBI key []byte diff --git a/eventstore/lmdb/query.go b/eventstore/lmdb/query.go index da387a4..e3a0fc6 100644 --- a/eventstore/lmdb/query.go +++ b/eventstore/lmdb/query.go @@ -1,11 +1,9 @@ package lmdb import ( - "bytes" - "encoding/binary" - "fmt" "iter" "log" + "math" "slices" "fiatjaf.com/nostr" @@ -20,6 +18,11 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no return } + if filter.IDs != nil { + // do a special id query + // TODO + } + // max number of events we'll return if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero { return @@ -32,368 +35,130 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no b.lmdbEnv.View(func(txn *lmdb.Txn) error { txn.RawRead = true - results, err := b.query(txn, filter, maxLimit) - - for _, ie := range results { - if !yield(ie.Event) { - break - } - } - - return err + return b.query(txn, filter, maxLimit, yield) }) } } -func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) { +func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield func(nostr.Event) bool) error { queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter) if err != nil { - return nil, err + return err } - iterators := make([]*iterator, len(queries)) - exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore - results := make([][]internal.IterEvent, len(queries)) - pulledPerQuery := make([]int, len(queries)) + iterators := make(iterators, len(queries)) + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries)) - // these are kept updated so we never pull from the iterator that is at further distance - // (i.e. 
the one that has the oldest event among all) - // we will continue to pull from it as soon as some other iterator takes the position - oldest := internal.IterEvent{Q: -1} - - sndPhase := false // after we have gathered enough events we will change the way we iterate - secondBatch := make([][]internal.IterEvent, 0, len(queries)+1) - sndPhaseParticipants := make([]int, 0, len(queries)+1) - - // while merging results in the second phase we will alternate between these two lists - // to avoid having to create new lists all the time - var sndPhaseResultsA []internal.IterEvent - var sndPhaseResultsB []internal.IterEvent - var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating - var sndPhaseHasResultsPending bool - - remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing - batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) - firstPhaseTotalPulled := 0 - - exhaust := func(q int) { - exhausted[q] = true - remainingUnexhausted-- - if q == oldest.Q { - oldest = internal.IterEvent{Q: -1} - } - } - - var firstPhaseResults []internal.IterEvent - - for q := range queries { + for q, query := range queries { cursor, err := txn.OpenCursor(queries[q].dbi) if err != nil { - return nil, err + return err } - iterators[q] = &iterator{cursor: cursor} + iterators[q] = iterator{ + query: query, + cursor: cursor, + } + defer cursor.Close() iterators[q].seek(queries[q].startingPoint) - results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2) } - // fmt.Println("queries", filter, len(queries)) + // initial pull from all queries + for _, it := range iterators { + it.pull(batchSizePerQuery, since) + } - for c := 0; ; c++ { - batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(14)))) + totalEventsEmitted := 0 + tempResults := make([]nostr.Event, 0, batchSizePerQuery*2) - // fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery) - // we will go through all the iterators in batches until we have pulled all the required results - for q, query := range queries { - if exhausted[q] { + for len(iterators) > 0 { + // reset stuff + tempResults = tempResults[:0] + + // after pulling from all iterators once we now find out what iterators are + // the ones we should keep pulling from next (i.e. 
which one's last emitted timestamp is the highest) + iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)), 0, len(iterators)) + + // we now know what is our threshold + threshold := iterators[len(iterators)-1].last + // so we can emit all the events higher than it + for _, it := range iterators { + for t, ts := range it.timestamps { + if ts >= threshold { + idx := it.idxs[t] + + // discard this regardless of what happens + internal.SwapDelete(it.timestamps, t) + internal.SwapDelete(it.idxs, t) + t-- + + // fetch actual event + bin, err := txn.Get(b.rawEventStore, idx) + if err != nil { + log.Printf("lmdb: failed to get %x from raw event store: %s (query prefix=%x, index=%s)\n", + idx, err, it.query.prefix, b.dbiName(it.query.dbi)) + continue + } + + // check it against pubkeys without decoding the entire thing + if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) { + continue + } + + // check it against kinds without decoding the entire thing + if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) { + continue + } + + // decode the entire thing + event := nostr.Event{} + if err := betterbinary.Unmarshal(bin, &event); err != nil { + log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n", + betterbinary.GetID(bin), it.query.prefix, it.query.startingPoint, b.dbiName(it.query.dbi), err) + continue + } + + // fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx) + + // if there is still a tag to be checked, do it now + if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { + continue + } + + tempResults = append(tempResults, event) + } + } + } + + // emit this stuff in order + slices.SortFunc(tempResults, nostr.CompareEvent) + for _, evt := range tempResults { + if !yield(evt) { + return nil + } + + totalEventsEmitted++ + if totalEventsEmitted == limit { + return nil + } + } + + // now pull more events + for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ { + it := iterators[i] + if it.exhausted { + if len(it.idxs) == 0 { + // eliminating this from the list of iterators + internal.SwapDelete(iterators, i) + i-- + } continue } - if oldest.Q == q && remainingUnexhausted > 1 { - continue - } - // fmt.Println(" query", q, unsafe.Pointer(&results[q]), b.dbiName(query.dbi), hex.EncodeToString(query.prefix), hex.EncodeToString(query.startingPoint), len(results[q])) - it := iterators[q] - // fmt.Println(" ", q, unsafe.Pointer(iterators[q]), it.err) - pulledThisIteration := 0 - - for { - // we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - len(it.key) != query.keySize || - !bytes.HasPrefix(it.key, query.prefix) { - // either iteration has errored or we reached the end of this prefix - // fmt.Println(" reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err) - exhaust(q) - break - } - - // "id" indexes don't contain a timestamp - if query.timestampSize == 4 { - createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) - if createdAt < since { - // fmt.Println(" reached since", createdAt, "<", since) - exhaust(q) - break - } - } - - // fetch actual event - bin, err := txn.Get(b.rawEventStore, it.valIdx) - if err != nil { - log.Printf( - "lmdb: failed to get %x based on prefix %x, index key %x 
from raw event store: %s\n", - it.valIdx, query.prefix, it.key, err) - return nil, fmt.Errorf("iteration error: %w", err) - } - - // check it against pubkeys without decoding the entire thing - if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) { - it.next() - continue - } - - // check it against kinds without decoding the entire thing - if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) { - it.next() - continue - } - - // decode the entire thing - event := nostr.Event{} - if err := betterbinary.Unmarshal(bin, &event); err != nil { - log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n", betterbinary.GetID(bin), - query.prefix, query.startingPoint, query.dbi, err) - return nil, fmt.Errorf("event read error: %w", err) - } - - // fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx) - - // if there is still a tag to be checked, do it now - if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { - it.next() - continue - } - - // this event is good to be used - evt := internal.IterEvent{Event: event, Q: q} - // - // - if sndPhase { - // do the process described below at HIWAWVRTP. - // if we've reached here this means we've already passed the `since` check. - // now we have to eliminate the event currently at the `since` threshold. - nextThreshold := firstPhaseResults[len(firstPhaseResults)-2] - if oldest.Event.ID == nostr.ZeroID { - // fmt.Println(" b1", evt.ID[0:8]) - // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE) - // when we don't have the oldest set, we will keep the results - // and not change the cutting point -- it's bad, but hopefully not that bad. - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt > oldest.CreatedAt { - // fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8]) - // one of the events we have stored is the actual next threshold - // eliminate last, update since with oldest - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since, evt.ID[0:8]) - // we null the oldest Event as we can't rely on it anymore - // (we'll fall under BWWDHTOE above) until we have a new oldest set. - oldest = internal.IterEvent{Q: -1} - // anything we got that would be above this won't trigger an update to - // the oldest anyway, because it will be discarded as being after the limit. 
- // - // finally - // add this to the results to be merged later - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt < evt.CreatedAt { - // the next last event in the firstPhaseResults is the next threshold - // fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8]) - // eliminate last, update since with the antelast - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(nextThreshold.CreatedAt) - // fmt.Println(" new since", since) - // add this to the results to be merged later - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - // update the oldest event - if evt.CreatedAt < oldest.CreatedAt { - oldest = evt - } - } else { - // fmt.Println(" b4", evt.ID[0:8]) - // oops, _we_ are the next `since` threshold - firstPhaseResults[len(firstPhaseResults)-1] = evt - since = uint32(evt.CreatedAt) - // fmt.Println(" new since", since) - // do not add us to the results to be merged later - // as we're already inhabiting the firstPhaseResults slice - } - } else { - results[q] = append(results[q], evt) - firstPhaseTotalPulled++ - - // update the oldest event - if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt { - oldest = evt - } - } - - pulledPerQuery[q]++ - pulledThisIteration++ - if pulledThisIteration > batchSizePerQuery { - // batch filled - it.next() - // fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx) - break - } - if pulledPerQuery[q] >= limit { - // batch filled + reached limit for this query (which is the global limit) - exhaust(q) - it.next() - break - } - - it.next() - } - } - - // we will do this check if we don't accumulated the requested number of events yet - // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q) - if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) { - // fmt.Println("second phase aggregation!") - // when we are in the second phase we will aggressively aggregate results on every iteration - // - secondBatch = secondBatch[:0] - for s := 0; s < len(sndPhaseParticipants); s++ { - q := sndPhaseParticipants[s] - - if len(results[q]) > 0 { - secondBatch = append(secondBatch, results[q]) - } - - if exhausted[q] { - sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s) - s-- - } - } - - // every time we get here we will alternate between these A and B lists - // combining everything we have into a new partial results list. - // after we've done that we can again set the oldest. 
- // fmt.Println(" xxx", sndPhaseResultsToggle) - if sndPhaseResultsToggle { - secondBatch = append(secondBatch, sndPhaseResultsB) - sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA) - oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1] - // fmt.Println(" new aggregated a", len(sndPhaseResultsB)) - } else { - secondBatch = append(secondBatch, sndPhaseResultsA) - sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB) - oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1] - // fmt.Println(" new aggregated b", len(sndPhaseResultsB)) - } - sndPhaseResultsToggle = !sndPhaseResultsToggle - - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since) - - // reset the `results` list so we can keep using it - results = results[:len(queries)] - for _, q := range sndPhaseParticipants { - results[q] = results[q][:0] - } - } else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 { - // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted) - - // we will exclude this oldest number as it is not relevant anymore - // (we now want to keep track only of the oldest among the remaining iterators) - oldest = internal.IterEvent{Q: -1} - - // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP) - // now we can combine the results we have and check what is our current oldest event. - // we also discard anything that is after the current cutting point (`limit`). - // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total - // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49], - // and also adjust our `since` parameter to `15`, discarding anything we get after it - // and immediately declaring that iterator exhausted. - // also every time we get result that is more recent than this updated `since` we can - // keep it but also discard the previous since, moving the needle one back -- for example, - // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15` - // in the process. - all := make([][]internal.IterEvent, len(results)) - copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice - firstPhaseResults = internal.MergeSortMultiple(all, limit, nil) - oldest = firstPhaseResults[limit-1] - since = uint32(oldest.CreatedAt) - // fmt.Println("new since", since) - - for q := range queries { - if exhausted[q] { - continue - } - - // we also automatically exhaust any of the iterators that have already passed the - // cutting point (`since`) - if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt { - exhausted[q] = true - remainingUnexhausted-- - continue - } - - // for all the remaining iterators, - // since we have merged all the events in this `firstPhaseResults` slice, we can empty the - // current `results` slices and reuse them. 
- results[q] = results[q][:0] - - // build this index of indexes with everybody who remains - sndPhaseParticipants = append(sndPhaseParticipants, q) - } - - // we create these two lists and alternate between them so we don't have to create a - // a new one every time - sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2) - sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2) - - // from now on we won't run this block anymore - sndPhase = true - } - - // fmt.Println("remaining", remainingUnexhausted) - if remainingUnexhausted == 0 { - break + it.pull(batchSizePerQuery, since) } } - // fmt.Println("is sndPhase?", sndPhase) - - var combinedResults []internal.IterEvent - - if sndPhase { - // fmt.Println("ending second phase") - // when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff, - // the other will be empty - var sndPhaseResults []internal.IterEvent - // fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB)) - if sndPhaseResultsToggle { - sndPhaseResults = sndPhaseResultsB - combinedResults = sndPhaseResultsA[0:limit] // reuse this - // fmt.Println(" using b", len(sndPhaseResultsA)) - } else { - sndPhaseResults = sndPhaseResultsA - combinedResults = sndPhaseResultsB[0:limit] // reuse this - // fmt.Println(" using a", len(sndPhaseResultsA)) - } - - all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults} - combinedResults = internal.MergeSortMultiple(all, limit, combinedResults) - // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit) - } else { - combinedResults = make([]internal.IterEvent, limit) - combinedResults = internal.MergeSortMultiple(results, limit, combinedResults) - } - - return combinedResults, nil + return nil } diff --git a/eventstore/lmdb/query_planner.go b/eventstore/lmdb/query_planner.go index 763ff4c..fde0e7f 100644 --- a/eventstore/lmdb/query_planner.go +++ b/eventstore/lmdb/query_planner.go @@ -14,9 +14,7 @@ type query struct { i int dbi lmdb.DBI prefix []byte - results chan *nostr.Event keySize int - timestampSize int startingPoint []byte } @@ -46,20 +44,19 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( sp = sp[0:len(q.prefix)] copy(sp, q.prefix) queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until)) - queries[i].results = make(chan *nostr.Event, 12) } }() - if filter.IDs != nil { - // when there are ids we ignore everything else - queries = make([]query, len(filter.IDs)) - for i, id := range filter.IDs { - prefix := make([]byte, 8) - copy(prefix[0:8], id[0:8]) - queries[i] = query{i: i, dbi: b.indexId, prefix: prefix[0:8], keySize: 8, timestampSize: 0} - } - return queries, nil, nil, "", nil, 0, nil - } + // if filter.IDs != nil { + // // when there are ids we ignore everything else + // queries = make([]query, len(filter.IDs)) + // for i, id := range filter.IDs { + // prefix := make([]byte, 8) + // copy(prefix[0:8], id[0:8]) + // queries[i] = query{i: i, dbi: b.indexId, prefix: prefix[0:8], keySize: 8} + // } + // return queries, nil, nil, "", nil, 0, nil + // } // this is where we'll end the iteration if filter.Since != 0 { @@ -94,7 +91,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value) } binary.BigEndian.PutUint16(k[8:8+2], uint16(kind)) - queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0 : 8+2], keySize: 8 + 2 + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0 : 8+2], 
keySize: 8 + 2 + 4} i++ } } @@ -110,7 +107,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( if _, err := hex.Decode(k[0:8], []byte(value[0:8*2])); err != nil { return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value) } - queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0:8], keySize: 8 + 2 + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0:8], keySize: 8 + 2 + 4} } } } else { @@ -121,7 +118,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( dbi, k, offset := b.getTagIndexPrefix(tagKey, value) // remove the last parts part to get just the prefix we want here prefix := k[0:offset] - queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4} } // add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index) @@ -156,7 +153,7 @@ pubkeyMatching: // will use pubkey index queries = make([]query, len(filter.Authors)) for i, pk := range filter.Authors { - queries[i] = query{i: i, dbi: b.indexPubkey, prefix: pk[0:8], keySize: 8 + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: b.indexPubkey, prefix: pk[0:8], keySize: 8 + 4} } } else { // will use pubkeyKind index @@ -167,7 +164,7 @@ pubkeyMatching: prefix := make([]byte, 8+2) copy(prefix[0:8], pk[0:8]) binary.BigEndian.PutUint16(prefix[8:8+2], uint16(kind)) - queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix[0 : 8+2], keySize: 10 + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix[0 : 8+2], keySize: 10 + 4} i++ } } @@ -184,7 +181,7 @@ pubkeyMatching: for i, kind := range filter.Kinds { prefix := make([]byte, 2) binary.BigEndian.PutUint16(prefix[0:2], uint16(kind)) - queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix[0:2], keySize: 2 + 4, timestampSize: 4} + queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix[0:2], keySize: 2 + 4} } // potentially with an extra useless tag filtering @@ -195,6 +192,6 @@ pubkeyMatching: // if we got here our query will have nothing to filter with queries = make([]query, 1) prefix := make([]byte, 0) - queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix, keySize: 0 + 4, timestampSize: 4} + queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix, keySize: 0 + 4} return queries, nil, nil, "", nil, since, nil } diff --git a/eventstore/lmdb/replace.go b/eventstore/lmdb/replace.go index f46dabf..2f2aed5 100644 --- a/eventstore/lmdb/replace.go +++ b/eventstore/lmdb/replace.go @@ -2,6 +2,7 @@ package lmdb import ( "fmt" + "iter" "math" "fiatjaf.com/nostr" @@ -23,16 +24,20 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error { } // now we fetch the past events, whatever they are, delete them and then save the new - results, err := b.query(txn, filter, 10) // in theory limit could be just 1 and this should work + var yield_ func(nostr.Event) bool + var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { + yield_ = yield + } + err := b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield_) if err != nil { return fmt.Errorf("failed to query past events with %s: %w", filter, err) } shouldStore := true - for _, previous := range results { - if internal.IsOlder(previous.Event, evt) { - if err := b.delete(txn, previous.Event.ID); err != nil { - return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err) + for previous := range 
results { + if internal.IsOlder(previous, evt) { + if err := b.delete(txn, previous.ID); err != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) } } else { // there is a newer event already stored, so we won't store this diff --git a/eventstore/lmdb/utils_test.go b/eventstore/lmdb/utils_test.go new file mode 100644 index 0000000..cf1f3f0 --- /dev/null +++ b/eventstore/lmdb/utils_test.go @@ -0,0 +1,27 @@ +package lmdb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQuickselect(t *testing.T) { + its := iterators{ + {last: 781}, + {last: 900}, + {last: 1}, + {last: 81}, + {last: 325}, + {last: 781}, + {last: 562}, + {last: 81}, + {last: 444}, + } + + its.quickselect(3, 0, len(its)) + require.ElementsMatch(t, its[len(its)-3:], iterators{{last: 900}, {last: 781}, {last: 781}}) + + its.quickselect(4, 0, len(its)) + require.ElementsMatch(t, its[len(its)-4:], iterators{{last: 562}, {last: 900}, {last: 781}, {last: 781}}) +}
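
A note on the new iterators.quickselect helper in lmdb/helpers.go: its declaration names the parameters (left, right, k), while both call sites — the main query loop and TestQuickselect — appear to pass them as (k, left, right), and the partition swaps only the `last` fields, so cursors and buffered keys do not travel with their timestamps. One of the two sides presumably needs to flip. Below is a hypothetical, self-contained variant (renamed topK so it is clearly not the patch's code) that swaps whole elements and takes the conventional order; calling its.topK(0, len(its)-1, len(its)-k) parks the k iterators with the largest `last` in the tail of the slice, unordered among themselves. It assumes the patch's iterators type.

// hypothetical sketch, not part of the patch
func (its iterators) topK(left, right, k int) {
	if left >= right {
		return
	}

	// partition around the middle element's `last` timestamp
	pivot := its[(left+right)/2].last
	l, r := left, right
	for l <= r {
		for its[l].last < pivot {
			l++
		}
		for its[r].last > pivot {
			r--
		}
		if l <= r {
			// swap whole elements so cursor, idxs and timestamps travel together
			its[l], its[r] = its[r], its[l]
			l++
			r--
		}
	}

	// everything left of l is now <= pivot and everything right of r is >= pivot,
	// so only the side containing position k still needs work
	if k <= r {
		its.topK(left, r, k)
	} else if k >= l {
		its.topK(l, right, k)
	}
}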
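
The core of the new query loop in lmdb/query.go is a round-based merge: each index iterator walks its prefix backwards in time, `last` records the oldest timestamp it has pulled so far, and a buffered event is safe to emit once its timestamp is at or above the highest `last` among the iterators that can still produce more, since none of them can later yield anything newer than its own `last`. The toy program below is a self-contained illustration of just that threshold rule (made-up names, plain timestamps instead of events, nothing touches LMDB); it is not the backend's actual cursor handling.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

type stream struct {
	pending []uint32 // timestamps still "on disk", newest first
	buf     []uint32 // pulled but not yet emitted
	last    uint32   // oldest timestamp pulled so far
	done    bool     // nothing left to pull
}

func (s *stream) pull(n int) {
	for range n {
		if len(s.pending) == 0 {
			s.done = true
			return
		}
		s.last = s.pending[0]
		s.buf = append(s.buf, s.pending[0])
		s.pending = s.pending[1:]
	}
}

func main() {
	streams := []*stream{
		{pending: []uint32{90, 70, 40, 10}},
		{pending: []uint32{85, 80, 20}},
		{pending: []uint32{60, 50, 30}},
	}
	for _, s := range streams {
		s.pull(2)
	}

	var out []uint32
	for {
		// the threshold is the highest `last` among streams that can still pull;
		// no future pull can produce anything newer than that
		threshold := uint32(0)
		for _, s := range streams {
			if !s.done && s.last > threshold {
				threshold = s.last
			}
		}

		// emit everything already buffered that sits at or above the threshold
		var batch []uint32
		for _, s := range streams {
			kept := s.buf[:0]
			for _, ts := range s.buf {
				if ts >= threshold {
					batch = append(batch, ts)
				} else {
					kept = append(kept, ts)
				}
			}
			s.buf = kept
		}
		slices.SortFunc(batch, func(a, b uint32) int { return cmp.Compare(b, a) })
		out = append(out, batch...)

		// refill the stream(s) holding the threshold up; stop when nothing is left
		anyLeft := false
		for _, s := range streams {
			if !s.done && s.last == threshold {
				s.pull(2)
			}
			if !s.done || len(s.buf) > 0 {
				anyLeft = true
			}
		}
		if !anyLeft {
			break
		}
	}

	fmt.Println(out) // [90 85 80 70 60 50 40 30 20 10]
}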
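
One Go detail worth flagging in that same loop: `iterators` is a slice of iterator values, so `for _, it := range iterators { it.pull(...) }` pulls into a per-iteration copy and the appended idxs/timestamps are lost; inside the emit loop the slices returned by internal.SwapDelete are discarded and `t--` has no effect on a range variable; and in the refill loop the shrunken slice from internal.SwapDelete is likewise dropped, so exhausted iterators are never removed. The excerpt-style sketch below (a suggestion, not what the patch does) shows how those three spots could be written so the mutations stick, reusing the patch's own names and helpers; the fetch/filter body is elided.

// initial pull: index into the slice instead of ranging over copies
for q := range iterators {
	iterators[q].pull(batchSizePerQuery, since)
}

// emit loop: take a pointer to the element and use a plain index so the
// SwapDelete results and the index adjustment are both kept
for q := range iterators {
	it := &iterators[q]
	for t := 0; t < len(it.timestamps); t++ {
		if it.timestamps[t] < threshold {
			continue
		}
		idx := it.idxs[t]
		it.timestamps = internal.SwapDelete(it.timestamps, t)
		it.idxs = internal.SwapDelete(it.idxs, t)
		t--

		_ = idx // placeholder for the raw-event fetch and filtering shown in the patch
	}
}

// refill: same idea, and keep the shrunken slice when dropping a drained iterator
for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ {
	it := &iterators[i]
	if it.exhausted {
		if len(it.idxs) == 0 {
			iterators = internal.SwapDelete(iterators, i)
			i--
		}
		continue
	}
	it.pull(batchSizePerQuery, since)
}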
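
The `filter.IDs != nil` branch in QueryEvents is still a TODO, and the old id branch in prepareQueries is commented out. Based on that commented-out code (8-byte id prefix on b.indexId, no timestamp in the key), the special id query could look roughly like the sketch below, placed inside the returned iterator function where `yield` is in scope. Everything here is an assumption about intent, not part of the patch.

// hypothetical sketch of the "special id query" TODO: direct prefix lookups
// on the id index, one per requested id, skipping the query planner entirely
if len(filter.IDs) > 0 {
	b.lmdbEnv.View(func(txn *lmdb.Txn) error {
		txn.RawRead = true

		cursor, err := txn.OpenCursor(b.indexId)
		if err != nil {
			return err
		}
		defer cursor.Close()

		for _, id := range filter.IDs {
			prefix := id[0:8]
			k, valIdx, err := cursor.Get(prefix, nil, lmdb.SetRange)
			if err != nil || !bytes.HasPrefix(k, prefix) {
				continue // not stored
			}

			bin, err := txn.Get(b.rawEventStore, valIdx)
			if err != nil {
				continue
			}

			var evt nostr.Event
			if err := betterbinary.Unmarshal(bin, &evt); err != nil {
				continue
			}

			// the index only keys on the first 8 bytes, so confirm the full id
			if evt.ID == id && !yield(evt) {
				return nil
			}
		}
		return nil
	})
	return
}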
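
In replace.go the new query signature takes a yield callback, but the `iter.Seq` wrapper only binds `yield_` when the sequence is ranged over, which happens after `b.query(txn, filter, 10, yield_)` has already been called with a nil callback; the later `for previous := range results` then yields nothing. As far as I can tell a plain collecting closure does what the old slice-returning call did. A sketch, using the patch's own identifiers:

// collect the past versions directly through the yield callback
results := make([]nostr.Event, 0, 10)
if err := b.query(txn, filter, 10, func(evt nostr.Event) bool {
	results = append(results, evt)
	return true
}); err != nil {
	return fmt.Errorf("failed to query past events with %s: %w", filter, err)
}

for _, previous := range results {
	// ... IsOlder / delete / shouldStore handling exactly as in the patch ...
}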