lmdb: new querying mechanism.
@@ -2,7 +2,6 @@ package internal
 
 import (
     "bytes"
-    "cmp"
    "math"
    "slices"
 
@@ -79,14 +78,9 @@ func CopyMapWithoutKey[K comparable, V any](originalMap map[K]V, key K) map[K]V
    return newMap
 }
 
-type IterEvent struct {
-    nostr.Event
-    Q int
-}
-
 // MergeSortMultipleBatches takes the results of multiple iterators, which are already sorted,
 // and merges them into a single big sorted slice
-func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []IterEvent {
+func MergeSortMultiple(batches [][]nostr.Event, limit int, dst []nostr.Event) []nostr.Event {
    // clear up empty lists here while simultaneously computing the total count.
    // this helps because if there are a bunch of empty lists then this pre-clean
    // step will get us in the faster 'merge' branch otherwise we would go to the other.
@@ -112,15 +106,15 @@ func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []Iter
    // or not (batches=25, limit=60)
    if math.Log(float64(len(batches)*2))+math.Log(float64(limit)) < 8 {
        if dst == nil {
-            dst = make([]IterEvent, limit)
+            dst = make([]nostr.Event, limit)
        } else if cap(dst) < limit {
            dst = slices.Grow(dst, limit-len(dst))
        }
        dst = dst[0:limit]
-        return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, compareIterEvent)
+        return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, nostr.CompareEvent)
    } else {
        if dst == nil {
-            dst = make([]IterEvent, total)
+            dst = make([]nostr.Event, total)
        } else if cap(dst) < total {
            dst = slices.Grow(dst, total-len(dst))
        }
@@ -133,7 +127,7 @@ func MergeSortMultiple(batches [][]IterEvent, limit int, dst []IterEvent) []Iter
        lastIndex += len(batch)
    }
 
-    slices.SortFunc(dst, compareIterEvent)
+    slices.SortFunc(dst, nostr.CompareEvent)
 
    for i, j := 0, total-1; i < j; i, j = i+1, j-1 {
        dst[i], dst[j] = dst[j], dst[i]
@@ -164,20 +158,3 @@ func SwapDelete[A any](arr []A, i int) []A {
    arr[i] = arr[len(arr)-1]
    return arr[:len(arr)-1]
 }
-
-func compareIterEvent(a, b IterEvent) int {
-    if a.Event.ID == nostr.ZeroID {
-        if b.Event.ID == nostr.ZeroID {
-            return 0
-        } else {
-            return -1
-        }
-    } else if b.Event.ID == nostr.ZeroID {
-        return 1
-    }
-
-    if a.CreatedAt == b.CreatedAt {
-        return slices.Compare(a.ID[:], b.ID[:])
-    }
-    return cmp.Compare(a.CreatedAt, b.CreatedAt)
-}
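The batches handed to MergeSortMultiple come from backwards index scans, so each one is already sorted newest-first and the function only has to merge them and cap the result. A minimal, self-contained sketch of that job, with plain uint32 timestamps standing in for nostr.Event and only the "dump everything and sort" branch shown (illustrative code, not part of the commit):

    package main

    import (
        "cmp"
        "fmt"
        "slices"
    )

    // mergeSorted merges batches that are each already sorted newest-first
    // into one newest-first slice, keeping at most `limit` entries here.
    func mergeSorted(batches [][]uint32, limit int) []uint32 {
        dst := make([]uint32, 0, limit)
        for _, b := range batches {
            dst = append(dst, b...)
        }
        // sort newest-first (descending)
        slices.SortFunc(dst, func(a, b uint32) int { return cmp.Compare(b, a) })
        if len(dst) > limit {
            dst = dst[:limit]
        }
        return dst
    }

    func main() {
        fmt.Println(mergeSorted([][]uint32{{90, 50, 10}, {80, 70}, {60}}, 4)) // [90 80 70 60]
    }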
@@ -42,7 +42,7 @@ func (b *LMDBBackend) CountEvents(filter nostr.Filter) (uint32, error) {
            }
 
            // "id" indexes don't contain a timestamp
-            if q.timestampSize == 4 {
+            if q.dbi != b.indexId {
                createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
                if createdAt < since {
                    break
@@ -131,7 +131,7 @@ func (b *LMDBBackend) CountEventsHLL(filter nostr.Filter, offset int) (uint32, *
            }
 
            // "id" indexes don't contain a timestamp
-            if q.timestampSize == 4 {
+            if q.dbi != b.indexId {
                createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
                if createdAt < since {
                    break
@@ -1,6 +1,7 @@
 package lmdb
 
 import (
+    "bytes"
    "crypto/md5"
    "encoding/binary"
    "encoding/hex"
@@ -14,14 +15,59 @@ import (
    "github.com/PowerDNS/lmdb-go/lmdb"
 )
 
-// this iterator always goes backwards
 type iterator struct {
+    query query
+
+    // iteration stuff
    cursor *lmdb.Cursor
    key    []byte
    valIdx []byte
    err    error
+
+    // this keeps track of last timestamp value pulled from this
+    last uint32
+
+    // if we shouldn't fetch more from this
+    exhausted bool
+
+    // results not yet emitted
+    idxs       [][]byte
+    timestamps []uint32
 }
 
+func (it *iterator) pull(n int, since uint32) {
+    query := it.query
+
+    for range n {
+        // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these
+        if it.err != nil ||
+            len(it.key) != query.keySize ||
+            !bytes.HasPrefix(it.key, query.prefix) {
+            // either iteration has errored or we reached the end of this prefix
+            // fmt.Println("  reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err)
+            it.exhausted = true
+            return
+        }
+
+        createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
+        if createdAt < since {
+            // fmt.Println("  reached since", createdAt, "<", since)
+            it.exhausted = true
+            return
+        }
+
+        // got a key
+        it.idxs = append(it.idxs, it.key)
+        it.last = createdAt
+
+        // advance the cursor for the next call
+        it.next()
+    }
+
+    return
+}
+
+// goes backwards
 func (it *iterator) seek(key []byte) {
    if _, _, errsr := it.cursor.Get(key, nil, lmdb.SetRange); errsr != nil {
        if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
@@ -38,11 +84,51 @@ func (it *iterator) seek(key []byte) {
    }
 }
 
+// goes backwards
 func (it *iterator) next() {
    // move one back (we'll look into k and v and err in the next iteration)
    it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev)
 }
 
+type iterators []iterator
+
+// quickselect reorders the slice just enough to make the top k elements be arranged at the end
+// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [28, 25, 1, 44, 312, 700]
+// in this case it's hardcoded to use the 'last' field of the iterator
+func (its iterators) quickselect(left int, right int, k int) {
+    if right == left {
+        return
+    }
+
+    // partition
+    pivot := its[(right+left)/2].last
+    l := left
+    r := right
+
+    for l <= r {
+        for its[l].last < pivot {
+            l++
+        }
+        for its[r].last > pivot {
+            r--
+        }
+        if l >= r {
+            break
+        }
+        its[l].last, its[r].last = its[r].last, its[l].last
+        r--
+        l++
+    }
+    mid := r
+    // ~
+
+    if k > mid {
+        its.quickselect(mid+1, right, k)
+    } else {
+        its.quickselect(left, mid, k)
+    }
+}
+
 type key struct {
    dbi lmdb.DBI
    key []byte
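A rough, self-contained sketch of the same top-k selection idea over a plain []uint32 — a hypothetical helper with a simpler calling convention than the method above, shown only to illustrate the "largest k values end up at the tail without a full sort" property the new code relies on:

    package main

    import "fmt"

    // quickselectTopK rearranges xs so that the k largest values occupy the
    // last k positions (in no particular order), using Hoare-style partitioning.
    func quickselectTopK(xs []uint32, k int) {
        target := len(xs) - k // this index must end up holding the smallest of the top k
        lo, hi := 0, len(xs)-1
        for lo < hi {
            pivot := xs[(lo+hi)/2]
            l, r := lo, hi
            for l <= r {
                for xs[l] < pivot {
                    l++
                }
                for xs[r] > pivot {
                    r--
                }
                if l >= r {
                    break
                }
                xs[l], xs[r] = xs[r], xs[l]
                l++
                r--
            }
            if target <= r { // the target position lies in the left partition
                hi = r
            } else {
                lo = r + 1
            }
        }
    }

    func main() {
        xs := []uint32{1, 700, 25, 312, 44, 28}
        quickselectTopK(xs, 3)
        fmt.Println(xs[len(xs)-3:]) // the three largest: 44, 312, 700 in some order
    }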
@@ -1,11 +1,9 @@
 package lmdb
 
 import (
-    "bytes"
-    "encoding/binary"
-    "fmt"
    "iter"
    "log"
+    "math"
    "slices"
 
    "fiatjaf.com/nostr"
@@ -20,6 +18,11 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no
        return
    }
 
+    if filter.IDs != nil {
+        // do a special id query
+        // TODO
+    }
+
    // max number of events we'll return
    if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero {
        return
@@ -32,368 +35,130 @@ func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[no
 
        b.lmdbEnv.View(func(txn *lmdb.Txn) error {
            txn.RawRead = true
-            results, err := b.query(txn, filter, maxLimit)
-
-            for _, ie := range results {
-                if !yield(ie.Event) {
-                    break
-                }
-            }
-
-            return err
+            return b.query(txn, filter, maxLimit, yield)
        })
    }
 }
 
-func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) {
+func (b *LMDBBackend) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield func(nostr.Event) bool) error {
    queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
    if err != nil {
-        return nil, err
+        return err
    }
 
-    iterators := make([]*iterator, len(queries))
-    exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
-    results := make([][]internal.IterEvent, len(queries))
-    pulledPerQuery := make([]int, len(queries))
-
-    // these are kept updated so we never pull from the iterator that is at further distance
-    // (i.e. the one that has the oldest event among all)
-    // we will continue to pull from it as soon as some other iterator takes the position
-    oldest := internal.IterEvent{Q: -1}
-
-    sndPhase := false // after we have gathered enough events we will change the way we iterate
-    secondBatch := make([][]internal.IterEvent, 0, len(queries)+1)
-    sndPhaseParticipants := make([]int, 0, len(queries)+1)
-
-    // while merging results in the second phase we will alternate between these two lists
-    // to avoid having to create new lists all the time
-    var sndPhaseResultsA []internal.IterEvent
-    var sndPhaseResultsB []internal.IterEvent
-    var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
-    var sndPhaseHasResultsPending bool
-
-    remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
-    batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
-    firstPhaseTotalPulled := 0
-
-    exhaust := func(q int) {
-        exhausted[q] = true
-        remainingUnexhausted--
-        if q == oldest.Q {
-            oldest = internal.IterEvent{Q: -1}
-        }
-    }
-
-    var firstPhaseResults []internal.IterEvent
-
-    for q := range queries {
+    iterators := make(iterators, len(queries))
+    batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries))
+    for q, query := range queries {
        cursor, err := txn.OpenCursor(queries[q].dbi)
        if err != nil {
-            return nil, err
+            return err
        }
-        iterators[q] = &iterator{cursor: cursor}
+        iterators[q] = iterator{
+            query:  query,
+            cursor: cursor,
+        }
+
        defer cursor.Close()
        iterators[q].seek(queries[q].startingPoint)
-        results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2)
    }
 
-    // fmt.Println("queries", filter, len(queries))
-
-    for c := 0; ; c++ {
-        batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
-
-        // fmt.Println("  iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery)
-        // we will go through all the iterators in batches until we have pulled all the required results
-        for q, query := range queries {
-            if exhausted[q] {
+    // initial pull from all queries
+    for _, it := range iterators {
+        it.pull(batchSizePerQuery, since)
+    }
+
+    numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(14))))
+    totalEventsEmitted := 0
+    tempResults := make([]nostr.Event, 0, batchSizePerQuery*2)
+
+    for len(iterators) > 0 {
+        // reset stuff
+        tempResults = tempResults[:0]
+
+        // after pulling from all iterators once we now find out what iterators are
+        // the ones we should keep pulling from next (i.e. which one's last emitted timestamp is the highest)
+        iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)), 0, len(iterators))
+
+        // we now know what is our threshold
+        threshold := iterators[len(iterators)-1].last
+        // so we can emit all the events higher than it
+        for _, it := range iterators {
+            for t, ts := range it.timestamps {
+                if ts >= threshold {
+                    idx := it.idxs[t]
+
+                    // discard this regardless of what happens
+                    internal.SwapDelete(it.timestamps, t)
+                    internal.SwapDelete(it.idxs, t)
+                    t--
+
+                    // fetch actual event
+                    bin, err := txn.Get(b.rawEventStore, idx)
+                    if err != nil {
+                        log.Printf("lmdb: failed to get %x from raw event store: %s (query prefix=%x, index=%s)\n",
+                            idx, err, it.query.prefix, b.dbiName(it.query.dbi))
+                        continue
+                    }
+
+                    // check it against pubkeys without decoding the entire thing
+                    if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
+                        continue
+                    }
+
+                    // check it against kinds without decoding the entire thing
+                    if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
+                        continue
+                    }
+
+                    // decode the entire thing
+                    event := nostr.Event{}
+                    if err := betterbinary.Unmarshal(bin, &event); err != nil {
+                        log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n",
+                            betterbinary.GetID(bin), it.query.prefix, it.query.startingPoint, b.dbiName(it.query.dbi), err)
+                        continue
+                    }
+
+                    // fmt.Println("  event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx)
+
+                    // if there is still a tag to be checked, do it now
+                    if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
+                        continue
+                    }
+
+                    tempResults = append(tempResults, event)
+                }
+            }
+        }
+
+        // emit this stuff in order
+        slices.SortFunc(tempResults, nostr.CompareEvent)
+        for _, evt := range tempResults {
+            if !yield(evt) {
+                return nil
+            }
+
+            totalEventsEmitted++
+            if totalEventsEmitted == limit {
+                return nil
+            }
+        }
+
+        // now pull more events
+        for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ {
+            it := iterators[i]
+            if it.exhausted {
+                if len(it.idxs) == 0 {
+                    // eliminating this from the list of iterators
+                    internal.SwapDelete(iterators, i)
+                    i--
+                }
                continue
            }
-            if oldest.Q == q && remainingUnexhausted > 1 {
-                continue
-            }
-            // fmt.Println("  query", q, unsafe.Pointer(&results[q]), b.dbiName(query.dbi), hex.EncodeToString(query.prefix), hex.EncodeToString(query.startingPoint), len(results[q]))
-
-            it := iterators[q]
-            // fmt.Println("    ", q, unsafe.Pointer(iterators[q]), it.err)
-            pulledThisIteration := 0
-
-            for {
-                // we already have a k and a v and an err from the cursor setup, so check and use these
-                if it.err != nil ||
-                    len(it.key) != query.keySize ||
-                    !bytes.HasPrefix(it.key, query.prefix) {
-                    // either iteration has errored or we reached the end of this prefix
-                    // fmt.Println("  reached end", hex.EncodeToString(it.key), query.keySize, hex.EncodeToString(query.prefix), it.err)
-                    exhaust(q)
-                    break
-                }
-
-                // "id" indexes don't contain a timestamp
-                if query.timestampSize == 4 {
-                    createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
-                    if createdAt < since {
-                        // fmt.Println("  reached since", createdAt, "<", since)
-                        exhaust(q)
-                        break
-                    }
-                }
-
-                // fetch actual event
-                bin, err := txn.Get(b.rawEventStore, it.valIdx)
-                if err != nil {
-                    log.Printf(
-                        "lmdb: failed to get %x based on prefix %x, index key %x from raw event store: %s\n",
-                        it.valIdx, query.prefix, it.key, err)
-                    return nil, fmt.Errorf("iteration error: %w", err)
-                }
-
-                // check it against pubkeys without decoding the entire thing
-                if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
-                    it.next()
-                    continue
-                }
-
-                // check it against kinds without decoding the entire thing
-                if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
-                    it.next()
-                    continue
-                }
-
-                // decode the entire thing
-                event := nostr.Event{}
-                if err := betterbinary.Unmarshal(bin, &event); err != nil {
-                    log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n", betterbinary.GetID(bin),
-                        query.prefix, query.startingPoint, query.dbi, err)
-                    return nil, fmt.Errorf("event read error: %w", err)
-                }
-
-                // fmt.Println("  event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx)
-
-                // if there is still a tag to be checked, do it now
-                if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
-                    it.next()
-                    continue
-                }
-
-                // this event is good to be used
-                evt := internal.IterEvent{Event: event, Q: q}
-                //
-                //
-                if sndPhase {
-                    // do the process described below at HIWAWVRTP.
-                    // if we've reached here this means we've already passed the `since` check.
-                    // now we have to eliminate the event currently at the `since` threshold.
-                    nextThreshold := firstPhaseResults[len(firstPhaseResults)-2]
-                    if oldest.Event.ID == nostr.ZeroID {
-                        // fmt.Println("  b1", evt.ID[0:8])
-                        // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE)
-                        // when we don't have the oldest set, we will keep the results
-                        // and not change the cutting point -- it's bad, but hopefully not that bad.
-                        results[q] = append(results[q], evt)
-                        sndPhaseHasResultsPending = true
-                    } else if nextThreshold.CreatedAt > oldest.CreatedAt {
-                        // fmt.Println("  b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8])
-                        // one of the events we have stored is the actual next threshold
-                        // eliminate last, update since with oldest
-                        firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
-                        since = uint32(oldest.CreatedAt)
-                        // fmt.Println("  new since", since, evt.ID[0:8])
-                        // we null the oldest Event as we can't rely on it anymore
-                        // (we'll fall under BWWDHTOE above) until we have a new oldest set.
-                        oldest = internal.IterEvent{Q: -1}
-                        // anything we got that would be above this won't trigger an update to
-                        // the oldest anyway, because it will be discarded as being after the limit.
-                        //
-                        // finally
-                        // add this to the results to be merged later
-                        results[q] = append(results[q], evt)
-                        sndPhaseHasResultsPending = true
-                    } else if nextThreshold.CreatedAt < evt.CreatedAt {
-                        // the next last event in the firstPhaseResults is the next threshold
-                        // fmt.Println("  b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8])
-                        // eliminate last, update since with the antelast
-                        firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
-                        since = uint32(nextThreshold.CreatedAt)
-                        // fmt.Println("  new since", since)
-                        // add this to the results to be merged later
-                        results[q] = append(results[q], evt)
-                        sndPhaseHasResultsPending = true
-                        // update the oldest event
-                        if evt.CreatedAt < oldest.CreatedAt {
-                            oldest = evt
-                        }
-                    } else {
-                        // fmt.Println("  b4", evt.ID[0:8])
-                        // oops, _we_ are the next `since` threshold
-                        firstPhaseResults[len(firstPhaseResults)-1] = evt
-                        since = uint32(evt.CreatedAt)
-                        // fmt.Println("  new since", since)
-                        // do not add us to the results to be merged later
-                        // as we're already inhabiting the firstPhaseResults slice
-                    }
-                } else {
-                    results[q] = append(results[q], evt)
-                    firstPhaseTotalPulled++
-
-                    // update the oldest event
-                    if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt {
-                        oldest = evt
-                    }
-                }
-
-                pulledPerQuery[q]++
-                pulledThisIteration++
-                if pulledThisIteration > batchSizePerQuery {
-                    // batch filled
-                    it.next()
-                    // fmt.Println("    filled", hex.EncodeToString(it.key), it.valIdx)
-                    break
-                }
-                if pulledPerQuery[q] >= limit {
-                    // batch filled + reached limit for this query (which is the global limit)
-                    exhaust(q)
-                    it.next()
-                    break
-                }
-
-                it.next()
-            }
-        }
-
-        // we will do this check if we don't accumulated the requested number of events yet
-        // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
-        if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) {
-            // fmt.Println("second phase aggregation!")
-            // when we are in the second phase we will aggressively aggregate results on every iteration
-            //
-            secondBatch = secondBatch[:0]
-            for s := 0; s < len(sndPhaseParticipants); s++ {
-                q := sndPhaseParticipants[s]
-
-                if len(results[q]) > 0 {
-                    secondBatch = append(secondBatch, results[q])
-                }
-
-                if exhausted[q] {
-                    sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s)
-                    s--
-                }
-            }
-
-            // every time we get here we will alternate between these A and B lists
-            // combining everything we have into a new partial results list.
-            // after we've done that we can again set the oldest.
-            // fmt.Println("  xxx", sndPhaseResultsToggle)
-            if sndPhaseResultsToggle {
-                secondBatch = append(secondBatch, sndPhaseResultsB)
-                sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA)
-                oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1]
-                // fmt.Println("  new aggregated a", len(sndPhaseResultsB))
-            } else {
-                secondBatch = append(secondBatch, sndPhaseResultsA)
-                sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB)
-                oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1]
-                // fmt.Println("  new aggregated b", len(sndPhaseResultsB))
-            }
-            sndPhaseResultsToggle = !sndPhaseResultsToggle
-
-            since = uint32(oldest.CreatedAt)
-            // fmt.Println("  new since", since)
-
-            // reset the `results` list so we can keep using it
-            results = results[:len(queries)]
-            for _, q := range sndPhaseParticipants {
-                results[q] = results[q][:0]
-            }
-        } else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
-            // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted)
-
-            // we will exclude this oldest number as it is not relevant anymore
-            // (we now want to keep track only of the oldest among the remaining iterators)
-            oldest = internal.IterEvent{Q: -1}
-
-            // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP)
-            // now we can combine the results we have and check what is our current oldest event.
-            // we also discard anything that is after the current cutting point (`limit`).
-            // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total
-            // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49],
-            // and also adjust our `since` parameter to `15`, discarding anything we get after it
-            // and immediately declaring that iterator exhausted.
-            // also every time we get result that is more recent than this updated `since` we can
-            // keep it but also discard the previous since, moving the needle one back -- for example,
-            // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15`
-            // in the process.
-            all := make([][]internal.IterEvent, len(results))
-            copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice
-            firstPhaseResults = internal.MergeSortMultiple(all, limit, nil)
-            oldest = firstPhaseResults[limit-1]
-            since = uint32(oldest.CreatedAt)
-            // fmt.Println("new since", since)
-
-            for q := range queries {
-                if exhausted[q] {
-                    continue
-                }
-
-                // we also automatically exhaust any of the iterators that have already passed the
-                // cutting point (`since`)
-                if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt {
-                    exhausted[q] = true
-                    remainingUnexhausted--
-                    continue
-                }
-
-                // for all the remaining iterators,
-                // since we have merged all the events in this `firstPhaseResults` slice, we can empty the
-                // current `results` slices and reuse them.
-                results[q] = results[q][:0]
-
-                // build this index of indexes with everybody who remains
-                sndPhaseParticipants = append(sndPhaseParticipants, q)
-            }
-
-            // we create these two lists and alternate between them so we don't have to create a
-            // a new one every time
-            sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
-            sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2)
-
-            // from now on we won't run this block anymore
-            sndPhase = true
-        }
-
-        // fmt.Println("remaining", remainingUnexhausted)
-        if remainingUnexhausted == 0 {
-            break
+            it.pull(batchSizePerQuery, since)
        }
    }
 
-    // fmt.Println("is sndPhase?", sndPhase)
-
-    var combinedResults []internal.IterEvent
-
-    if sndPhase {
-        // fmt.Println("ending second phase")
-        // when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff,
-        // the other will be empty
-        var sndPhaseResults []internal.IterEvent
-        // fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB))
-        if sndPhaseResultsToggle {
-            sndPhaseResults = sndPhaseResultsB
-            combinedResults = sndPhaseResultsA[0:limit] // reuse this
-            // fmt.Println("  using b", len(sndPhaseResultsA))
-        } else {
-            sndPhaseResults = sndPhaseResultsA
-            combinedResults = sndPhaseResultsB[0:limit] // reuse this
-            // fmt.Println("  using a", len(sndPhaseResultsA))
-        }
-
-        all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults}
-        combinedResults = internal.MergeSortMultiple(all, limit, combinedResults)
-        // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
-    } else {
-        combinedResults = make([]internal.IterEvent, limit)
-        combinedResults = internal.MergeSortMultiple(results, limit, combinedResults)
-    }
-
-    return combinedResults, nil
+    return nil
 }
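The rewritten query no longer buffers per-query result slices and merges them at the end; it pulls small batches from every iterator, picks a timestamp threshold, and yields everything at or above it already in global order before pulling again. A simplified, self-contained sketch of that round-based idea (illustrative only: it pulls from every source each round and uses the maximum pending `last` as the threshold, whereas the commit decides which iterators to pull from next via the quickselect above):

    package main

    import (
        "cmp"
        "fmt"
        "slices"
    )

    type source struct {
        pending   []uint32 // remaining values, sorted descending (stands in for an LMDB cursor)
        buf       []uint32 // pulled but not yet emitted
        last      uint32   // most recently pulled (i.e. smallest so far) value
        exhausted bool
    }

    func (s *source) pull(n int) {
        for i := 0; i < n && !s.exhausted; i++ {
            if len(s.pending) == 0 {
                s.exhausted = true
                return
            }
            v := s.pending[0]
            s.pending = s.pending[1:]
            s.buf = append(s.buf, v)
            s.last = v
        }
    }

    func main() {
        sources := []*source{
            {pending: []uint32{90, 70, 10}},
            {pending: []uint32{80, 75, 60}},
            {pending: []uint32{95, 20}},
        }
        var out []uint32

        for len(sources) > 0 {
            // pull a batch from every live source and compute the threshold:
            // no live source can still produce anything newer than its own `last`
            var threshold uint32
            for _, s := range sources {
                s.pull(2)
                if !s.exhausted && s.last > threshold {
                    threshold = s.last
                }
            }

            // everything at or above the threshold is safe to emit now, in order
            var batch []uint32
            for _, s := range sources {
                for i := 0; i < len(s.buf); i++ {
                    if s.buf[i] >= threshold {
                        batch = append(batch, s.buf[i])
                        s.buf = slices.Delete(s.buf, i, i+1)
                        i--
                    }
                }
            }
            slices.SortFunc(batch, func(a, b uint32) int { return cmp.Compare(b, a) })
            out = append(out, batch...)

            // drop sources that have nothing left to give
            sources = slices.DeleteFunc(sources, func(s *source) bool {
                return s.exhausted && len(s.buf) == 0
            })
        }
        fmt.Println(out) // [95 90 80 75 70 60 20 10]
    }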
@@ -14,9 +14,7 @@ type query struct {
    i             int
    dbi           lmdb.DBI
    prefix        []byte
-    results       chan *nostr.Event
    keySize       int
-    timestampSize int
    startingPoint []byte
 }
 
@@ -46,20 +44,19 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
            sp = sp[0:len(q.prefix)]
            copy(sp, q.prefix)
            queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until))
-            queries[i].results = make(chan *nostr.Event, 12)
        }
    }()
 
-    if filter.IDs != nil {
-        // when there are ids we ignore everything else
-        queries = make([]query, len(filter.IDs))
-        for i, id := range filter.IDs {
-            prefix := make([]byte, 8)
-            copy(prefix[0:8], id[0:8])
-            queries[i] = query{i: i, dbi: b.indexId, prefix: prefix[0:8], keySize: 8, timestampSize: 0}
-        }
-        return queries, nil, nil, "", nil, 0, nil
-    }
+    // if filter.IDs != nil {
+    //     // when there are ids we ignore everything else
+    //     queries = make([]query, len(filter.IDs))
+    //     for i, id := range filter.IDs {
+    //         prefix := make([]byte, 8)
+    //         copy(prefix[0:8], id[0:8])
+    //         queries[i] = query{i: i, dbi: b.indexId, prefix: prefix[0:8], keySize: 8}
+    //     }
+    //     return queries, nil, nil, "", nil, 0, nil
+    // }
 
    // this is where we'll end the iteration
    if filter.Since != 0 {
@@ -94,7 +91,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
                    return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
                }
                binary.BigEndian.PutUint16(k[8:8+2], uint16(kind))
-                queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0 : 8+2], keySize: 8 + 2 + 4, timestampSize: 4}
+                queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0 : 8+2], keySize: 8 + 2 + 4}
                i++
            }
        }
@@ -110,7 +107,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
                if _, err := hex.Decode(k[0:8], []byte(value[0:8*2])); err != nil {
                    return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
                }
-                queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0:8], keySize: 8 + 2 + 4, timestampSize: 4}
+                queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0:8], keySize: 8 + 2 + 4}
            }
        }
    } else {
@@ -121,7 +118,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
            dbi, k, offset := b.getTagIndexPrefix(tagKey, value)
            // remove the last parts part to get just the prefix we want here
            prefix := k[0:offset]
-            queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4, timestampSize: 4}
+            queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4}
        }
 
        // add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index)
@@ -156,7 +153,7 @@ pubkeyMatching:
            // will use pubkey index
            queries = make([]query, len(filter.Authors))
            for i, pk := range filter.Authors {
-                queries[i] = query{i: i, dbi: b.indexPubkey, prefix: pk[0:8], keySize: 8 + 4, timestampSize: 4}
+                queries[i] = query{i: i, dbi: b.indexPubkey, prefix: pk[0:8], keySize: 8 + 4}
            }
        } else {
            // will use pubkeyKind index
@@ -167,7 +164,7 @@ pubkeyMatching:
                    prefix := make([]byte, 8+2)
                    copy(prefix[0:8], pk[0:8])
                    binary.BigEndian.PutUint16(prefix[8:8+2], uint16(kind))
-                    queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix[0 : 8+2], keySize: 10 + 4, timestampSize: 4}
+                    queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix[0 : 8+2], keySize: 10 + 4}
                    i++
                }
            }
@@ -184,7 +181,7 @@ pubkeyMatching:
        for i, kind := range filter.Kinds {
            prefix := make([]byte, 2)
            binary.BigEndian.PutUint16(prefix[0:2], uint16(kind))
-            queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix[0:2], keySize: 2 + 4, timestampSize: 4}
+            queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix[0:2], keySize: 2 + 4}
        }
 
        // potentially with an extra useless tag filtering
@@ -195,6 +192,6 @@ pubkeyMatching:
    // if we got here our query will have nothing to filter with
    queries = make([]query, 1)
    prefix := make([]byte, 0)
-    queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix, keySize: 0 + 4, timestampSize: 4}
+    queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix, keySize: 0 + 4}
    return queries, nil, nil, "", nil, since, nil
 }
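Dropping timestampSize works because every index other than the "id" one ends its keys with a 4-byte big-endian created_at, which is also how startingPoint is built from the prefix plus `until`. A tiny sketch of that key layout, with made-up values:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        prefix := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} // e.g. first 8 bytes of a pubkey
        until := uint32(1700000000)

        // where the backwards iteration begins: prefix followed by the timestamp
        startingPoint := binary.BigEndian.AppendUint32(prefix, until)
        keySize := len(prefix) + 4

        // reading the timestamp back out of the key's last 4 bytes
        createdAt := binary.BigEndian.Uint32(startingPoint[len(startingPoint)-4:])
        fmt.Println(len(startingPoint) == keySize, createdAt) // true 1700000000
    }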
@@ -2,6 +2,7 @@ package lmdb
 
 import (
    "fmt"
+    "iter"
    "math"
 
    "fiatjaf.com/nostr"
@@ -23,16 +24,20 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error {
        }
 
        // now we fetch the past events, whatever they are, delete them and then save the new
-        results, err := b.query(txn, filter, 10) // in theory limit could be just 1 and this should work
+        var yield_ func(nostr.Event) bool
+        var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
+            yield_ = yield
+        }
+        err := b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield_)
        if err != nil {
            return fmt.Errorf("failed to query past events with %s: %w", filter, err)
        }
 
        shouldStore := true
-        for _, previous := range results {
-            if internal.IsOlder(previous.Event, evt) {
-                if err := b.delete(txn, previous.Event.ID); err != nil {
-                    return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err)
+        for previous := range results {
+            if internal.IsOlder(previous, evt) {
+                if err := b.delete(txn, previous.ID); err != nil {
+                    return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
                }
            } else {
                // there is a newer event already stored, so we won't store this
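QueryEvents now builds its iter.Seq directly on top of the callback-taking b.query, and ReplaceEvent adapts the same callback style for its own loop. A generic sketch of the first pattern with simplified stand-in types (not the library's API):

    package main

    import (
        "fmt"
        "iter"
    )

    type event struct{ id string } // stand-in for nostr.Event

    // low-level query: pushes events into yield and stops early if yield returns false
    func query(limit int, yield func(event) bool) error {
        for i := 0; i < limit; i++ {
            if !yield(event{id: fmt.Sprintf("ev-%d", i)}) {
                return nil
            }
        }
        return nil
    }

    // public API: hands the range loop's yield straight to the low-level query
    func Query(limit int) iter.Seq[event] {
        return func(yield func(event) bool) {
            _ = query(limit, yield)
        }
    }

    func main() {
        for ev := range Query(3) {
            fmt.Println(ev.id)
        }
    }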
eventstore/lmdb/utils_test.go (new file)
@@ -0,0 +1,27 @@
+package lmdb
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func TestQuickselect(t *testing.T) {
+    its := iterators{
+        {last: 781},
+        {last: 900},
+        {last: 1},
+        {last: 81},
+        {last: 325},
+        {last: 781},
+        {last: 562},
+        {last: 81},
+        {last: 444},
+    }
+
+    its.quickselect(3, 0, len(its))
+    require.ElementsMatch(t, its[len(its)-3:], iterators{{last: 900}, {last: 781}, {last: 781}})
+
+    its.quickselect(4, 0, len(its))
+    require.ElementsMatch(t, its[len(its)-4:], iterators{{last: 562}, {last: 900}, {last: 781}, {last: 781}})
+}