mmm: copy new lmdb querying mechanism because it's so much more elegant.
@@ -3,9 +3,7 @@ package internal
import (
	"bytes"
	"math"
	"slices"

	mergesortedslices "fiatjaf.com/lib/merge-sorted-slices"
	"fiatjaf.com/nostr"
)

@@ -78,68 +76,6 @@ func CopyMapWithoutKey[K comparable, V any](originalMap map[K]V, key K) map[K]V
	return newMap
}

// MergeSortMultiple takes the results of multiple iterators, which are already sorted,
// and merges them into a single big sorted slice
func MergeSortMultiple(batches [][]nostr.Event, limit int, dst []nostr.Event) []nostr.Event {
	// clear up empty lists here while simultaneously computing the total count.
	// this helps because if there are a bunch of empty lists then this pre-clean
	// step will get us into the faster 'merge' branch; otherwise we would go to the other one,
	// where we would have to do the cleaning anyway.
	// and even if we still end up in the other branch we save one iteration by already
	// computing the total count.
	total := 0
	for i := len(batches) - 1; i >= 0; i-- {
		if len(batches[i]) == 0 {
			batches = SwapDelete(batches, i)
		} else {
			total += len(batches[i])
		}
	}

	if limit == -1 {
		limit = total
	}

	// this amazing equation will ensure that if one of the two sides goes very small (like 1 or 2)
	// the other can go very high (like 500) and we're still in the 'merge' branch.
	// if values go somewhere in the middle then they may match the 'merge' branch (batches=20, limit=70)
	// or not (batches=25, limit=60)
	if math.Log(float64(len(batches)*2))+math.Log(float64(limit)) < 8 {
		if dst == nil {
			dst = make([]nostr.Event, limit)
		} else if cap(dst) < limit {
			dst = slices.Grow(dst, limit-len(dst))
		}
		dst = dst[0:limit]
		return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, nostr.CompareEvent)
	} else {
		if dst == nil {
			dst = make([]nostr.Event, total)
		} else if cap(dst) < total {
			dst = slices.Grow(dst, total-len(dst))
		}
		dst = dst[0:total]

		// use quicksort in a dumb way that will still be fast because it's cheated
		lastIndex := 0
		for _, batch := range batches {
			copy(dst[lastIndex:], batch)
			lastIndex += len(batch)
		}

		slices.SortFunc(dst, nostr.CompareEvent)

		for i, j := 0, total-1; i < j; i, j = i+1, j-1 {
			dst[i], dst[j] = dst[j], dst[i]
		}

		if limit < len(dst) {
			return dst[0:limit]
		}
		return dst
	}
}
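
As an illustrative aside (not part of the diff), this is roughly where the log-based cutoff above lands for the values mentioned in the comments:

package main

import (
	"fmt"
	"math"
)

// mergeBranch mirrors the condition used in MergeSortMultiple above:
// if one side is tiny the other can be huge and we still take the 'merge' branch;
// middle-of-the-road combinations land on either side of the threshold.
func mergeBranch(numBatches, limit int) bool {
	return math.Log(float64(numBatches*2))+math.Log(float64(limit)) < 8
}

func main() {
	fmt.Println(mergeBranch(2, 500)) // true  (~7.60): one side very small, the other very high
	fmt.Println(mergeBranch(20, 70)) // true  (~7.94): middle values, still 'merge'
	fmt.Println(mergeBranch(25, 60)) // false (~8.01): just past the threshold, falls back to copy+sort
}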

// BatchSizePerNumberOfQueries tries to make an educated guess for the batch size given the total filter limit and
// the number of abstract queries we'll be conducting at the same time
func BatchSizePerNumberOfQueries(totalFilterLimit int, numberOfQueries int) int {

@@ -103,7 +103,6 @@ func FuzzTest(f *testing.F) {
	layer := mmm.layers[rnd.Int()%len(mmm.layers)]

	evt, layers := mmm.GetByID(id)

	if slices.Contains(deleted[id], layer) {
		// already deleted from this layer
		require.NotContains(t, layers, layer)

@@ -1,6 +1,7 @@
package mmm

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"iter"
@@ -12,14 +13,61 @@ import (
	"github.com/PowerDNS/lmdb-go/lmdb"
)

// this iterator always goes backwards
type iterator struct {
	query query

	// iteration stuff
	cursor *lmdb.Cursor
	key    []byte
	posb   []byte
	err    error

	// keeps track of the last timestamp value pulled from this iterator
	last uint32

	// set when we shouldn't fetch more from this iterator
	exhausted bool

	// results not yet emitted
	posbs      [][]byte
	timestamps []uint32
}

func (it *iterator) pull(n int, since uint32) {
	query := it.query

	for range n {
		// in the beginning we already have a k and a v and an err from the cursor setup, so check and use these
		if it.err != nil {
			it.exhausted = true
			return
		}

		if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) {
			// we reached the end of this prefix
			it.exhausted = true
			return
		}

		createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
		if createdAt < since {
			it.exhausted = true
			return
		}

		// got a key
		it.posbs = append(it.posbs, it.posb)
		it.timestamps = append(it.timestamps, createdAt)
		it.last = createdAt

		// advance the cursor for the next call
		it.next()
	}

	return
}
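
The pull loop above depends on the index keys ending in a big-endian uint32 created_at, which is what lets the `since` cutoff be computed from the key tail alone and lets a backwards lmdb.Prev walk double as reverse-chronological order. A small sketch of that decoding (the prefix bytes here are made up; only the 4-byte big-endian suffix is taken from the code above):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// hypothetical index key: arbitrary prefix followed by a big-endian uint32 created_at
	key := append([]byte("example-prefix"), 0x65, 0x00, 0x00, 0x00)

	createdAt := binary.BigEndian.Uint32(key[len(key)-4:])
	fmt.Println(createdAt) // 1694498816 -- pull stops as soon as createdAt < since

	// because the timestamp sits big-endian at the end of the key, lexicographic key
	// order within a prefix is also chronological order, so walking backwards with
	// lmdb.Prev yields newest-first results
}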

// goes backwards
func (it *iterator) seek(key []byte) {
	if _, _, errsr := it.cursor.Get(key, nil, lmdb.SetRange); errsr != nil {
		if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
@@ -36,11 +84,88 @@ func (it *iterator) seek(key []byte) {
		}
	}

// goes backwards
func (it *iterator) next() {
	// move one back (we'll look into k and v and err in the next iteration)
	it.key, it.posb, it.err = it.cursor.Get(nil, nil, lmdb.Prev)
}
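
Taken together, the intended flow per sub-query (and what query() further down in this commit does) is: open a cursor, seek to the query's starting point, then pull batches bounded by the current `since` cutoff until the iterator marks itself exhausted. A minimal sketch under those assumptions -- pullAll is a hypothetical helper, and the real code interleaves pulls with emission instead of buffering everything:

// illustrative only: mirrors the per-query setup done by query() below
func pullAll(txn *lmdb.Txn, q query, since uint32, batchSize int) (*iterator, error) {
	cursor, err := txn.OpenCursor(q.dbi)
	if err != nil {
		return nil, err
	}
	// the caller is expected to close the cursor (query() defers cursor.Close())

	it := &iterator{query: q, cursor: cursor}
	it.seek(q.startingPoint)

	for !it.exhausted {
		// each call buffers up to batchSize (posb, created_at) pairs, newest first,
		// all with created_at >= since
		it.pull(batchSize, since)
	}
	return it, nil
}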

type iterators []*iterator

// quickselect reorders the slice just enough to make the top k elements be arranged at the front,
// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28]
// in this case it's hardcoded to use the 'last' field of the iterator
// copied from https://github.com/chrislee87/go-quickselect
// this is modified to also return the highest 'last' (because it's not guaranteed it will be the first item)
func (its iterators) quickselect(k int) uint32 {
	if len(its) == 0 || k >= len(its) {
		return 0
	}

	left, right := 0, len(its)-1

	for {
		// insertion sort for small ranges
		if right-left <= 20 {
			for i := left + 1; i <= right; i++ {
				for j := i; j > 0 && its[j].last > its[j-1].last; j-- {
					its[j], its[j-1] = its[j-1], its[j]
				}
			}
			return its[0].last
		}

		// median-of-three to choose pivot
		pivotIndex := left + (right-left)/2
		if its[right].last > its[left].last {
			its[right], its[left] = its[left], its[right]
		}
		if its[pivotIndex].last > its[left].last {
			its[pivotIndex], its[left] = its[left], its[pivotIndex]
		}
		if its[right].last > its[pivotIndex].last {
			its[right], its[pivotIndex] = its[pivotIndex], its[right]
		}

		// partition
		its[left], its[pivotIndex] = its[pivotIndex], its[left]
		ll := left + 1
		rr := right
		for ll <= rr {
			for ll <= right && its[ll].last > its[left].last {
				ll++
			}
			for rr >= left && its[left].last > its[rr].last {
				rr--
			}
			if ll <= rr {
				its[ll], its[rr] = its[rr], its[ll]
				ll++
				rr--
			}
		}
		its[left], its[rr] = its[rr], its[left] // swap into right place
		pivotIndex = rr

		if k == pivotIndex {
			// now that stuff is selected we get the highest "last"
			highest := its[0].last
			for i := 1; i < k; i++ {
				if its[i].last > highest {
					highest = its[i].last
				}
			}
			return highest
		}

		if k < pivotIndex {
			right = pivotIndex - 1
		} else {
			left = pivotIndex + 1
		}
	}
}
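
For intuition, here is what the modified quickselect gives the caller, using the values from the comment above (illustrative only, and only valid inside this package since 'last' is unexported):

its := iterators{
	{last: 1}, {last: 700}, {last: 25}, {last: 312}, {last: 44}, {last: 28},
}
highest := its.quickselect(3)
// the three largest 'last' values (700, 312, 44) now sit at its[0..2],
// and highest == 700 -- query() below uses this as the timestamp threshold
// to decide which buffered results can already be emitted in order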

type key struct {
	dbi lmdb.DBI
	key []byte

@@ -1,11 +1,11 @@
package mmm

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"iter"
	"log"
	"math"
	"slices"

	"fiatjaf.com/nostr"
@@ -16,18 +16,15 @@ import (

// GetByID returns the event -- if found in this mmm -- and all the IndexingLayers it belongs to.
func (b *MultiMmapManager) GetByID(id nostr.ID) (*nostr.Event, IndexingLayers) {
	presence := make(chan []uint16)

	var event *nostr.Event
	b.queryByIDs(func(evt nostr.Event) bool {
	layers := b.queryByIDs([]nostr.ID{id}, func(evt nostr.Event) bool {
		event = &evt
		return false
	}, []nostr.ID{id}, presence)
	}, true)

	if event != nil {
		p := <-presence
		present := make([]*IndexingLayer, len(p))
		for i, id := range p {
		present := make([]*IndexingLayer, len(layers))
		for i, id := range layers {
			present[i] = b.layers.ByID(id)
		}
		return event, present
@@ -37,14 +34,9 @@ func (b *MultiMmapManager) GetByID(id nostr.ID) (*nostr.Event, IndexingLayers) {
}

// queryByIDs emits the events of the given id to the given channel if they exist anywhere in this mmm.
// if presence is given it will also be used to emit slices of the ids of the IndexingLayers this event is stored in.
// it closes the channels when it ends.
func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr.ID, presence chan []uint16) {
func (b *MultiMmapManager) queryByIDs(ids []nostr.ID, yield func(nostr.Event) bool, withLayers bool) (layers []uint16) {
	b.lmdbEnv.View(func(txn *lmdb.Txn) error {
		txn.RawRead = true
		if presence != nil {
			defer close(presence)
		}

		for _, id := range ids {
			val, err := txn.Get(b.indexId, id[0:8])
@@ -55,30 +47,34 @@ func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr.
			panic(fmt.Errorf("failed to decode event from %v: %w", pos, err))
		}

		if !yield(evt) {
			return nil
		}
		stop := yield(evt)

		if presence != nil {
			layers := make([]uint16, 0, (len(val)-12)/2)
		if withLayers {
			layers = make([]uint16, 0, (len(val)-12)/2)
			for s := 12; s < len(val); s += 2 {
				layers = append(layers, binary.BigEndian.Uint16(val[s:s+2]))
			}
			presence <- layers
		}

		if stop {
			return nil
		}
	}
	}

	return nil
})

	return layers
}
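
The new queryByIDs takes the ids first, then the yield callback, plus a withLayers flag in place of the old presence channel, returning the layer ids synchronously. A sketch of the two call styles, mirroring the call sites in this commit (GetByID above and QueryEvents below):

// with layer information, as GetByID does for a single id
var event *nostr.Event
layers := b.queryByIDs([]nostr.ID{id}, func(evt nostr.Event) bool {
	event = &evt
	return false // stop after the first match
}, true)

// without layer information, as QueryEvents does for ID filters
il.mmmm.queryByIDs(filter.IDs, yield, false)

Note that layers is reassigned for each id found, so the returned slice reflects the last yielded event; the call sites in this commit only rely on it for single-id lookups.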

func (il *IndexingLayer) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
	return func(yield func(nostr.Event) bool) {
		if len(filter.IDs) > 0 {
			il.mmmm.queryByIDs(yield, filter.IDs, nil)
			il.mmmm.queryByIDs(filter.IDs, yield, false)
			return
		}

		if filter.Search != "" {
			return
		}
@@ -96,361 +92,121 @@ func (il *IndexingLayer) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq
		il.lmdbEnv.View(func(txn *lmdb.Txn) error {
			txn.RawRead = true

			results, err := il.query(txn, filter, maxLimit)
			for _, ie := range results {
				if !yield(ie.Event) {
					break
				}
			}

			return err
			return il.query(txn, filter, maxLimit, yield)
		})
	}
}
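
Because QueryEvents now returns an iter.Seq[nostr.Event], callers consume it with a range-over-func loop and can stop early just by breaking out, which makes the underlying yield return false and aborts the LMDB read. A minimal consumer sketch (the filter and the 500 maxLimit are illustrative, and fmt is assumed imported):

// illustrative consumer of the iter.Seq returned by QueryEvents
count := 0
for evt := range il.QueryEvents(filter, 500) {
	fmt.Println(evt.CreatedAt, evt.ID)
	count++
	if count == 10 {
		break // ends the iteration (and the read transaction) early
	}
}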

func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) {
func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield func(nostr.Event) bool) error {
	queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := il.prepareQueries(filter)
	if err != nil {
		return nil, err
		return err
	}

	iterators := make([]*iterator, len(queries))
	exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
	results := make([][]internal.IterEvent, len(queries))
	pulledPerQuery := make([]int, len(queries))
	iterators := make(iterators, len(queries))
	batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries))

	// these are kept updated so we never pull from the iterator that is at further distance
	// (i.e. the one that has the oldest event among all)
	// we will continue to pull from it as soon as some other iterator takes the position
	oldest := internal.IterEvent{Q: -1}

	sndPhase := false // after we have gathered enough events we will change the way we iterate
	secondBatch := make([][]internal.IterEvent, 0, len(queries)+1)
	sndPhaseParticipants := make([]int, 0, len(queries)+1)

	// while merging results in the second phase we will alternate between these two lists
	// to avoid having to create new lists all the time
	var sndPhaseResultsA []internal.IterEvent
	var sndPhaseResultsB []internal.IterEvent
	var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
	var sndPhaseHasResultsPending bool

	remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
	batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
	firstPhaseTotalPulled := 0

	exhaust := func(q int) {
		exhausted[q] = true
		remainingUnexhausted--
		if q == oldest.Q {
			oldest = internal.IterEvent{Q: -1}
		}
	}

	var firstPhaseResults []internal.IterEvent

	for q := range queries {
	for q, query := range queries {
		cursor, err := txn.OpenCursor(queries[q].dbi)
		if err != nil {
			return nil, err
			return err
		}
		iterators[q] = &iterator{cursor: cursor}
		iterators[q] = &iterator{
			query:  query,
			cursor: cursor,
		}

		defer cursor.Close()
		iterators[q].seek(queries[q].startingPoint)
		results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2)
	}

	// fmt.Println("queries", len(queries))
	// initial pull from all queries
	for i := range iterators {
		iterators[i].pull(batchSizePerQuery, since)
	}

	for c := 0; ; c++ {
		batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
	numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(12))))
	totalEventsEmitted := 0
	tempResults := make([]nostr.Event, 0, batchSizePerQuery*2)

		// fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery)
		// we will go through all the iterators in batches until we have pulled all the required results
		for q, query := range queries {
			if exhausted[q] {
	for len(iterators) > 0 {
		// reset stuff
		tempResults = tempResults[:0]

		// after pulling from all iterators once we now find out which iterators are
		// the ones we should keep pulling from next (i.e. whose last emitted timestamp is the highest)
		threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)))

		// so we can emit all the events higher than the threshold
		for i := range iterators {
			for t := 0; t < len(iterators[i].timestamps); t++ {
				if iterators[i].timestamps[t] >= threshold {
					posb := iterators[i].posbs[t]

					// discard this regardless of what happens
					iterators[i].timestamps = internal.SwapDelete(iterators[i].timestamps, t)
					iterators[i].posbs = internal.SwapDelete(iterators[i].posbs, t)
					t--

					// fetch actual event
					pos := positionFromBytes(posb)
					bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)]

					// check it against pubkeys without decoding the entire thing
					if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
						continue
					}

					// check it against kinds without decoding the entire thing
					if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
						continue
					}

					// decode the entire thing
					event := nostr.Event{}
					if err := betterbinary.Unmarshal(bin, &event); err != nil {
						log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %v: %s\n",
							betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err)
						continue
					}

					// if there is still a tag to be checked, do it now
					if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
						continue
					}

					tempResults = append(tempResults, event)
				}
			}
		}

		// emit this stuff in order
		slices.SortFunc(tempResults, nostr.CompareEventReverse)
		for _, evt := range tempResults {
			if !yield(evt) {
				return nil
			}

			totalEventsEmitted++
			if totalEventsEmitted == limit {
				return nil
			}
		}

		// now pull more events
		for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ {
			if iterators[i].exhausted {
				if len(iterators[i].posbs) == 0 {
					// eliminating this from the list of iterators
					iterators = internal.SwapDelete(iterators, i)
					i--
				}
				continue
			}
			if oldest.Q == q && remainingUnexhausted > 1 {
				continue
			}
			// fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q]))

			it := iterators[q]
			pulledThisIteration := 0

			for {
				// we already have a k and a v and an err from the cursor setup, so check and use these
				if it.err != nil ||
					len(it.key) != query.keySize ||
					!bytes.HasPrefix(it.key, query.prefix) {
					// either iteration has errored or we reached the end of this prefix
					// fmt.Println(" reached end", it.key, query.keySize, query.prefix)
					exhaust(q)
					break
				}

				// "id" indexes don't contain a timestamp
				if query.timestampSize == 4 {
					createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
					if createdAt < since {
						// fmt.Println(" reached since", createdAt, "<", since)
						exhaust(q)
						break
					}
				}

				// fetch actual event
				pos := positionFromBytes(it.posb)
				bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)]

				// check it against pubkeys without decoding the entire thing
				if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
					it.next()
					continue
				}

				// check it against kinds without decoding the entire thing
				if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
					it.next()
					continue
				}

				// decode the entire thing (TODO: do a conditional decode while also checking the extra tag)
				event := nostr.Event{}
				if err := betterbinary.Unmarshal(bin, &event); err != nil {
					log.Printf("mmm: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n",
						bin[0:32], query.prefix, query.startingPoint, query.dbi, err)
					return nil, fmt.Errorf("event read error: %w", err)
				}

				// fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx)

				// if there is still a tag to be checked, do it now
				if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
					it.next()
					continue
				}

				// this event is good to be used
				evt := internal.IterEvent{Event: event, Q: q}
				//
				//
				if sndPhase {
					// do the process described below at HIWAWVRTP.
					// if we've reached here this means we've already passed the `since` check.
					// now we have to eliminate the event currently at the `since` threshold.
					nextThreshold := firstPhaseResults[len(firstPhaseResults)-2]
					if oldest.Event.ID == nostr.ZeroID {
						// fmt.Println(" b1", evt.ID[0:8])
						// BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE)
						// when we don't have the oldest set, we will keep the results
						// and not change the cutting point -- it's bad, but hopefully not that bad.
						results[q] = append(results[q], evt)
						sndPhaseHasResultsPending = true
					} else if nextThreshold.CreatedAt > oldest.CreatedAt {
						// fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8])
						// one of the events we have stored is the actual next threshold
						// eliminate last, update since with oldest
						firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
						since = uint32(oldest.CreatedAt)
						// fmt.Println(" new since", since, evt.ID[0:8])
						// we null the oldest Event as we can't rely on it anymore
						// (we'll fall under BWWDHTOE above) until we have a new oldest set.
						oldest = internal.IterEvent{Q: -1}
						// anything we got that would be above this won't trigger an update to
						// the oldest anyway, because it will be discarded as being after the limit.
						//
						// finally
						// add this to the results to be merged later
						results[q] = append(results[q], evt)
						sndPhaseHasResultsPending = true
					} else if nextThreshold.CreatedAt < evt.CreatedAt {
						// the next-to-last event in the firstPhaseResults is the next threshold
						// fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8])
						// eliminate the last one, update since with the next-to-last
						firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
						since = uint32(nextThreshold.CreatedAt)
						// fmt.Println(" new since", since)
						// add this to the results to be merged later
						results[q] = append(results[q], evt)
						sndPhaseHasResultsPending = true
						// update the oldest event
						if evt.CreatedAt < oldest.CreatedAt {
							oldest = evt
						}
					} else {
						// fmt.Println(" b4", evt.ID[0:8])
						// oops, _we_ are the next `since` threshold
						firstPhaseResults[len(firstPhaseResults)-1] = evt
						since = uint32(evt.CreatedAt)
						// fmt.Println(" new since", since)
						// do not add us to the results to be merged later
						// as we're already inhabiting the firstPhaseResults slice
					}
				} else {
					results[q] = append(results[q], evt)
					firstPhaseTotalPulled++

					// update the oldest event
					if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt {
						oldest = evt
					}
				}

				pulledPerQuery[q]++
				pulledThisIteration++
				if pulledThisIteration > batchSizePerQuery {
					// batch filled
					it.next()
					// fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx)
					break
				}
				if pulledPerQuery[q] >= limit {
					// batch filled + reached limit for this query (which is the global limit)
					exhaust(q)
					it.next()
					break
				}

				it.next()
			}
		}

		// we will do this check if we haven't accumulated the requested number of events yet
		// fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
		if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) {
			// fmt.Println("second phase aggregation!")
			// when we are in the second phase we will aggressively aggregate results on every iteration
			//
			secondBatch = secondBatch[:0]
			for s := 0; s < len(sndPhaseParticipants); s++ {
				q := sndPhaseParticipants[s]

				if len(results[q]) > 0 {
					secondBatch = append(secondBatch, results[q])
				}

				if exhausted[q] {
					sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s)
					s--
				}
			}

			// every time we get here we will alternate between these A and B lists
			// combining everything we have into a new partial results list.
			// after we've done that we can again set the oldest.
			// fmt.Println(" xxx", sndPhaseResultsToggle)
			if sndPhaseResultsToggle {
				secondBatch = append(secondBatch, sndPhaseResultsB)
				sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA)
				oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1]
				// fmt.Println(" new aggregated a", len(sndPhaseResultsB))
			} else {
				secondBatch = append(secondBatch, sndPhaseResultsA)
				sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB)
				oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1]
				// fmt.Println(" new aggregated b", len(sndPhaseResultsB))
			}
			sndPhaseResultsToggle = !sndPhaseResultsToggle

			since = uint32(oldest.CreatedAt)
			// fmt.Println(" new since", since)

			// reset the `results` list so we can keep using it
			results = results[:len(queries)]
			for _, q := range sndPhaseParticipants {
				results[q] = results[q][:0]
			}
		} else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
			// fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted)

			// we will exclude this oldest number as it is not relevant anymore
			// (we now want to keep track only of the oldest among the remaining iterators)
			oldest = internal.IterEvent{Q: -1}

			// HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP)
			// now we can combine the results we have and check what is our current oldest event.
			// we also discard anything that is after the current cutting point (`limit`).
			// so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total
			// we can just keep [1,2,3,7,10,15] and discard [20, 21, 49],
			// and also adjust our `since` parameter to `15`, discarding anything we get after it
			// and immediately declaring that iterator exhausted.
			// also every time we get a result that is more recent than this updated `since` we can
			// keep it but also discard the previous since, moving the needle one back -- for example,
			// if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15`
			// in the process.
			all := make([][]internal.IterEvent, len(results))
			copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice
			firstPhaseResults = internal.MergeSortMultiple(all, limit, nil)
			oldest = firstPhaseResults[limit-1]
			since = uint32(oldest.CreatedAt)
			// fmt.Println("new since", since)

			for q := range queries {
				if exhausted[q] {
					continue
				}

				// we also automatically exhaust any of the iterators that have already passed the
				// cutting point (`since`)
				if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt {
					exhausted[q] = true
					remainingUnexhausted--
					continue
				}

				// for all the remaining iterators,
				// since we have merged all the events in this `firstPhaseResults` slice, we can empty the
				// current `results` slices and reuse them.
				results[q] = results[q][:0]

				// build this index of indexes with everybody who remains
				sndPhaseParticipants = append(sndPhaseParticipants, q)
			}

			// we create these two lists and alternate between them so we don't have to create
			// a new one every time
			sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
			sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2)

			// from now on we won't run this block anymore
			sndPhase = true
		}

		// fmt.Println("remaining", remainingUnexhausted)
		if remainingUnexhausted == 0 {
			break
			iterators[i].pull(batchSizePerQuery, since)
		}
	}

	// fmt.Println("is sndPhase?", sndPhase)

	var combinedResults []internal.IterEvent

	if sndPhase {
		// fmt.Println("ending second phase")
		// when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff,
		// the other will be empty
		var sndPhaseResults []internal.IterEvent
		// fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB))
		if sndPhaseResultsToggle {
			sndPhaseResults = sndPhaseResultsB
			combinedResults = sndPhaseResultsA[0:limit] // reuse this
			// fmt.Println(" using b", len(sndPhaseResultsA))
		} else {
			sndPhaseResults = sndPhaseResultsA
			combinedResults = sndPhaseResultsB[0:limit] // reuse this
			// fmt.Println(" using a", len(sndPhaseResultsA))
		}

		all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults}
		combinedResults = internal.MergeSortMultiple(all, limit, combinedResults)
		// fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
	} else {
		combinedResults = make([]internal.IterEvent, limit)
		combinedResults = internal.MergeSortMultiple(results, limit, combinedResults)
	}

	return combinedResults, nil
	return nil
}

@@ -2,6 +2,7 @@ package mmm

import (
	"fmt"
	"iter"
	"math"
	"runtime"

@@ -33,16 +34,20 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {

	return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
		// now we fetch the past events, whatever they are, delete them and then save the new
		prevResults, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work
		var yield_ func(nostr.Event) bool
		var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
			yield_ = yield
		}
		err := il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield_)
		if err != nil {
			return fmt.Errorf("failed to query past events with %s: %w", filter, err)
		}

		shouldStore := true
		for _, previous := range prevResults {
			if internal.IsOlder(previous.Event, evt) {
				if err := il.delete(mmmtxn, iltxn, previous.Event.ID); err != nil {
					return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err)
		for previous := range results {
			if internal.IsOlder(previous, evt) {
				if err := il.delete(mmmtxn, iltxn, previous.ID); err != nil {
					return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
				}
			} else {
				// there is a newer event already stored, so we won't store this