go mod tidy works now at least.

This commit is contained in:
fiatjaf
2025-04-15 18:39:14 -03:00
parent 2b5b646a62
commit cb0dd45a32
37 changed files with 540 additions and 917 deletions

View File

@@ -2,16 +2,15 @@ package mmm
import (
"bytes"
"context"
"encoding/binary"
"slices"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/mmm/betterbinary"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (il *IndexingLayer) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
func (il *IndexingLayer) CountEvents(filter nostr.Filter) (int64, error) {
var count int64 = 0
queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := il.prepareQueries(filter)

View File

@@ -1,39 +1,39 @@
package mmm
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"slices"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr"
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (il *IndexingLayer) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
func (il *IndexingLayer) DeleteEvent(id nostr.ID) error {
il.mmmm.writeMutex.Lock()
defer il.mmmm.writeMutex.Unlock()
return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
return il.delete(mmmtxn, iltxn, evt)
return il.delete(mmmtxn, iltxn, id)
})
})
}
func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Event) error {
func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) error {
zeroRefs := false
b := il.mmmm
b.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting")
// first in the mmmm txn we check if we have the event still
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
val, err := mmmtxn.Get(b.indexId, idPrefix8)
val, err := mmmtxn.Get(b.indexId, id[0:8])
if err != nil {
if lmdb.IsNotFound(err) {
// we already do not have this anywhere
return nil
}
return fmt.Errorf("failed to check if we have the event %x: %w", idPrefix8, err)
return fmt.Errorf("failed to check if we have the event %x: %w", id, err)
}
// we have this, but do we have it in the current layer?
@@ -49,8 +49,8 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev
copy(nextval, val[0:i])
copy(nextval[i:], val[i+2:])
if err := mmmtxn.Put(b.indexId, idPrefix8, nextval, 0); err != nil {
return fmt.Errorf("failed to update references for %x: %w", idPrefix8, err)
if err := mmmtxn.Put(b.indexId, id[0:8], nextval, 0); err != nil {
return fmt.Errorf("failed to update references for %x: %w", id[:], err)
}
// if there are no more layers we will delete everything later
@@ -60,6 +60,12 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev
}
}
// load the event so we can compute the indexes
var evt nostr.Event
if err := il.mmmm.loadEvent(pos, &evt); err != nil {
return fmt.Errorf("failed to load event %x when deleting: %w", id[:], err)
}
// calculate all index keys we have for this event and delete them
for k := range il.getIndexKeysForEvent(evt) {
if err := iltxn.Del(k.dbi, k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) {
@@ -69,7 +75,7 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev
// if there are no more refs we delete the event from the id index and mmap
if zeroRefs {
if err := b.purge(mmmtxn, idPrefix8, pos); err != nil {
if err := b.purge(mmmtxn, id[0:8], pos); err != nil {
panic(err)
}
}

View File

@@ -8,8 +8,8 @@ import (
"strconv"
"strings"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr"
"github.com/PowerDNS/lmdb-go/lmdb"
)
// this iterator always goes backwards
@@ -46,7 +46,7 @@ type key struct {
key []byte
}
func (il *IndexingLayer) getIndexKeysForEvent(evt *nostr.Event) iter.Seq[key] {
func (il *IndexingLayer) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] {
return func(yield func(key) bool) {
{
// ~ by pubkey+date

View File

@@ -1,15 +1,11 @@
package mmm
import (
"context"
"encoding/binary"
"fmt"
"os"
"path/filepath"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr"
"github.com/PowerDNS/lmdb-go/lmdb"
)
var _ eventstore.Store = (*IndexingLayer)(nil)
@@ -18,10 +14,8 @@ type IndexingLayer struct {
isInitialized bool
name string
ShouldIndex func(context.Context, *nostr.Event) bool
MaxLimit int
mmmm *MultiMmapManager
MaxLimit int
mmmm *MultiMmapManager
// this is stored in the knownLayers db as a value, and used to keep track of which layer owns each event
id uint16
@@ -136,65 +130,6 @@ func (il *IndexingLayer) Init() error {
func (il *IndexingLayer) Name() string { return il.name }
func (il *IndexingLayer) runThroughEvents(txn *lmdb.Txn) error {
ctx := context.Background()
b := il.mmmm
// run through all events we have and see if this new index wants them
cursor, err := txn.OpenCursor(b.indexId)
if err != nil {
return fmt.Errorf("when opening cursor on %v: %w", b.indexId, err)
}
defer cursor.Close()
for {
idPrefix8, val, err := cursor.Get(nil, nil, lmdb.Next)
if lmdb.IsNotFound(err) {
break
}
if err != nil {
return fmt.Errorf("when moving the cursor: %w", err)
}
update := false
posb := val[0:12]
pos := positionFromBytes(posb)
evt := &nostr.Event{}
if err := b.loadEvent(pos, evt); err != nil {
return fmt.Errorf("when loading event from mmap: %w", err)
}
if il.ShouldIndex != nil && il.ShouldIndex(ctx, evt) {
// add the current reference
val = binary.BigEndian.AppendUint16(val, il.id)
// if we were already updating to remove the reference
// now that we've added the reference back we don't really have to update
update = !update
// actually index
if err := il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
for k := range il.getIndexKeysForEvent(evt) {
if err := iltxn.Put(k.dbi, k.key, posb, 0); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("failed to index: %w", err)
}
}
if update {
if err := txn.Put(b.indexId, idPrefix8, val, 0); err != nil {
return fmt.Errorf("failed to put updated index+refs: %w", err)
}
}
}
return nil
}
func (il *IndexingLayer) Close() {
il.lmdbEnv.Close()
}

View File

@@ -10,9 +10,9 @@ import (
"syscall"
"unsafe"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/mmm/betterbinary"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/rs/zerolog"
)
@@ -30,14 +30,14 @@ type MultiMmapManager struct {
mmapf mmap
mmapfEnd uint64
writeMutex sync.Mutex
lmdbEnv *lmdb.Env
stuff lmdb.DBI
knownLayers lmdb.DBI
indexId lmdb.DBI
freeRanges []position
mutex sync.Mutex
}
func (b *MultiMmapManager) String() string {
@@ -147,8 +147,8 @@ func (b *MultiMmapManager) Init() error {
}
func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.writeMutex.Lock()
defer b.writeMutex.Unlock()
il.mmmm = b
il.name = name
@@ -168,9 +168,6 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
return fmt.Errorf("failed to init new layer %s: %w", name, err)
}
if err := il.runThroughEvents(txn); err != nil {
return fmt.Errorf("failed to run %s through events: %w", name, err)
}
return txn.Put(b.knownLayers, []byte(name), binary.BigEndian.AppendUint16(nil, il.id), 0)
} else if err == nil {
il.id = binary.BigEndian.Uint16(idv)
@@ -193,8 +190,8 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
}
func (b *MultiMmapManager) DropLayer(name string) error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.writeMutex.Lock()
defer b.writeMutex.Unlock()
// get layer reference
idx := slices.IndexFunc(b.layers, func(il *IndexingLayer) bool { return il.name == name })

View File

@@ -2,42 +2,46 @@ package mmm
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"iter"
"log"
"slices"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/internal"
"fiatjaf.com/nostr/eventstore/mmm/betterbinary"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"fiatjaf.com/nostr/eventstore/internal"
"github.com/PowerDNS/lmdb-go/lmdb"
)
// GetByID returns the event -- if found in this mmm -- and all the IndexingLayers it belongs to.
func (b *MultiMmapManager) GetByID(id string) (*nostr.Event, IndexingLayers) {
events := make(chan *nostr.Event)
func (b *MultiMmapManager) GetByID(id nostr.ID) (*nostr.Event, IndexingLayers) {
presence := make(chan []uint16)
b.queryByIDs(events, []string{id}, presence)
for evt := range events {
var event *nostr.Event
b.queryByIDs(func(evt nostr.Event) bool {
event = &evt
return false
}, []nostr.ID{id}, presence)
if event != nil {
p := <-presence
present := make([]*IndexingLayer, len(p))
for i, id := range p {
present[i] = b.layers.ByID(id)
}
return evt, present
return event, present
}
return nil, nil
}
// queryByIDs emits the events of the given id to the given channel if they exist anywhere in this mmm.
// if presence is given it will also be used to emit slices of the ids of the IndexingLayers this event is stored in.
// it closes the channels when it ends.
func (b *MultiMmapManager) queryByIDs(ch chan *nostr.Event, ids []string, presence chan []uint16) {
go b.lmdbEnv.View(func(txn *lmdb.Txn) error {
func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr.ID, presence chan []uint16) {
b.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
defer close(ch)
if presence != nil {
defer close(presence)
}
@@ -47,15 +51,17 @@ func (b *MultiMmapManager) queryByIDs(ch chan *nostr.Event, ids []string, presen
continue
}
idPrefix8, _ := hex.DecodeString(id[0 : 8*2])
val, err := txn.Get(b.indexId, idPrefix8)
val, err := txn.Get(b.indexId, id[0:8])
if err == nil {
pos := positionFromBytes(val[0:12])
evt := &nostr.Event{}
if err := b.loadEvent(pos, evt); err != nil {
evt := nostr.Event{}
if err := b.loadEvent(pos, &evt); err != nil {
panic(fmt.Errorf("failed to decode event from %v: %w", pos, err))
}
ch <- evt
if !yield(evt) {
return nil
}
if presence != nil {
layers := make([]uint16, 0, (len(val)-12)/2)
@@ -71,45 +77,42 @@ func (b *MultiMmapManager) queryByIDs(ch chan *nostr.Event, ids []string, presen
})
}
func (il *IndexingLayer) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
if len(filter.IDs) > 0 {
il.mmmm.queryByIDs(ch, filter.IDs, nil)
return ch, nil
}
if filter.Search != "" {
close(ch)
return ch, nil
}
// max number of events we'll return
limit := il.MaxLimit / 4
if filter.Limit > 0 && filter.Limit < il.MaxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
close(ch)
return ch, nil
} else if tlimit > 0 {
limit = tlimit
}
go il.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
defer close(ch)
results, err := il.query(txn, filter, limit)
for _, ie := range results {
ch <- ie.Event
func (il *IndexingLayer) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
if len(filter.IDs) > 0 {
il.mmmm.queryByIDs(yield, filter.IDs, nil)
return
}
return err
})
if filter.Search != "" {
return
}
return ch, nil
// max number of events we'll return
limit := il.MaxLimit / 4
if filter.Limit > 0 && filter.Limit < il.MaxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
return
} else if tlimit > 0 {
limit = tlimit
}
il.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
results, err := il.query(txn, filter, limit)
for _, ie := range results {
if !yield(ie.Event) {
break
}
}
return err
})
}
}
func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) {
@@ -128,16 +131,16 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
// we will continue to pull from it as soon as some other iterator takes the position
oldest := internal.IterEvent{Q: -1}
secondPhase := false // after we have gathered enough events we will change the way we iterate
sndPhase := false // after we have gathered enough events we will change the way we iterate
secondBatch := make([][]internal.IterEvent, 0, len(queries)+1)
secondPhaseParticipants := make([]int, 0, len(queries)+1)
sndPhaseParticipants := make([]int, 0, len(queries)+1)
// while merging results in the second phase we will alternate between these two lists
// to avoid having to create new lists all the time
var secondPhaseResultsA []internal.IterEvent
var secondPhaseResultsB []internal.IterEvent
var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
var secondPhaseHasResultsPending bool
var sndPhaseResultsA []internal.IterEvent
var sndPhaseResultsB []internal.IterEvent
var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
var sndPhaseHasResultsPending bool
remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
@@ -221,8 +224,8 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
}
// decode the entire thing (TODO: do a conditional decode while also checking the extra tag)
event := &nostr.Event{}
if err := betterbinary.Unmarshal(bin, event); err != nil {
event := nostr.Event{}
if err := betterbinary.Unmarshal(bin, &event); err != nil {
log.Printf("mmm: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n",
bin[0:32], query.prefix, query.startingPoint, query.dbi, err)
return nil, fmt.Errorf("event read error: %w", err)
@@ -240,18 +243,18 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
evt := internal.IterEvent{Event: event, Q: q}
//
//
if secondPhase {
if sndPhase {
// do the process described below at HIWAWVRTP.
// if we've reached here this means we've already passed the `since` check.
// now we have to eliminate the event currently at the `since` threshold.
nextThreshold := firstPhaseResults[len(firstPhaseResults)-2]
if oldest.Event == nil {
if oldest.Event.ID == nostr.ZeroID {
// fmt.Println(" b1", evt.ID[0:8])
// BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE)
// when we don't have the oldest set, we will keep the results
// and not change the cutting point -- it's bad, but hopefully not that bad.
results[q] = append(results[q], evt)
secondPhaseHasResultsPending = true
sndPhaseHasResultsPending = true
} else if nextThreshold.CreatedAt > oldest.CreatedAt {
// fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8])
// one of the events we have stored is the actual next threshold
@@ -268,7 +271,7 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
// finally
// add this to the results to be merged later
results[q] = append(results[q], evt)
secondPhaseHasResultsPending = true
sndPhaseHasResultsPending = true
} else if nextThreshold.CreatedAt < evt.CreatedAt {
// the next last event in the firstPhaseResults is the next threshold
// fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8])
@@ -278,7 +281,7 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
// fmt.Println(" new since", since)
// add this to the results to be merged later
results[q] = append(results[q], evt)
secondPhaseHasResultsPending = true
sndPhaseHasResultsPending = true
// update the oldest event
if evt.CreatedAt < oldest.CreatedAt {
oldest = evt
@@ -297,7 +300,7 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
firstPhaseTotalPulled++
// update the oldest event
if oldest.Event == nil || evt.CreatedAt < oldest.CreatedAt {
if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt {
oldest = evt
}
}
@@ -323,20 +326,20 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
// we will do this check if we don't accumulated the requested number of events yet
// fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) {
if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) {
// fmt.Println("second phase aggregation!")
// when we are in the second phase we will aggressively aggregate results on every iteration
//
secondBatch = secondBatch[:0]
for s := 0; s < len(secondPhaseParticipants); s++ {
q := secondPhaseParticipants[s]
for s := 0; s < len(sndPhaseParticipants); s++ {
q := sndPhaseParticipants[s]
if len(results[q]) > 0 {
secondBatch = append(secondBatch, results[q])
}
if exhausted[q] {
secondPhaseParticipants = internal.SwapDelete(secondPhaseParticipants, s)
sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s)
s--
}
}
@@ -344,29 +347,29 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
// every time we get here we will alternate between these A and B lists
// combining everything we have into a new partial results list.
// after we've done that we can again set the oldest.
// fmt.Println(" xxx", secondPhaseResultsToggle)
if secondPhaseResultsToggle {
secondBatch = append(secondBatch, secondPhaseResultsB)
secondPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsA)
oldest = secondPhaseResultsA[len(secondPhaseResultsA)-1]
// fmt.Println(" new aggregated a", len(secondPhaseResultsB))
// fmt.Println(" xxx", sndPhaseResultsToggle)
if sndPhaseResultsToggle {
secondBatch = append(secondBatch, sndPhaseResultsB)
sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA)
oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1]
// fmt.Println(" new aggregated a", len(sndPhaseResultsB))
} else {
secondBatch = append(secondBatch, secondPhaseResultsA)
secondPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsB)
oldest = secondPhaseResultsB[len(secondPhaseResultsB)-1]
// fmt.Println(" new aggregated b", len(secondPhaseResultsB))
secondBatch = append(secondBatch, sndPhaseResultsA)
sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB)
oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1]
// fmt.Println(" new aggregated b", len(sndPhaseResultsB))
}
secondPhaseResultsToggle = !secondPhaseResultsToggle
sndPhaseResultsToggle = !sndPhaseResultsToggle
since = uint32(oldest.CreatedAt)
// fmt.Println(" new since", since)
// reset the `results` list so we can keep using it
results = results[:len(queries)]
for _, q := range secondPhaseParticipants {
for _, q := range sndPhaseParticipants {
results[q] = results[q][:0]
}
} else if !secondPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
} else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
// fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted)
// we will exclude this oldest number as it is not relevant anymore
@@ -410,16 +413,16 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
results[q] = results[q][:0]
// build this index of indexes with everybody who remains
secondPhaseParticipants = append(secondPhaseParticipants, q)
sndPhaseParticipants = append(sndPhaseParticipants, q)
}
// we create these two lists and alternate between them so we don't have to create a
// a new one every time
secondPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
secondPhaseResultsB = make([]internal.IterEvent, 0, limit*2)
sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2)
// from now on we won't run this block anymore
secondPhase = true
sndPhase = true
}
// fmt.Println("remaining", remainingUnexhausted)
@@ -428,27 +431,27 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([
}
}
// fmt.Println("is secondPhase?", secondPhase)
// fmt.Println("is sndPhase?", sndPhase)
var combinedResults []internal.IterEvent
if secondPhase {
if sndPhase {
// fmt.Println("ending second phase")
// when we reach this point either secondPhaseResultsA or secondPhaseResultsB will be full of stuff,
// when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff,
// the other will be empty
var secondPhaseResults []internal.IterEvent
// fmt.Println("xxx", secondPhaseResultsToggle, len(secondPhaseResultsA), len(secondPhaseResultsB))
if secondPhaseResultsToggle {
secondPhaseResults = secondPhaseResultsB
combinedResults = secondPhaseResultsA[0:limit] // reuse this
// fmt.Println(" using b", len(secondPhaseResultsA))
var sndPhaseResults []internal.IterEvent
// fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB))
if sndPhaseResultsToggle {
sndPhaseResults = sndPhaseResultsB
combinedResults = sndPhaseResultsA[0:limit] // reuse this
// fmt.Println(" using b", len(sndPhaseResultsA))
} else {
secondPhaseResults = secondPhaseResultsA
combinedResults = secondPhaseResultsB[0:limit] // reuse this
// fmt.Println(" using a", len(secondPhaseResultsA))
sndPhaseResults = sndPhaseResultsA
combinedResults = sndPhaseResultsB[0:limit] // reuse this
// fmt.Println(" using a", len(sndPhaseResultsA))
}
all := [][]internal.IterEvent{firstPhaseResults, secondPhaseResults}
all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults}
combinedResults = internal.MergeSortMultiple(all, limit, combinedResults)
// fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
} else {

View File

@@ -5,9 +5,9 @@ import (
"encoding/hex"
"fmt"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/internal"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/internal"
"github.com/PowerDNS/lmdb-go/lmdb"
)
type query struct {
@@ -127,7 +127,7 @@ func (il *IndexingLayer) prepareQueries(filter nostr.Filter) (
if filter.Authors != nil {
extraAuthors = make([][32]byte, len(filter.Authors))
for i, pk := range filter.Authors {
hex.Decode(extraAuthors[i][:], []byte(pk))
copy(extraAuthors[i][:], pk[:])
}
}

View File

@@ -1,22 +1,28 @@
package mmm
import (
"context"
"fmt"
"math"
"runtime"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/internal"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/internal"
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (il *IndexingLayer) ReplaceEvent(ctx context.Context, evt *nostr.Event) error {
func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
// sanity checking
if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 {
return fmt.Errorf("event with values out of expected boundaries")
}
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
il.mmmm.writeMutex.Lock()
defer il.mmmm.writeMutex.Unlock()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
filter := nostr.Filter{Limit: 1, Kinds: []uint16{evt.Kind}, Authors: []nostr.PubKey{evt.PubKey}}
if nostr.IsAddressableKind(evt.Kind) {
// when addressable, add the "d" tag to the filter
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
@@ -35,7 +41,7 @@ func (il *IndexingLayer) ReplaceEvent(ctx context.Context, evt *nostr.Event) err
shouldStore := true
for _, previous := range prevResults {
if internal.IsOlder(previous.Event, evt) {
if err := il.delete(mmmtxn, iltxn, previous.Event); err != nil {
if err := il.delete(mmmtxn, iltxn, previous.Event.ID); err != nil {
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err)
}
} else {

View File

@@ -1,9 +1,7 @@
package mmm
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"os"
"runtime"
@@ -11,73 +9,15 @@ import (
"syscall"
"unsafe"
"github.com/PowerDNS/lmdb-go/lmdb"
"fiatjaf.com/nostr/eventstore/mmm/betterbinary"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (b *MultiMmapManager) StoreGlobal(ctx context.Context, evt *nostr.Event) (stored bool, err error) {
someoneWantsIt := false
func (il *IndexingLayer) SaveEvent(evt nostr.Event) error {
il.mmmm.writeMutex.Lock()
defer il.mmmm.writeMutex.Unlock()
b.mutex.Lock()
defer b.mutex.Unlock()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// do this just so it's cleaner, we're already locking the thread and the mutex anyway
mmmtxn, err := b.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
return false, fmt.Errorf("failed to begin global transaction: %w", err)
}
mmmtxn.RawRead = true
iltxns := make([]*lmdb.Txn, 0, len(b.layers))
ils := make([]*IndexingLayer, 0, len(b.layers))
// ask if any of the indexing layers want this
for _, il := range b.layers {
if il.ShouldIndex != nil && il.ShouldIndex(ctx, evt) {
someoneWantsIt = true
iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
mmmtxn.Abort()
for _, txn := range iltxns {
txn.Abort()
}
return false, fmt.Errorf("failed to start txn on %s: %w", il.name, err)
}
ils = append(ils, il)
iltxns = append(iltxns, iltxn)
}
}
if !someoneWantsIt {
// no one wants it
mmmtxn.Abort()
return false, fmt.Errorf("not wanted")
}
stored, err = b.storeOn(mmmtxn, ils, iltxns, evt)
if stored {
mmmtxn.Commit()
for _, txn := range iltxns {
txn.Commit()
}
} else {
mmmtxn.Abort()
for _, txn := range iltxns {
txn.Abort()
}
}
return stored, err
}
func (il *IndexingLayer) SaveEvent(ctx context.Context, evt *nostr.Event) error {
il.mmmm.mutex.Lock()
defer il.mmmm.mutex.Unlock()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
@@ -111,7 +51,7 @@ func (b *MultiMmapManager) storeOn(
mmmtxn *lmdb.Txn,
ils []*IndexingLayer,
iltxns []*lmdb.Txn,
evt *nostr.Event,
evt nostr.Event,
) (stored bool, err error) {
// sanity checking
if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 {
@@ -119,8 +59,7 @@ func (b *MultiMmapManager) storeOn(
}
// check if we already have this id
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
val, err := mmmtxn.Get(b.indexId, idPrefix8)
val, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
if err == nil {
// we found the event, now check if it is already indexed by the layers that want to store it
for i := len(ils) - 1; i >= 0; i-- {
@@ -149,7 +88,7 @@ func (b *MultiMmapManager) storeOn(
// get event binary size
pos := position{
size: uint32(betterbinary.Measure(*evt)),
size: uint32(betterbinary.Measure(evt)),
}
if pos.size >= 1<<16 {
return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
@@ -193,7 +132,7 @@ func (b *MultiMmapManager) storeOn(
}
// write to the mmap
if err := betterbinary.Marshal(*evt, b.mmapf[pos.start:]); err != nil {
if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil {
return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err)
}
@@ -219,8 +158,8 @@ func (b *MultiMmapManager) storeOn(
}
// store the id index with the refcounts
if err := mmmtxn.Put(b.indexId, idPrefix8, val, 0); err != nil {
panic(fmt.Errorf("failed to store %x by id: %w", idPrefix8, err))
if err := mmmtxn.Put(b.indexId, evt.ID[0:8], val, 0); err != nil {
panic(fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err))
}
// msync