lmdb: various fixes so query works.
This commit is contained in:
@@ -111,7 +111,7 @@ func FuzzQuery(f *testing.F) {
|
|||||||
|
|
||||||
fmt.Println(" expected result")
|
fmt.Println(" expected result")
|
||||||
for i := range expected {
|
for i := range expected {
|
||||||
fmt.Println(" ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i)
|
fmt.Println(" ", expected[i].CreatedAt, expected[i].ID.Hex()[0:8], " ", res[i].CreatedAt, res[i].ID.Hex()[0:8], " ", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
|
require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
|
||||||
|
|||||||
@@ -40,24 +40,26 @@ func (it *iterator) pull(n int, since uint32) {
|
|||||||
|
|
||||||
for range n {
|
for range n {
|
||||||
// in the beginning we already have a k and a v and an err from the cursor setup, so check and use these
|
// in the beginning we already have a k and a v and an err from the cursor setup, so check and use these
|
||||||
if it.err != nil ||
|
if it.err != nil {
|
||||||
len(it.key) != query.keySize ||
|
it.exhausted = true
|
||||||
!bytes.HasPrefix(it.key, query.prefix) {
|
return
|
||||||
// either iteration has errored or we reached the end of this prefix
|
}
|
||||||
// fmt.Println(" reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err)
|
|
||||||
|
if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) {
|
||||||
|
// we reached the end of this prefix
|
||||||
it.exhausted = true
|
it.exhausted = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
|
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
|
||||||
if createdAt < since {
|
if createdAt < since {
|
||||||
// fmt.Println(" reached since", createdAt, "<", since)
|
|
||||||
it.exhausted = true
|
it.exhausted = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// got a key
|
// got a key
|
||||||
it.idxs = append(it.idxs, it.key)
|
it.idxs = append(it.idxs, it.valIdx)
|
||||||
|
it.timestamps = append(it.timestamps, createdAt)
|
||||||
it.last = createdAt
|
it.last = createdAt
|
||||||
|
|
||||||
// advance the cursor for the next call
|
// advance the cursor for the next call
|
||||||
@@ -90,7 +92,7 @@ func (it *iterator) next() {
|
|||||||
it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev)
|
it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev)
|
||||||
}
|
}
|
||||||
|
|
||||||
type iterators []iterator
|
type iterators []*iterator
|
||||||
|
|
||||||
// quickselect reorders the slice just enough to make the top k elements be arranged at the end
|
// quickselect reorders the slice just enough to make the top k elements be arranged at the end
|
||||||
// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28]
|
// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28]
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Printf("lmdb: unexpected id query error: %s\n", err)
|
log.Printf("lmdb: unexpected id query error: %s\n", err)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ignore search queries
|
// ignore search queries
|
||||||
@@ -89,7 +90,7 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
iterators[q] = iterator{
|
iterators[q] = &iterator{
|
||||||
query: query,
|
query: query,
|
||||||
cursor: cursor,
|
cursor: cursor,
|
||||||
}
|
}
|
||||||
@@ -99,11 +100,11 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
}
|
}
|
||||||
|
|
||||||
// initial pull from all queries
|
// initial pull from all queries
|
||||||
for _, it := range iterators {
|
for i := range iterators {
|
||||||
it.pull(batchSizePerQuery, since)
|
iterators[i].pull(batchSizePerQuery, since)
|
||||||
}
|
}
|
||||||
|
|
||||||
numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(14))))
|
numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(12))))
|
||||||
totalEventsEmitted := 0
|
totalEventsEmitted := 0
|
||||||
tempResults := make([]nostr.Event, 0, batchSizePerQuery*2)
|
tempResults := make([]nostr.Event, 0, batchSizePerQuery*2)
|
||||||
|
|
||||||
@@ -116,21 +117,21 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)))
|
threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)))
|
||||||
|
|
||||||
// so we can emit all the events higher than the threshold
|
// so we can emit all the events higher than the threshold
|
||||||
for _, it := range iterators {
|
for i := range iterators {
|
||||||
for t, ts := range it.timestamps {
|
for t := 0; t < len(iterators[i].timestamps); t++ {
|
||||||
if ts >= threshold {
|
if iterators[i].timestamps[t] >= threshold {
|
||||||
idx := it.idxs[t]
|
idx := iterators[i].idxs[t]
|
||||||
|
|
||||||
// discard this regardless of what happens
|
// discard this regardless of what happens
|
||||||
internal.SwapDelete(it.timestamps, t)
|
iterators[i].timestamps = internal.SwapDelete(iterators[i].timestamps, t)
|
||||||
internal.SwapDelete(it.idxs, t)
|
iterators[i].idxs = internal.SwapDelete(iterators[i].idxs, t)
|
||||||
t--
|
t--
|
||||||
|
|
||||||
// fetch actual event
|
// fetch actual event
|
||||||
bin, err := txn.Get(b.rawEventStore, idx)
|
bin, err := txn.Get(b.rawEventStore, idx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("lmdb: failed to get %x from raw event store: %s (query prefix=%x, index=%s)\n",
|
log.Printf("lmdb: failed to get %x from raw event store: %s (query prefix=%x, index=%s)\n",
|
||||||
idx, err, it.query.prefix, b.dbiName(it.query.dbi))
|
idx, err, iterators[i].query.prefix, b.dbiName(iterators[i].query.dbi))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,12 +149,10 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
event := nostr.Event{}
|
event := nostr.Event{}
|
||||||
if err := betterbinary.Unmarshal(bin, &event); err != nil {
|
if err := betterbinary.Unmarshal(bin, &event); err != nil {
|
||||||
log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n",
|
log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n",
|
||||||
betterbinary.GetID(bin), it.query.prefix, it.query.startingPoint, b.dbiName(it.query.dbi), err)
|
betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, b.dbiName(iterators[i].query.dbi), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx)
|
|
||||||
|
|
||||||
// if there is still a tag to be checked, do it now
|
// if there is still a tag to be checked, do it now
|
||||||
if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
|
if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
|
||||||
continue
|
continue
|
||||||
@@ -165,7 +164,7 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
}
|
}
|
||||||
|
|
||||||
// emit this stuff in order
|
// emit this stuff in order
|
||||||
slices.SortFunc(tempResults, nostr.CompareEvent)
|
slices.SortFunc(tempResults, nostr.CompareEventReverse)
|
||||||
for _, evt := range tempResults {
|
for _, evt := range tempResults {
|
||||||
if !yield(evt) {
|
if !yield(evt) {
|
||||||
return nil
|
return nil
|
||||||
@@ -179,17 +178,16 @@ func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield
|
|||||||
|
|
||||||
// now pull more events
|
// now pull more events
|
||||||
for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ {
|
for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ {
|
||||||
it := iterators[i]
|
if iterators[i].exhausted {
|
||||||
if it.exhausted {
|
if len(iterators[i].idxs) == 0 {
|
||||||
if len(it.idxs) == 0 {
|
|
||||||
// eliminating this from the list of iterators
|
// eliminating this from the list of iterators
|
||||||
internal.SwapDelete(iterators, i)
|
iterators = internal.SwapDelete(iterators, i)
|
||||||
i--
|
i--
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
it.pull(batchSizePerQuery, since)
|
iterators[i].pull(batchSizePerQuery, since)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user