diff --git a/eventstore/boltdb/count.go b/eventstore/boltdb/count.go
new file mode 100644
index 0000000..9a178c6
--- /dev/null
+++ b/eventstore/boltdb/count.go
@@ -0,0 +1,248 @@
+package bolt
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/hex"
+	"slices"
+
+	"fiatjaf.com/nostr"
+	"fiatjaf.com/nostr/eventstore/codec/betterbinary"
+	"fiatjaf.com/nostr/nip45"
+	"fiatjaf.com/nostr/nip45/hyperloglog"
+	"go.etcd.io/bbolt"
+)
+
+// CountEvents returns the number of stored events matching filter, walking
+// index keys and only fetching/decoding the raw event when extra
+// (non-indexed) checks are required.
+func (b *BoltBackend) CountEvents(filter nostr.Filter) (uint32, error) {
+	var count uint32 = 0
+
+	queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
+	if err != nil {
+		return 0, err
+	}
+
+	err = b.DB.View(func(txn *bbolt.Tx) error {
+		rawBucket := txn.Bucket(rawEventStore)
+
+		// actually iterate
+		for _, q := range queries {
+			cursor := txn.Bucket(q.bucket).Cursor()
+
+			it := &iterator{cursor: cursor}
+			it.seek(q.startingPoint)
+
+			for {
+				// we already have a k and a v and an err from the cursor setup, so check and use these
+				if it.err != nil ||
+					!bytes.HasPrefix(it.key, q.prefix) {
+					// either iteration has errored or we reached the end of this prefix
+					break // stop this cursor and move to the next one
+				}
+
+				createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
+				if createdAt < since {
+					break
+				}
+
+				if extraAuthors == nil && extraKinds == nil && extraTagValues == nil {
+					count++
+				} else {
+					// fetch actual event
+					bin := rawBucket.Get(it.currIdPtr)
+					if bin == nil {
+						it.next()
+						continue
+					}
+
+					// check it against pubkeys without decoding the entire thing
+					// (only when the filter actually carries extra authors --
+					// slices.Contains against a nil slice would reject everything)
+					if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
+						it.next()
+						continue
+					}
+
+					// check it against kinds without decoding the entire thing
+					if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
+						it.next()
+						continue
+					}
+
+					evt := &nostr.Event{}
+					if err := betterbinary.Unmarshal(bin, evt); err != nil {
+						it.next()
+						continue
+					}
+
+					// if there is still a tag to be checked, do it now
+					if extraTagValues != nil && !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
+						it.next()
+						continue
+					}
+
+					count++
+				}
+
+				// advance the cursor, otherwise we would spin on the same key forever
+				it.next()
+			}
+		}
+
+		return nil
+	})
+
+	return count, err
+}
+
+// CountEventsHLL is like CountEvents, but it will build a hyperloglog value while iterating through results,
+// following NIP-45
+func (b *BoltBackend) CountEventsHLL(filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error) {
+	// guard against a nil callback (SaveEvent guards it the same way) and a
+	// filter without kinds before consulting the cache
+	if b.EnableHLLCacheFor != nil && len(filter.Kinds) > 0 {
+		if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache {
+			return b.countEventsHLLCached(filter)
+		}
+	}
+
+	var count uint32 = 0
+
+	// this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters
+	queries, _, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	hll := hyperloglog.New(offset)
+
+	err = b.DB.View(func(txn *bbolt.Tx) error {
+		rawBucket := txn.Bucket(rawEventStore)
+
+		// actually iterate
+		for _, q := range queries {
+			cursor := txn.Bucket(q.bucket).Cursor()
+
+			it := &iterator{cursor: cursor}
+			it.seek(q.startingPoint)
+
+			for {
+				// we already have a k and a v and an err from the cursor setup, so check and use these
+				if it.err != nil ||
+					!bytes.HasPrefix(it.key, q.prefix) {
+					// either iteration has errored or we reached the end of this prefix
+					break // stop this cursor and move to the next one
+				}
+
+				createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
+				if createdAt < since {
+					break
+				}
+
+				// fetch actual event (we need it regardless because we need the pubkey for the hll)
+				bin := rawBucket.Get(it.currIdPtr)
+				if bin == nil {
+					it.next()
+					continue
+				}
+
+				if extraKinds == nil && extraTagValues == nil {
+					// nothing extra to check
+					count++
+					hll.AddBytes(betterbinary.GetPubKey(bin))
+				} else {
+					// check it against kinds without decoding the entire thing
+					if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
+						it.next()
+						continue
+					}
+
+					evt := &nostr.Event{}
+					if err := betterbinary.Unmarshal(bin, evt); err != nil {
+						it.next()
+						continue
+					}
+
+					// if there is still a tag to be checked, do it now
+					if extraTagValues != nil && !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
+						it.next()
+						continue
+					}
+
+					count++
+					hll.Add(evt.PubKey)
+				}
+
+				// advance the cursor, otherwise we would spin on the same key forever
+				it.next()
+			}
+		}
+
+		return nil
+	})
+
+	return count, hll, err
+}
+
+// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events required to compute this anymore).
+// NOTE(review): this assumes the filter has the exact single-kind/single-tag
+// shape mandated by NIP-45 (it will panic otherwise) -- it must only be
+// reached for filters for which EnableHLLCacheFor returned true.
+func (b *BoltBackend) countEventsHLLCached(filter nostr.Filter) (uint32, *hyperloglog.HyperLogLog, error) {
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(filter.Kinds[0]))
+	switch filter.Kinds[0] {
+	case 3:
+		hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["p"][0][0:8*2]))
+	case 7:
+		hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["e"][0][0:8*2]))
+	case 1111:
+		hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["E"][0][0:8*2]))
+	}
+
+	var count uint32
+	var hll *hyperloglog.HyperLogLog
+
+	err := b.DB.View(func(txn *bbolt.Tx) error {
+		val := txn.Bucket(hllCache).Get(cacheKey)
+		if val == nil {
+			return nil
+		}
+		hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here
+		count = uint32(hll.Count())
+		return nil
+	})
+
+	return count, hll, err
+}
+
+// updateHyperLogLogCachedValues folds evt's pubkey into every hll register set
+// this event references (as per NIP-45) and persists them back to the cache bucket.
+func (b *BoltBackend) updateHyperLogLogCachedValues(txn *bbolt.Tx, evt nostr.Event) error {
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind))
+	hllBucket := txn.Bucket(hllCache)
+
+	for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) {
+		// setup cache key (reusing buffer)
+		hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2]))
+
+		// fetch hll value from cache db
+		hll := hyperloglog.New(offset)
+		val := hllBucket.Get(cacheKey)
+		if val != nil {
+			hll.SetRegisters(val)
+		}
+
+		// add this event
+		hll.Add(evt.PubKey)
+
+		// save values back again
+		if err := hllBucket.Put(cacheKey, hll.GetRegisters()); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git
a/eventstore/boltdb/delete.go b/eventstore/boltdb/delete.go new file mode 100644 index 0000000..359bf53 --- /dev/null +++ b/eventstore/boltdb/delete.go @@ -0,0 +1,46 @@ +package bolt + +import ( + "fmt" + + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/codec/betterbinary" + "go.etcd.io/bbolt" +) + +func (b *BoltBackend) DeleteEvent(id nostr.ID) error { + return b.DB.Update(func(txn *bbolt.Tx) error { + return b.delete(txn, id) + }) +} + +func (b *BoltBackend) delete(txn *bbolt.Tx, id nostr.ID) error { + rawBucket := txn.Bucket(rawEventStore) + + // check if we have this actually + bin := rawBucket.Get(id[16:24]) + if bin == nil { + // we already do not have this + return nil + } + + var evt nostr.Event + if err := betterbinary.Unmarshal(bin, &evt); err != nil { + return fmt.Errorf("failed to unmarshal raw event %x to delete: %w", id, err) + } + + // calculate all index keys we have for this event and delete them + for k := range b.getIndexKeysForEvent(evt) { + err := txn.Bucket(k.bucket).Delete(k.key) + if err != nil { + return fmt.Errorf("failed to delete index entry %s for %x: %w", b.keyName(k), evt.ID[0:8], err) + } + } + + // delete the raw event + if err := rawBucket.Delete(id[16:24]); err != nil { + return fmt.Errorf("failed to delete raw event %x: %w", evt.ID[16:24], err) + } + + return nil +} diff --git a/eventstore/boltdb/fuzz_test.go b/eventstore/boltdb/fuzz_test.go new file mode 100644 index 0000000..1ea4221 --- /dev/null +++ b/eventstore/boltdb/fuzz_test.go @@ -0,0 +1,125 @@ +package bolt + +import ( + "cmp" + "encoding/binary" + "fmt" + "os" + "slices" + "testing" + "time" + + "fiatjaf.com/nostr" + "github.com/stretchr/testify/require" +) + +func FuzzQuery(f *testing.F) { + f.Add(uint(200), uint(50), uint(13), uint(2), uint(2), uint(0), uint(1)) + f.Fuzz(func(t *testing.T, total, limit, authors, timestampAuthorFactor, seedFactor, kinds, kindFactor uint) { + total++ + authors++ + seedFactor++ + kindFactor++ + if kinds == 1 { + kinds++ + } + if 
limit == 0 { + return + } + + // ~ setup db + if err := os.RemoveAll("/tmp/bolttest"); err != nil { + t.Fatal(err) + return + } + db := &BoltBackend{} + db.Path = "/tmp/bolttest" + if err := db.Init(); err != nil { + t.Fatal(err) + return + } + defer db.Close() + + // ~ start actual test + + filter := nostr.Filter{ + Authors: make([]nostr.PubKey, authors), + Limit: int(limit), + } + var maxKind nostr.Kind = 1 + if kinds > 0 { + filter.Kinds = make([]nostr.Kind, kinds) + for i := range filter.Kinds { + filter.Kinds[i] = nostr.Kind(int(kindFactor) * i) + } + maxKind = filter.Kinds[len(filter.Kinds)-1] + } + + for i := 0; i < int(authors); i++ { + var sk nostr.SecretKey + binary.BigEndian.PutUint32(sk[:], uint32(i%int(authors*seedFactor))+1) + pk := nostr.GetPublicKey(sk) + filter.Authors[i] = pk + } + + expected := make([]nostr.Event, 0, total) + for i := 0; i < int(total); i++ { + skseed := uint32(i%int(authors*seedFactor)) + 1 + sk := nostr.SecretKey{} + binary.BigEndian.PutUint32(sk[:], skseed) + + evt := nostr.Event{ + CreatedAt: nostr.Timestamp(skseed)*nostr.Timestamp(timestampAuthorFactor) + nostr.Timestamp(i), + Content: fmt.Sprintf("unbalanced %d", i), + Tags: nostr.Tags{}, + Kind: nostr.Kind(i) % maxKind, + } + err := evt.Sign(sk) + require.NoError(t, err) + + err = db.SaveEvent(evt) + require.NoError(t, err) + + if filter.Matches(evt) { + expected = append(expected, evt) + } + } + + slices.SortFunc(expected, nostr.CompareEventReverse) + if len(expected) > int(limit) { + expected = expected[0:limit] + } + + start := time.Now() + + res := slices.Collect(db.QueryEvents(filter, 500)) + end := time.Now() + + require.Equal(t, len(expected), len(res), "number of results is different than expected") + require.Less(t, end.Sub(start).Milliseconds(), int64(1500), "query took too long") + nresults := len(expected) + + getTimestamps := func(events []nostr.Event) []nostr.Timestamp { + res := make([]nostr.Timestamp, len(events)) + for i, evt := range events { + res[i] = 
evt.CreatedAt + } + return res + } + + fmt.Println(" expected result") + for i := range expected { + fmt.Println(" ", expected[i].CreatedAt, expected[i].ID.Hex()[0:8], " ", res[i].CreatedAt, res[i].ID.Hex()[0:8], " ", i) + } + + require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong") + require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result (%d) is wrong", nresults-1) + require.Equal(t, getTimestamps(expected), getTimestamps(res)) + + for _, evt := range res { + require.True(t, filter.Matches(evt), "event %s doesn't match filter %s", evt, filter) + } + + require.True(t, slices.IsSortedFunc(res, func(a, b nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted") + }) +} diff --git a/eventstore/boltdb/helpers.go b/eventstore/boltdb/helpers.go new file mode 100644 index 0000000..a22003b --- /dev/null +++ b/eventstore/boltdb/helpers.go @@ -0,0 +1,283 @@ +package bolt + +import ( + "bytes" + "crypto/md5" + "encoding/binary" + "encoding/hex" + "fmt" + "iter" + "slices" + "strconv" + "strings" + + "fiatjaf.com/nostr" + "go.etcd.io/bbolt" +) + +type iterator struct { + query query + + // iteration stuff + cursor *bbolt.Cursor + key []byte + currIdPtr []byte + err error + + // this keeps track of last timestamp value pulled from this + last uint32 + + // if we shouldn't fetch more from this + exhausted bool + + // results not yet emitted + idPtrs [][]byte + timestamps []uint32 +} + +func (it *iterator) pull(n int, since uint32) { + query := it.query + + for range n { + // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these + if it.err != nil { + it.exhausted = true + return + } + + if !bytes.HasPrefix(it.key, query.prefix) { + // we reached the end of this prefix + it.exhausted = true + return + } + + createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) + if createdAt < since { + it.exhausted = true + return + } + + // got 
a key + it.idPtrs = append(it.idPtrs, it.currIdPtr) + it.timestamps = append(it.timestamps, createdAt) + it.last = createdAt + + // advance the cursor for the next call + it.next() + } + + return +} + +func (it *iterator) seek(keyPrefix []byte) { + fullkey, _ := it.cursor.Seek(keyPrefix) + copy(it.key, fullkey[len(fullkey)-8-4:]) + copy(it.currIdPtr, fullkey[len(fullkey)-8:]) +} + +// goes backwards +func (it *iterator) next() { + // move one back (we'll look into k and v and err in the next iteration) + fullkey, _ := it.cursor.Prev() + copy(it.key, fullkey[len(fullkey)-8-4:]) + copy(it.currIdPtr, fullkey[len(fullkey)-8:]) +} + +type iterators []*iterator + +// quickselect reorders the slice just enough to make the top k elements be arranged at the end +// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28] +// in this case it's hardcoded to use the 'last' field of the iterator +// copied from https://github.com/chrislee87/go-quickselect +// this is modified to also return the highest 'last' (because it's not guaranteed it will be the first item) +func (its iterators) quickselect(k int) uint32 { + if len(its) == 0 || k >= len(its) { + return 0 + } + + left, right := 0, len(its)-1 + + for { + // insertion sort for small ranges + if right-left <= 20 { + for i := left + 1; i <= right; i++ { + for j := i; j > 0 && its[j].last > its[j-1].last; j-- { + its[j], its[j-1] = its[j-1], its[j] + } + } + return its[0].last + } + + // median-of-three to choose pivot + pivotIndex := left + (right-left)/2 + if its[right].last > its[left].last { + its[right], its[left] = its[left], its[right] + } + if its[pivotIndex].last > its[left].last { + its[pivotIndex], its[left] = its[left], its[pivotIndex] + } + if its[right].last > its[pivotIndex].last { + its[right], its[pivotIndex] = its[pivotIndex], its[right] + } + + // partition + its[left], its[pivotIndex] = its[pivotIndex], its[left] + ll := left + 1 + rr := right + for ll <= rr { + for ll <= 
right && its[ll].last > its[left].last { + ll++ + } + for rr >= left && its[left].last > its[rr].last { + rr-- + } + if ll <= rr { + its[ll], its[rr] = its[rr], its[ll] + ll++ + rr-- + } + } + its[left], its[rr] = its[rr], its[left] // swap into right place + pivotIndex = rr + + if k == pivotIndex { + // now that stuff is selected we get the highest "last" + highest := its[0].last + for i := 1; i < k; i++ { + if its[i].last > highest { + highest = its[i].last + } + } + return highest + } + + if k < pivotIndex { + right = pivotIndex - 1 + } else { + left = pivotIndex + 1 + } + } +} + +type key struct { + bucket []byte + key []byte +} + +func (b *BoltBackend) keyName(key key) string { + return fmt.Sprintf("", string(key.bucket), key.key) +} + +func (b *BoltBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { + return func(yield func(key) bool) { + { + // ~ by pubkey+date + k := make([]byte, 8+4+8) + copy(k[0:8], evt.PubKey[0:8]) + binary.BigEndian.PutUint32(k[8:8+4], uint32(evt.CreatedAt)) + copy(k[8+4:8+4+8], evt.ID[16:24]) + if !yield(key{bucket: indexPubkey, key: k[0 : 8+4]}) { + return + } + } + + { + // ~ by kind+date + k := make([]byte, 2+4+8) + binary.BigEndian.PutUint16(k[0:2], uint16(evt.Kind)) + binary.BigEndian.PutUint32(k[2:2+4], uint32(evt.CreatedAt)) + copy(k[2+4:2+4+8], evt.ID[16:24]) + if !yield(key{bucket: indexKind, key: k[0 : 2+4]}) { + return + } + } + + { + // ~ by pubkey+kind+date + k := make([]byte, 8+2+4+8) + copy(k[0:8], evt.PubKey[0:8]) + binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind)) + binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt)) + copy(k[8+2:8+2+8], evt.ID[16:24]) + if !yield(key{bucket: indexPubkeyKind, key: k[0 : 8+2+4]}) { + return + } + } + + // ~ by tagvalue+date + // ~ by p-tag+kind+date + for i, tag := range evt.Tags { + if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { + // not indexable + continue + } + firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool 
{ + return len(t) >= 2 && t[0] == tag[0] && t[1] == tag[1] + }) + if firstIndex != i { + // duplicate + continue + } + + // get key prefix (with full length) and offset where to write the created_at + bucket, k := b.getTagIndexPrefix(tag[0], tag[1]) + // keys always end with 4 bytes of created_at + 8 bytes of the id ptr + binary.BigEndian.PutUint32(k[len(k)-8-4:], uint32(evt.CreatedAt)) + copy(k[len(k)-8:], evt.ID[16:24]) + if !yield(key{bucket: bucket, key: k}) { + return + } + } + + { + // ~ by date only + k := make([]byte, 4+8) + binary.BigEndian.PutUint32(k[0:4], uint32(evt.CreatedAt)) + copy(k[4:4+8], evt.ID[16:24]) + if !yield(key{bucket: indexCreatedAt, key: k[0:4]}) { + return + } + } + } +} + +func (b *BoltBackend) getTagIndexPrefix(tagName string, tagValue string) (bucket_ []byte, k_ []byte) { + var k []byte // the key with full length for created_at and idptr at the end, but not filled with these + + letterPrefix := byte(int(tagName[0]) % 256) + + // if it's 32 bytes as hex, save it as bytes + if len(tagValue) == 64 { + // but we actually only use the first 8 bytes, with letter (tag name) prefix + k = make([]byte, 1+8+4+8) + if _, err := hex.Decode(k[1:1+8], []byte(tagValue[0:8*2])); err == nil { + k[0] = letterPrefix + return indexTag32, k + } + } + + // if it looks like an "a" tag, index it in this special format, with letter (tag name) prefix + spl := strings.Split(tagValue, ":") + if len(spl) == 3 && len(spl[1]) == 64 { + k = make([]byte, 1+2+8+30+4+8) + if _, err := hex.Decode(k[1+2:1+2+8], []byte(spl[1][0:8*2])); err == nil { + if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil { + k[0] = letterPrefix + k[1] = byte(kind >> 8) + k[2] = byte(kind) + // limit "d" identifier to 30 bytes (so we don't have to grow our byte slice) + copy(k[1+2+8:1+2+8+30], spl[2]) + return indexTagAddr, k + } + } + } + + // index whatever else as a md5 hash of the contents, with letter (tag name) prefix + h := md5.New() + h.Write([]byte(tagValue)) + k = 
make([]byte, 1, 1+16+4+8)
+	k[0] = letterPrefix
+	k = h.Sum(k)
+
+	// reslice back to full capacity: callers write the 4-byte created_at and
+	// the 8-byte id ptr into the last 12 bytes, which must not overlap the hash
+	return indexTag, k[0 : 1+16+4+8]
+}
diff --git a/eventstore/boltdb/lib.go b/eventstore/boltdb/lib.go
new file mode 100644
index 0000000..6d9e78c
--- /dev/null
+++ b/eventstore/boltdb/lib.go
@@ -0,0 +1,95 @@
+package bolt
+
+import (
+	"os"
+	"path/filepath"
+
+	"fiatjaf.com/nostr"
+	"fiatjaf.com/nostr/eventstore"
+	"go.etcd.io/bbolt"
+)
+
+var (
+	settingsStore   = []byte("settingsStore")
+	rawEventStore   = []byte("rawEventStore")
+	indexCreatedAt  = []byte("indexCreatedAt")
+	indexKind       = []byte("indexKind")
+	indexPubkey     = []byte("indexPubkey")
+	indexPubkeyKind = []byte("indexPubkeyKind")
+	indexTag        = []byte("indexTag")
+	indexTag32      = []byte("indexTag32")
+	indexTagAddr    = []byte("indexTagAddr")
+	hllCache        = []byte("hllCache")
+)
+
+var _ eventstore.Store = (*BoltBackend)(nil)
+
+type BoltBackend struct {
+	Path    string
+	MapSize int64
+	DB      *bbolt.DB
+
+	EnableHLLCacheFor func(kind nostr.Kind) (useCache bool, skipSavingActualEvent bool)
+}
+
+// Init creates the parent directory of Path if needed and opens the database.
+// bbolt opens Path itself as a single file, so Path must not be a directory
+// (creating Path itself as a directory, as before, made bbolt.Open fail).
+func (b *BoltBackend) Init() error {
+	// create the containing directory if it doesn't exist and open the db file
+	if err := os.MkdirAll(filepath.Dir(b.Path), 0755); err != nil {
+		return err
+	}
+
+	return b.initialize()
+}
+
+func (b *BoltBackend) Close() {
+	b.DB.Close()
+}
+
+func (b *BoltBackend) initialize() error {
+	db, err := bbolt.Open(b.Path, 0600, nil)
+	if err != nil {
+		return err
+	}
+	// keep the handle: migrate(), Close() and everything else go through b.DB
+	b.DB = db
+
+	if err := db.Update(func(txn *bbolt.Tx) error {
+		if _, err := txn.CreateBucketIfNotExists(settingsStore); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(rawEventStore); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexCreatedAt); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexKind); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexPubkey); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexPubkeyKind); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexTag); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexTag32); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(indexTagAddr); err != nil {
+			return err
+		}
+		if _, err := txn.CreateBucketIfNotExists(hllCache); err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		// bucket creation failure was silently ignored before
+		return err
+	}
+
+	return b.migrate()
+}
diff --git a/eventstore/boltdb/migration.go b/eventstore/boltdb/migration.go
new file mode 100644
index 0000000..2eabb5d
--- /dev/null
+++ b/eventstore/boltdb/migration.go
@@ -0,0 +1,51 @@
+package bolt
+
+import (
+	"encoding/binary"
+	"log"
+
+	"go.etcd.io/bbolt"
+)
+
+const (
+	DB_VERSION byte = 'v'
+)
+
+const target = 1
+
+// migrate brings the database schema up to `target`, running any pending
+// steps inside a single write transaction.
+func (b *BoltBackend) migrate() error {
+	return b.DB.Update(func(txn *bbolt.Tx) error {
+		bucket := txn.Bucket(settingsStore)
+
+		val := bucket.Get([]byte("version"))
+
+		// default to 0 (never migrated) and only decode a stored version when
+		// one exists -- the condition was inverted before, so a fresh database
+		// would panic decoding a nil value
+		var version uint16 = 0
+		if val != nil {
+			version = binary.BigEndian.Uint16(val)
+		}
+
+		// do the migrations in increasing steps (there is no rollback)
+		if version < target {
+			log.Printf("[bolt] migration %d: reindex everything\n", target)
+
+			// bump version
+			if err := b.setVersion(txn, target); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+func (b *BoltBackend) setVersion(txn *bbolt.Tx, v uint16) error {
+	bucket := txn.Bucket(settingsStore)
+
+	var newVersion [2]byte
+	binary.BigEndian.PutUint16(newVersion[:], v)
+	return bucket.Put([]byte("version"), newVersion[:])
+}
diff --git a/eventstore/boltdb/query.go b/eventstore/boltdb/query.go
new file mode 100644
index 0000000..d77f774
--- /dev/null
+++ b/eventstore/boltdb/query.go
@@ -0,0 +1,187 @@
+package bolt
+
+import (
+	"iter"
+	"log"
+	"math"
+	"slices"
+
+	"fiatjaf.com/nostr"
+	"fiatjaf.com/nostr/eventstore/codec/betterbinary"
+	"fiatjaf.com/nostr/eventstore/internal"
+	"go.etcd.io/bbolt"
+)
+
+func (b *BoltBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
+	return func(yield func(nostr.Event) bool) {
+		if filter.IDs != nil {
+			// when there are ids we
ignore everything else and just fetch the ids + if err := b.DB.View(func(txn *bbolt.Tx) error { + return b.queryByIds(txn, filter.IDs, yield) + }); err != nil { + log.Printf("bolt: unexpected id query error: %s\n", err) + } + return + } + + // ignore search queries + if filter.Search != "" { + return + } + + // max number of events we'll return + if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero { + return + } else if tlimit < maxLimit { + maxLimit = tlimit + } + if filter.Limit > 0 && filter.Limit < maxLimit { + maxLimit = filter.Limit + } + + // do a normal query based on various filters + if err := b.DB.View(func(txn *bbolt.Tx) error { + return b.query(txn, filter, maxLimit, yield) + }); err != nil { + log.Printf("bolt: unexpected query error: %s\n", err) + } + } +} + +func (b *BoltBackend) queryByIds(txn *bbolt.Tx, ids []nostr.ID, yield func(nostr.Event) bool) error { + rawBucket := txn.Bucket(rawEventStore) + + for _, id := range ids { + bin := rawBucket.Get(id[16:24]) + if bin == nil { + continue + } + + event := nostr.Event{} + if err := betterbinary.Unmarshal(bin, &event); err != nil { + continue + } + + if !yield(event) { + return nil + } + } + + return nil +} + +func (b *BoltBackend) query(txn *bbolt.Tx, filter nostr.Filter, limit int, yield func(nostr.Event) bool) error { + queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter) + if err != nil { + return err + } + + iterators := make(iterators, len(queries)) + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries)) + + for q, query := range queries { + bucket := txn.Bucket(queries[q].bucket) + + iterators[q] = &iterator{ + query: query, + cursor: bucket.Cursor(), + } + + iterators[q].seek(queries[q].startingPoint) + } + + // initial pull from all queries + for i := range iterators { + iterators[i].pull(batchSizePerQuery, since) + } + + numberOfIteratorsToPullOnEachRound := max(1, 
int(math.Ceil(float64(len(iterators))/float64(12))))
+	totalEventsEmitted := 0
+	tempResults := make([]nostr.Event, 0, batchSizePerQuery*2)
+
+	rawBucket := txn.Bucket(rawEventStore)
+
+	for len(iterators) > 0 {
+		// reset stuff
+		tempResults = tempResults[:0]
+
+		// after pulling from all iterators once we now find out what iterators are
+		// the ones we should keep pulling from next (i.e. which one's last emitted timestamp is the highest)
+		threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators)))
+
+		// so we can emit all the events higher than the threshold
+		for i := range iterators {
+			for t := 0; t < len(iterators[i].timestamps); t++ {
+				if iterators[i].timestamps[t] >= threshold {
+					idPtr := iterators[i].idPtrs[t]
+
+					// discard this regardless of what happens
+					iterators[i].timestamps = internal.SwapDelete(iterators[i].timestamps, t)
+					iterators[i].idPtrs = internal.SwapDelete(iterators[i].idPtrs, t)
+					t--
+
+					// fetch actual event
+					bin := rawBucket.Get(idPtr)
+					if bin == nil {
+						log.Printf("bolt: failed to get %x from raw event store (query prefix=%x, bucket=%s)\n", idPtr, iterators[i].query.prefix, string(iterators[i].query.bucket))
+						continue
+					}
+
+					// check it against pubkeys without decoding the entire thing
+					if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) {
+						continue
+					}
+
+					// check it against kinds without decoding the entire thing
+					if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) {
+						continue
+					}
+
+					// decode the entire thing
+					event := nostr.Event{}
+					if err := betterbinary.Unmarshal(bin, &event); err != nil {
+						log.Printf("bolt: value read error (id %x) on query prefix %x sp %x dbi %s: %s\n",
+							betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, string(iterators[i].query.bucket), err)
+						continue
+					}
+
+					// if there is still a tag to be checked, do it now
+					if extraTagValues != nil &&
!event.Tags.ContainsAny(extraTagKey, extraTagValues) { + continue + } + + tempResults = append(tempResults, event) + } + } + } + + // emit this stuff in order + slices.SortFunc(tempResults, nostr.CompareEventReverse) + for _, evt := range tempResults { + if !yield(evt) { + return nil + } + + totalEventsEmitted++ + if totalEventsEmitted == limit { + return nil + } + } + + // now pull more events + for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ { + if iterators[i].exhausted { + if len(iterators[i].idPtrs) == 0 { + // eliminating this from the list of iterators + iterators = internal.SwapDelete(iterators, i) + i-- + } + continue + } + + iterators[i].pull(batchSizePerQuery, since) + } + } + + return nil +} diff --git a/eventstore/boltdb/query_planner.go b/eventstore/boltdb/query_planner.go new file mode 100644 index 0000000..458c0e6 --- /dev/null +++ b/eventstore/boltdb/query_planner.go @@ -0,0 +1,144 @@ +package bolt + +import ( + "encoding/binary" + + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/internal" +) + +type query struct { + i int + bucket []byte + prefix []byte + startingPoint []byte +} + +func (b *BoltBackend) prepareQueries(filter nostr.Filter) ( + queries []query, + extraAuthors []nostr.PubKey, + extraKinds []nostr.Kind, + extraTagKey string, + extraTagValues []string, + since uint32, + err error, +) { + // we will apply this to every query we return + defer func() { + if queries == nil { + return + } + + var until uint32 = 4294967295 + if filter.Until != 0 { + if fu := uint32(filter.Until); fu < until { + until = fu + 1 + } + } + for i, q := range queries { + sp := make([]byte, len(q.prefix)) + sp = sp[0:len(q.prefix)] + copy(sp, q.prefix) + queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until)) + } + }() + + // this is where we'll end the iteration + if filter.Since != 0 { + if fs := uint32(filter.Since); fs > since { + since = fs + } + } + + if len(filter.Tags) > 0 { + // we will select ONE tag 
to query for and ONE extra tag to do further narrowing, if available + tagKey, tagValues, goodness := internal.ChooseNarrowestTag(filter) + + // we won't use a tag index for this as long as we have something else to match with + if goodness < 2 && (len(filter.Authors) > 0 || len(filter.Kinds) > 0) { + goto pubkeyMatching + } + + // otherwise we will use a plain tag index + queries = make([]query, len(tagValues)) + for i, value := range tagValues { + // get key prefix (with full length) and offset where to write the created_at + dbi, k := b.getTagIndexPrefix(tagKey, value) + // remove the last parts part to get just the prefix we want here + prefix := k[0 : len(k)-8-4] + queries[i] = query{i: i, bucket: dbi, prefix: prefix} + } + + // add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index) + if filter.Kinds != nil { + extraKinds = make([]nostr.Kind, len(filter.Kinds)) + for i, kind := range filter.Kinds { + extraKinds[i] = kind + } + } + + // add an extra author search if possible + if filter.Authors != nil { + extraAuthors = make([]nostr.PubKey, len(filter.Authors)) + for i, pk := range filter.Authors { + extraAuthors[i] = pk + } + } + + // add an extra useless tag if available + filter.Tags = internal.CopyMapWithoutKey(filter.Tags, tagKey) + if len(filter.Tags) > 0 { + extraTagKey, extraTagValues, _ = internal.ChooseNarrowestTag(filter) + } + + return queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, nil + } + +pubkeyMatching: + if len(filter.Authors) > 0 { + if len(filter.Kinds) == 0 { + // will use pubkey index + queries = make([]query, len(filter.Authors)) + for i, pk := range filter.Authors { + queries[i] = query{i: i, bucket: indexPubkey, prefix: pk[0:8]} + } + } else { + // will use pubkeyKind index + queries = make([]query, len(filter.Authors)*len(filter.Kinds)) + i := 0 + for _, pk := range filter.Authors { + for _, kind := range filter.Kinds { + prefix := make([]byte, 8+2) + copy(prefix[0:8], 
pk[0:8]) + binary.BigEndian.PutUint16(prefix[8:8+2], uint16(kind)) + queries[i] = query{i: i, bucket: indexPubkeyKind, prefix: prefix[0 : 8+2]} + i++ + } + } + } + + // potentially with an extra useless tag filtering + extraTagKey, extraTagValues, _ = internal.ChooseNarrowestTag(filter) + return queries, nil, nil, extraTagKey, extraTagValues, since, nil + } + + if len(filter.Kinds) > 0 { + // will use a kind index + queries = make([]query, len(filter.Kinds)) + for i, kind := range filter.Kinds { + prefix := make([]byte, 2) + binary.BigEndian.PutUint16(prefix[0:2], uint16(kind)) + queries[i] = query{i: i, bucket: indexKind, prefix: prefix[0:2]} + } + + // potentially with an extra useless tag filtering + tagKey, tagValues, _ := internal.ChooseNarrowestTag(filter) + return queries, nil, nil, tagKey, tagValues, since, nil + } + + // if we got here our query will have nothing to filter with + queries = make([]query, 1) + prefix := make([]byte, 0) + queries[0] = query{i: 0, bucket: indexCreatedAt, prefix: prefix} + return queries, nil, nil, "", nil, since, nil +} diff --git a/eventstore/boltdb/replace.go b/eventstore/boltdb/replace.go new file mode 100644 index 0000000..55297fd --- /dev/null +++ b/eventstore/boltdb/replace.go @@ -0,0 +1,47 @@ +package bolt + +import ( + "fmt" + "iter" + + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/internal" + "go.etcd.io/bbolt" +) + +func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { + return b.DB.Update(func(txn *bbolt.Tx) error { + filter := nostr.Filter{Limit: 1, Kinds: []nostr.Kind{evt.Kind}, Authors: []nostr.PubKey{evt.PubKey}} + if evt.Kind.IsAddressable() { + // when addressable, add the "d" tag to the filter + filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}} + } + + // now we fetch the past events, whatever they are, delete them and then save the new + var yield_ func(nostr.Event) bool + var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { + yield_ = yield + } + err := b.query(txn, 
filter, 10 /* in theory limit could be just 1 and this should work */, yield_) + if err != nil { + return fmt.Errorf("failed to query past events with %s: %w", filter, err) + } + + shouldStore := true + for previous := range results { + if internal.IsOlder(previous, evt) { + if err := b.delete(txn, previous.ID); err != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) + } + } else { + // there is a newer event already stored, so we won't store this + shouldStore = false + } + } + if shouldStore { + return b.save(txn, evt) + } + + return nil + }) +} diff --git a/eventstore/boltdb/save.go b/eventstore/boltdb/save.go new file mode 100644 index 0000000..b14e24a --- /dev/null +++ b/eventstore/boltdb/save.go @@ -0,0 +1,65 @@ +package bolt + +import ( + "fmt" + + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore" + "fiatjaf.com/nostr/eventstore/codec/betterbinary" + "go.etcd.io/bbolt" +) + +func (b *BoltBackend) SaveEvent(evt nostr.Event) error { + return b.DB.Update(func(txn *bbolt.Tx) error { + if b.EnableHLLCacheFor != nil { + // modify hyperloglog caches relative to this + useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind) + + if useCache { + err := b.updateHyperLogLogCachedValues(txn, evt) + if err != nil { + return fmt.Errorf("failed to update hll cache: %w", err) + } + if skipSaving { + return nil + } + } + } + + rawBucket := txn.Bucket(rawEventStore) + + // check if we already have this id + bin := rawBucket.Get(evt.ID[16:24]) + if bin != nil { + // we should get nil, otherwise end here + return eventstore.ErrDupEvent + } + + return b.save(txn, evt) + }) +} + +func (b *BoltBackend) save(txn *bbolt.Tx, evt nostr.Event) error { + rawBucket := txn.Bucket(rawEventStore) + + // encode to binary form so we'll save it + bin := make([]byte, betterbinary.Measure(evt)) + if err := betterbinary.Marshal(evt, bin); err != nil { + return err + } + + // raw event store + if err := rawBucket.Put(evt.ID[16:24], bin); err != nil { + 
return err + } + + // put indexes + for k := range b.getIndexKeysForEvent(evt) { + err := txn.Bucket(k.bucket).Put(k.key, nil) + if err != nil { + return err + } + } + + return nil +} diff --git a/eventstore/boltdb/utils_test.go b/eventstore/boltdb/utils_test.go new file mode 100644 index 0000000..3da9733 --- /dev/null +++ b/eventstore/boltdb/utils_test.go @@ -0,0 +1,67 @@ +package bolt + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQuickselect(t *testing.T) { + { + its := iterators{ + {last: 781}, + {last: 900}, + {last: 1}, + {last: 81}, + {last: 325}, + {last: 781}, + {last: 562}, + {last: 81}, + {last: 444}, + } + + its.quickselect(3) + require.ElementsMatch(t, + []uint32{its[0].last, its[1].last, its[2].last}, + []uint32{900, 781, 781}, + ) + } + + { + its := iterators{ + {last: 781}, + {last: 781}, + {last: 900}, + {last: 1}, + {last: 87}, + {last: 315}, + {last: 789}, + {last: 500}, + {last: 812}, + {last: 306}, + {last: 612}, + {last: 444}, + {last: 59}, + {last: 441}, + {last: 901}, + {last: 901}, + {last: 2}, + {last: 81}, + {last: 325}, + {last: 781}, + {last: 562}, + {last: 81}, + {last: 326}, + {last: 662}, + {last: 444}, + {last: 81}, + {last: 444}, + } + + its.quickselect(6) + require.ElementsMatch(t, + []uint32{its[0].last, its[1].last, its[2].last, its[3].last, its[4].last, its[5].last}, + []uint32{901, 900, 901, 781, 812, 789}, + ) + } +} diff --git a/go.mod b/go.mod index 120c2ae..be23983 100644 --- a/go.mod +++ b/go.mod @@ -86,8 +86,9 @@ require ( github.com/tidwall/pretty v1.2.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect + go.etcd.io/bbolt v1.4.2 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/sys v0.31.0 // indirect + golang.org/x/sys v0.34.0 // indirect google.golang.org/protobuf v1.36.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3b8cd28..cfb0223 100644 --- a/go.sum +++ b/go.sum @@ -298,6 
+298,8 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +go.etcd.io/bbolt v1.4.2 h1:IrUHp260R8c+zYx/Tm8QZr04CX+qWS5PGfPdevhdm1I= +go.etcd.io/bbolt v1.4.2/go.mod h1:Is8rSHO/b4f3XigBC0lL0+4FwAQv3HXEEIgFMuKHceM= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= golang.org/x/crypto v0.0.0-20170613210332-850760c427c5/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -355,6 +357,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=