From 0291836eb71651508a48b6f89294caca29c0f52f Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 4 Aug 2025 22:06:31 -0300 Subject: [PATCH] lmdb: various fixes so query works. --- eventstore/lmdb/fuzz_test.go | 2 +- eventstore/lmdb/helpers.go | 18 +++++++++-------- eventstore/lmdb/query.go | 38 +++++++++++++++++------------------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/eventstore/lmdb/fuzz_test.go b/eventstore/lmdb/fuzz_test.go index 470998d..0d85789 100644 --- a/eventstore/lmdb/fuzz_test.go +++ b/eventstore/lmdb/fuzz_test.go @@ -111,7 +111,7 @@ func FuzzQuery(f *testing.F) { fmt.Println(" expected result") for i := range expected { - fmt.Println(" ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i) + fmt.Println(" ", expected[i].CreatedAt, expected[i].ID.Hex()[0:8], " ", res[i].CreatedAt, res[i].ID.Hex()[0:8], " ", i) } require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong") diff --git a/eventstore/lmdb/helpers.go b/eventstore/lmdb/helpers.go index 1734968..d84b93c 100644 --- a/eventstore/lmdb/helpers.go +++ b/eventstore/lmdb/helpers.go @@ -40,24 +40,26 @@ func (it *iterator) pull(n int, since uint32) { for range n { // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - len(it.key) != query.keySize || - !bytes.HasPrefix(it.key, query.prefix) { - // either iteration has errored or we reached the end of this prefix - // fmt.Println(" reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err) + if it.err != nil { + it.exhausted = true + return + } + + if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) { + // we reached the end of this prefix it.exhausted = true return } createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) if createdAt < since { - // fmt.Println(" reached since", createdAt, "<", since) it.exhausted = true return } // got a key - it.idxs = append(it.idxs, it.key) + it.idxs = append(it.idxs, it.valIdx) + it.timestamps = append(it.timestamps, createdAt) it.last = createdAt // advance the cursor for the next call @@ -90,7 +92,7 @@ func (it *iterator) next() { it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev) } -type iterators []iterator +type iterators []*iterator // quickselect reorders the slice just enough to make the top k elements be arranged at the end // i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28] diff --git a/eventstore/lmdb/query.go b/eventstore/lmdb/query.go index 133a11f..80894b4 100644 --- a/eventstore/lmdb/query.go +++ b/eventstore/lmdb/query.go @@ -22,6 +22,7 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no }); err != nil { log.Printf("lmdb: unexpected id query error: %s\n", err) } + return } // ignore search queries @@ -89,7 +90,7 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield if err != nil { return err } - iterators[q] = iterator{ + iterators[q] = &iterator{ query: query, cursor: cursor, } @@ -99,11 +100,11 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield } // initial pull from all queries - for _, it := range iterators { - it.pull(batchSizePerQuery, since) + for i := range iterators { + iterators[i].pull(batchSizePerQuery, since) } - numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(14)))) + numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(12)))) totalEventsEmitted := 0 tempResults := make([]nostr.Event, 0, batchSizePerQuery*2) @@ -116,21 +117,21 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators))) // so we can emit all the events higher than the threshold - for _, it := range iterators { - for t, ts := range it.timestamps { - if ts >= threshold { - idx := it.idxs[t] + for i := range iterators { + for t := 0; t < len(iterators[i].timestamps); t++ { + if iterators[i].timestamps[t] >= threshold { + idx := iterators[i].idxs[t] // discard this regardless of what happens - internal.SwapDelete(it.timestamps, t) - internal.SwapDelete(it.idxs, t) + iterators[i].timestamps = internal.SwapDelete(iterators[i].timestamps, t) + iterators[i].idxs = internal.SwapDelete(iterators[i].idxs, t) t-- // fetch actual event bin, err := txn.Get(b.rawEventStore, idx) if err != nil { log.Printf("lmdb: failed to get %x from raw event store: %s (query prefix=%x, index=%s)\n", - idx, err, it.query.prefix, b.dbiName(it.query.dbi)) + idx, err, iterators[i].query.prefix, b.dbiName(iterators[i].query.dbi)) continue } @@ -148,12 +149,10 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield event := nostr.Event{} if err := betterbinary.Unmarshal(bin, &event); err != nil { log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n", - betterbinary.GetID(bin), it.query.prefix, it.query.startingPoint, b.dbiName(it.query.dbi), err) + betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, b.dbiName(iterators[i].query.dbi), err) continue } - // fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx) - // if there is still a tag to be checked, do it now if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { continue @@ -165,7 +164,7 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield } // emit this stuff in order - slices.SortFunc(tempResults, nostr.CompareEvent) + slices.SortFunc(tempResults, nostr.CompareEventReverse) for _, evt := range tempResults { if !yield(evt) { return nil @@ -179,17 +178,16 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield // now pull more events for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ { - it := iterators[i] - if it.exhausted { - if len(it.idxs) == 0 { + if iterators[i].exhausted { + if len(iterators[i].idxs) == 0 { // eliminating this from the list of iterators - internal.SwapDelete(iterators, i) + iterators = internal.SwapDelete(iterators, i) i-- } continue } - it.pull(batchSizePerQuery, since) + iterators[i].pull(batchSizePerQuery, since) } }