From 47ca205e9e1519fc0d1c620a66b6305b0f5dabc5 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 5 Aug 2025 16:25:55 -0300 Subject: [PATCH] boltdb: it works! --- eventstore/boltdb/count.go | 12 ++-- eventstore/boltdb/delete.go | 4 +- eventstore/boltdb/fuzz_test.go | 2 +- eventstore/boltdb/helpers.go | 77 +++++++++++++++++-------- eventstore/boltdb/lib.go | 32 +++++----- eventstore/boltdb/migration.go | 2 +- eventstore/boltdb/query.go | 21 ++++--- eventstore/boltdb/query_planner.go | 2 +- eventstore/boltdb/replace.go | 2 +- eventstore/boltdb/save.go | 4 +- eventstore/boltdb/utils_test.go | 2 +- eventstore/internal/checks/interface.go | 2 + eventstore/test/db_test.go | 8 +++ 13 files changed, 104 insertions(+), 66 deletions(-) diff --git a/eventstore/boltdb/count.go b/eventstore/boltdb/count.go index 9a178c6..36b0440 100644 --- a/eventstore/boltdb/count.go +++ b/eventstore/boltdb/count.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "bytes" @@ -28,13 +28,12 @@ func (b *BoltBackend) CountEvents(filter nostr.Filter) (uint32, error) { for _, q := range queries { cursor := txn.Bucket(q.bucket).Cursor() - it := &iterator{cursor: cursor} + it := newIterator(q, cursor) it.seek(q.startingPoint) for { // we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - !bytes.HasPrefix(it.key, q.prefix) { + if !bytes.HasPrefix(it.key, q.prefix) { // either iteration has errored or we reached the end of this prefix break // stop this cursor and move to the next one } @@ -113,13 +112,12 @@ func (b *BoltBackend) CountEventsHLL(filter nostr.Filter, offset int) (uint32, * for _, q := range queries { cursor := txn.Bucket(q.bucket).Cursor() - it := &iterator{cursor: cursor} + it := newIterator(q, cursor) it.seek(q.startingPoint) for { // we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - !bytes.HasPrefix(it.key, q.prefix) { + if !bytes.HasPrefix(it.key, q.prefix) { // either iteration has errored or we reached the end of this prefix break // stop this cursor and move to the next one } diff --git a/eventstore/boltdb/delete.go b/eventstore/boltdb/delete.go index 359bf53..b9b77b8 100644 --- a/eventstore/boltdb/delete.go +++ b/eventstore/boltdb/delete.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "fmt" @@ -31,7 +31,7 @@ func (b *BoltBackend) delete(txn *bbolt.Tx, id nostr.ID) error { // calculate all index keys we have for this event and delete them for k := range b.getIndexKeysForEvent(evt) { - err := txn.Bucket(k.bucket).Delete(k.key) + err := txn.Bucket(k.bucket).Delete(k.fullkey) if err != nil { return fmt.Errorf("failed to delete index entry %s for %x: %w", b.keyName(k), evt.ID[0:8], err) } diff --git a/eventstore/boltdb/fuzz_test.go b/eventstore/boltdb/fuzz_test.go index 1ea4221..d52d2f2 100644 --- a/eventstore/boltdb/fuzz_test.go +++ b/eventstore/boltdb/fuzz_test.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "cmp" diff --git a/eventstore/boltdb/helpers.go b/eventstore/boltdb/helpers.go index a22003b..b2cc9dc 100644 --- a/eventstore/boltdb/helpers.go +++ b/eventstore/boltdb/helpers.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "bytes" @@ -22,7 +22,6 @@ type iterator struct { cursor *bbolt.Cursor key []byte currIdPtr []byte - err error // this keeps track of last timestamp value pulled from this last uint32 @@ -35,16 +34,21 @@ type iterator struct { timestamps []uint32 } +func newIterator(query query, cursor *bbolt.Cursor) *iterator { + return &iterator{ + query: query, + cursor: cursor, + + key: make([]byte, 0, 31), + currIdPtr: make([]byte, 8), + } +} + func (it *iterator) pull(n int, since uint32) { query := it.query for range n { - // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil { - it.exhausted = true - return - } - + // in the beginning we already have a k and a v from the cursor setup, so check and use these if !bytes.HasPrefix(it.key, query.prefix) { // we reached the end of this prefix it.exhausted = true @@ -58,29 +62,49 @@ func (it *iterator) pull(n int, since uint32) { } // got a key - it.idPtrs = append(it.idPtrs, it.currIdPtr) + it.idPtrs = append(it.idPtrs, append([]byte{}, it.currIdPtr...)) it.timestamps = append(it.timestamps, createdAt) it.last = createdAt // advance the cursor for the next call it.next() + if it.exhausted { + return + } } return } -func (it *iterator) seek(keyPrefix []byte) { - fullkey, _ := it.cursor.Seek(keyPrefix) - copy(it.key, fullkey[len(fullkey)-8-4:]) - copy(it.currIdPtr, fullkey[len(fullkey)-8:]) +func (it *iterator) seek(key []byte) { + fullkey, _ := it.cursor.Seek(key) + if fullkey == nil || bytes.Compare(fullkey, key) == 1 { + fullkey, _ = it.cursor.Prev() + if fullkey == nil { + it.exhausted = true + return + } + } + + s := len(fullkey) + it.key = it.key[0 : s-8] + copy(it.key, fullkey[0:s-8]) + copy(it.currIdPtr, fullkey[s-8:]) } // goes backwards func (it *iterator) next() { - // move one back (we'll look into k and v and err in the next iteration) + // move one back (we'll look into key in the next iteration) fullkey, _ := it.cursor.Prev() - copy(it.key, fullkey[len(fullkey)-8-4:]) - copy(it.currIdPtr, fullkey[len(fullkey)-8:]) + if fullkey == nil { + it.exhausted = true + return + } + + s := len(fullkey) + it.key = it.key[0 : s-8] + copy(it.key, fullkey[0:s-8]) + copy(it.currIdPtr, fullkey[s-8:]) } type iterators []*iterator @@ -160,12 +184,14 @@ func (its iterators) quickselect(k int) uint32 { } type key struct { - bucket []byte - key []byte + bucket []byte + fullkey []byte } func (b *BoltBackend) keyName(key key) string { - return fmt.Sprintf("", string(key.bucket), key.key) + s := len(key.fullkey) + return fmt.Sprintf("", + string(key.bucket), key.fullkey[0:s-8-4], key.fullkey[s-8-4:s-8], key.fullkey[s-8:]) } func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { @@ -176,7 +202,7 @@ func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { copy(k[0:8], evt.PubKey[0:8]) binary.BigEndian.PutUint32(k[8:8+4], uint32(evt.CreatedAt)) copy(k[8+4:8+4+8], evt.ID[16:24]) - if !yield(key{bucket: indexPubkey, key: k[0 : 8+4]}) { + if !yield(key{bucket: indexPubkey, fullkey: k}) { return } } @@ -187,7 +213,7 @@ func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { binary.BigEndian.PutUint16(k[0:2], uint16(evt.Kind)) binary.BigEndian.PutUint32(k[2:2+4], uint32(evt.CreatedAt)) copy(k[2+4:2+4+8], evt.ID[16:24]) - if !yield(key{bucket: indexKind, key: k[0 : 2+4]}) { + if !yield(key{bucket: indexKind, fullkey: k}) { return } } @@ -198,8 +224,8 @@ func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { copy(k[0:8], evt.PubKey[0:8]) binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind)) binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt)) - copy(k[8+2:8+2+8], evt.ID[16:24]) - if !yield(key{bucket: indexPubkeyKind, key: k[0 : 8+2+4]}) { + copy(k[8+2+4:8+2+4+8], evt.ID[16:24]) + if !yield(key{bucket: indexPubkeyKind, fullkey: k}) { return } } @@ -222,9 +248,10 @@ func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { // get key prefix (with full length) and offset where to write the created_at bucket, k := b.getTagIndexPrefix(tag[0], tag[1]) // keys always end with 4 bytes of created_at + 8 bytes of the id ptr + binary.BigEndian.PutUint32(k[len(k)-8-4:], uint32(evt.CreatedAt)) copy(k[len(k)-8:], evt.ID[16:24]) - if !yield(key{bucket: bucket, key: k}) { + if !yield(key{bucket: bucket, fullkey: k}) { return } } @@ -234,7 +261,7 @@ func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { k := make([]byte, 4+8) binary.BigEndian.PutUint32(k[0:4], uint32(evt.CreatedAt)) copy(k[4:4+8], evt.ID[16:24]) - if !yield(key{bucket: indexCreatedAt, key: k[0:4]}) { + if !yield(key{bucket: indexCreatedAt, fullkey: k}) { return } } diff --git a/eventstore/boltdb/lib.go b/eventstore/boltdb/lib.go index 6d9e78c..4411aa8 100644 --- a/eventstore/boltdb/lib.go +++ b/eventstore/boltdb/lib.go @@ -1,7 +1,7 @@ -package bolt +package boltdb import ( - "os" + "time" "fiatjaf.com/nostr" "fiatjaf.com/nostr/eventstore" @@ -32,24 +32,20 @@ type BoltBackend struct { } func (b *BoltBackend) Init() error { - // create directory if it doesn't exist and open it - if err := os.MkdirAll(b.Path, 0755); err != nil { - return err - } - - return b.initialize() -} - -func (b *BoltBackend) Close() { - b.DB.Close() -} - -func (b *BoltBackend) initialize() error { - db, err := bbolt.Open(b.Path, 0600, nil) + db, err := bbolt.Open(b.Path, 0600, &bbolt.Options{ + Timeout: 2 * time.Second, + PreLoadFreelist: true, + FreelistType: bbolt.FreelistMapType, + }) if err != nil { return err } + db.AllocSize = 64 * 1024 * 1024 + db.MaxBatchDelay = time.Millisecond * 40 + + b.DB = db + db.Update(func(txn *bbolt.Tx) error { if _, err := txn.CreateBucketIfNotExists(settingsStore); err != nil { return err @@ -86,3 +82,7 @@ func (b *BoltBackend) initialize() error { return b.migrate() } + +func (b *BoltBackend) Close() { + b.DB.Close() +} diff --git a/eventstore/boltdb/migration.go b/eventstore/boltdb/migration.go index c06759c..ea40700 100644 --- a/eventstore/boltdb/migration.go +++ b/eventstore/boltdb/migration.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "encoding/binary" diff --git a/eventstore/boltdb/query.go b/eventstore/boltdb/query.go index d77f774..8508f70 100644 --- a/eventstore/boltdb/query.go +++ b/eventstore/boltdb/query.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "iter" @@ -76,20 +76,23 @@ func (b *BoltBackend) query(txn *bbolt.Tx, filter nostr.Filter, limit int, yield return err } - iterators := make(iterators, len(queries)) - batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries)) + iterators := make(iterators, 0, len(queries)) + for _, query := range queries { + bucket := txn.Bucket(query.bucket) - for q, query := range queries { - bucket := txn.Bucket(queries[q].bucket) + it := newIterator(query, bucket.Cursor()) - iterators[q] = &iterator{ - query: query, - cursor: bucket.Cursor(), + it.seek(query.startingPoint) + if it.exhausted { + // this may happen rarely + continue } - iterators[q].seek(queries[q].startingPoint) + iterators = append(iterators, it) } + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries)) + // initial pull from all queries for i := range iterators { iterators[i].pull(batchSizePerQuery, since) diff --git a/eventstore/boltdb/query_planner.go b/eventstore/boltdb/query_planner.go index 458c0e6..ae0fb84 100644 --- a/eventstore/boltdb/query_planner.go +++ b/eventstore/boltdb/query_planner.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "encoding/binary" diff --git a/eventstore/boltdb/replace.go b/eventstore/boltdb/replace.go index 55297fd..6ec05e7 100644 --- a/eventstore/boltdb/replace.go +++ b/eventstore/boltdb/replace.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "fmt" diff --git a/eventstore/boltdb/save.go b/eventstore/boltdb/save.go index b14e24a..becad74 100644 --- a/eventstore/boltdb/save.go +++ b/eventstore/boltdb/save.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "fmt" @@ -55,7 +55,7 @@ func (b *BoltBackend) save(txn *bbolt.Tx, evt nostr.Event) error { // put indexes for k := range b.getIndexKeysForEvent(evt) { - err := txn.Bucket(k.bucket).Put(k.key, nil) + err := txn.Bucket(k.bucket).Put(k.fullkey, nil) if err != nil { return err } diff --git a/eventstore/boltdb/utils_test.go b/eventstore/boltdb/utils_test.go index 3da9733..8037adc 100644 --- a/eventstore/boltdb/utils_test.go +++ b/eventstore/boltdb/utils_test.go @@ -1,4 +1,4 @@ -package bolt +package boltdb import ( "testing" diff --git a/eventstore/internal/checks/interface.go b/eventstore/internal/checks/interface.go index 1b3890a..b6cb768 100644 --- a/eventstore/internal/checks/interface.go +++ b/eventstore/internal/checks/interface.go @@ -3,6 +3,7 @@ package checks import ( "fiatjaf.com/nostr/eventstore" "fiatjaf.com/nostr/eventstore/bluge" + "fiatjaf.com/nostr/eventstore/boltdb" "fiatjaf.com/nostr/eventstore/lmdb" "fiatjaf.com/nostr/eventstore/mmm" ) @@ -11,5 +12,6 @@ import ( var ( _ eventstore.Store = (*lmdb.LMDBBackend)(nil) _ eventstore.Store = (*mmm.IndexingLayer)(nil) + _ eventstore.Store = (*boltdb.BoltBackend)(nil) _ eventstore.Store = (*bluge.BlugeBackend)(nil) ) diff --git a/eventstore/test/db_test.go b/eventstore/test/db_test.go index a329023..6b7bf24 100644 --- a/eventstore/test/db_test.go +++ b/eventstore/test/db_test.go @@ -7,6 +7,7 @@ import ( "fiatjaf.com/nostr" "fiatjaf.com/nostr/eventstore" + "fiatjaf.com/nostr/eventstore/boltdb" "fiatjaf.com/nostr/eventstore/lmdb" "fiatjaf.com/nostr/eventstore/mmm" "fiatjaf.com/nostr/eventstore/slicestore" @@ -46,6 +47,13 @@ func TestLMDB(t *testing.T) { } } +func TestBoltDB(t *testing.T) { + for _, test := range tests { + os.RemoveAll(dbpath + "boltdb") + t.Run(test.name, func(t *testing.T) { test.run(t, &boltdb.BoltBackend{Path: dbpath + "boltdb"}) }) + } +} + func TestMMM(t *testing.T) { for _, test := range tests { os.RemoveAll(dbpath + "mmm")