From e9a08d669e3c8e541cb1b642ae3f71289a549031 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 10 Jun 2025 13:46:29 -0300 Subject: [PATCH] eventstore: index tag letter together with the value. --- eventstore/badger/helpers.go | 41 ++++++----- eventstore/badger/migrations.go | 106 ++++++++++++++++++++++++----- eventstore/badger/query_planner.go | 3 +- eventstore/lmdb/helpers.go | 46 +++++++------ eventstore/lmdb/migration.go | 98 ++++++++++++++++++++------ eventstore/lmdb/query_planner.go | 3 +- eventstore/mmm/helpers.go | 48 +++++++------ eventstore/mmm/query_planner.go | 3 +- 8 files changed, 250 insertions(+), 98 deletions(-) diff --git a/eventstore/badger/helpers.go b/eventstore/badger/helpers.go index d9cb0b4..fa66105 100644 --- a/eventstore/badger/helpers.go +++ b/eventstore/badger/helpers.go @@ -11,30 +11,35 @@ import ( "fiatjaf.com/nostr" ) -func getTagIndexPrefix(tagValue string) ([]byte, int) { +func getTagIndexPrefix(tagName string, tagValue string) ([]byte, int) { var k []byte // the key with full length for created_at and idx at the end, but not filled with these var offset int // the offset -- i.e. 
where the prefix ends and the created_at and idx would start + letterPrefix := byte(int(tagName[0]) % 256) + if kind, pkb, d := getAddrTagElements(tagValue); len(pkb) == 32 { - // store value in the new special "a" tag index - k = make([]byte, 1+2+8+len(d)+4+4) + // store value in the new special "a"-style tag index + k = make([]byte, 1+1+2+8+len(d)+4+4) k[0] = indexTagAddrPrefix - binary.BigEndian.PutUint16(k[1:], uint16(kind)) - copy(k[1+2:], pkb[0:8]) - copy(k[1+2+8:], d) - offset = 1 + 2 + 8 + len(d) + k[1] = letterPrefix + binary.BigEndian.PutUint16(k[1+1:], uint16(kind)) + copy(k[1+1+2:], pkb[0:8]) + copy(k[1+1+2+8:], d) + offset = 1 + 1 + 2 + 8 + len(d) } else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 { - // store value as bytes - k = make([]byte, 1+8+4+4) + // store value as bytes with tag name prefix + k = make([]byte, 1+1+8+4+4) k[0] = indexTag32Prefix - copy(k[1:], vb[0:8]) - offset = 1 + 8 + k[1] = letterPrefix + copy(k[2:], vb[0:8]) + offset = 1 + 1 + 8 } else { - // store whatever as utf-8 - k = make([]byte, 1+len(tagValue)+4+4) + // store whatever as utf-8 with tag name prefix + k = make([]byte, 1+1+len(tagValue)+4+4) k[0] = indexTagPrefix - copy(k[1:], tagValue) - offset = 1 + len(tagValue) + k[1] = letterPrefix + copy(k[2:], tagValue) + offset = 1 + 1 + len(tagValue) } return k, offset @@ -102,7 +107,9 @@ func (b *BadgerBackend) getIndexKeysForEvent(evt nostr.Event, idx []byte) iter.S } } - firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { return len(t) >= 2 && t[1] == tag[1] }) + firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { + return len(t) >= 2 && t[0] == tag[0] && t[1] == tag[1] + }) if firstIndex != i { // duplicate continue @@ -114,7 +121,7 @@ func (b *BadgerBackend) getIndexKeysForEvent(evt nostr.Event, idx []byte) iter.S } // get key prefix (with full length) and offset where to write the last parts - k, offset := getTagIndexPrefix(tag[1]) + k, offset := getTagIndexPrefix(tag[0], tag[1]) // 
write the last parts (created_at and idx) binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt)) diff --git a/eventstore/badger/migrations.go b/eventstore/badger/migrations.go index 90cf1e8..6a87d24 100644 --- a/eventstore/badger/migrations.go +++ b/eventstore/badger/migrations.go @@ -2,41 +2,115 @@ package badger import ( "encoding/binary" + "fmt" + "log" + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/codec/betterbinary" "github.com/dgraph-io/badger/v4" ) func (b *BadgerBackend) runMigrations() error { return b.Update(func(txn *badger.Txn) error { - var version uint16 + item, err := txn.Get([]byte("version")) + if err != nil && err != badger.ErrKeyNotFound { + return fmt.Errorf("failed to get db version: %w", err) + } - item, err := txn.Get([]byte{dbVersionKey}) - if err == badger.ErrKeyNotFound { - version = 0 - } else if err != nil { - return err - } else { - item.Value(func(val []byte) error { + var version uint16 = 0 + if err == nil { + err = item.Value(func(val []byte) error { version = binary.BigEndian.Uint16(val) return nil }) + if err != nil { + return fmt.Errorf("failed to read db version: %w", err) + } } // do the migrations in increasing steps (there is no rollback) // - if version < 1 { - // ... 
- } + // the 5 first migrations go to trash because on version 5 we need to export and import all the data anyway + if version < 5 { + log.Println("[badger] migration 5: delete all indexes and recreate them") - // b.bumpVersion(txn, 1) + // delete all index entries + prefixes := []byte{ + indexIdPrefix, + indexCreatedAtPrefix, + indexKindPrefix, + indexPubkeyPrefix, + indexPubkeyKindPrefix, + indexTagPrefix, + indexTag32Prefix, + indexTagAddrPrefix, + } + + for _, prefix := range prefixes { + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: false, + Prefix: []byte{prefix}, + }) + defer it.Close() + + var keysToDelete [][]byte + for it.Seek([]byte{prefix}); it.ValidForPrefix([]byte{prefix}); it.Next() { + key := it.Item().Key() + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + keysToDelete = append(keysToDelete, keyCopy) + } + + for _, key := range keysToDelete { + if err := txn.Delete(key); err != nil { + return fmt.Errorf("failed to delete index key %x: %w", key, err) + } + } + } + + // iterate through all events and recreate indexes + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: true, + Prefix: []byte{rawEventStorePrefix}, + }) + defer it.Close() + + for it.Seek([]byte{rawEventStorePrefix}); it.ValidForPrefix([]byte{rawEventStorePrefix}); it.Next() { + item := it.Item() + idx := item.Key() + + err := item.Value(func(val []byte) error { + evt := nostr.Event{} + if err := betterbinary.Unmarshal(val, &evt); err != nil { + return fmt.Errorf("error decoding event %x on migration 5: %w", idx, err) + } + + for key := range b.getIndexKeysForEvent(evt, idx[1:]) { + if err := txn.Set(key, nil); err != nil { + return fmt.Errorf("failed to save index for event %s on migration 5: %w", evt.ID, err) + } + } + + return nil + }) + if err != nil { + return err + } + } + + // bump version + if err := b.bumpVersion(txn, 5); err != nil { + return err + } + } return nil }) } -func (b *BadgerBackend) bumpVersion(txn *badger.Txn, version 
uint16) error { - buf := make([]byte, 2) - binary.BigEndian.PutUint16(buf, version) - return txn.Set([]byte{dbVersionKey}, buf) +func (b *BadgerBackend) bumpVersion(txn *badger.Txn, v uint16) error { + var newVersion [2]byte + binary.BigEndian.PutUint16(newVersion[:], v) + return txn.Set([]byte("version"), newVersion[:]) } diff --git a/eventstore/badger/query_planner.go b/eventstore/badger/query_planner.go index 946f4c0..70d55ae 100644 --- a/eventstore/badger/query_planner.go +++ b/eventstore/badger/query_planner.go @@ -71,11 +71,10 @@ func prepareQueries(filter nostr.Filter) ( queries = make([]query, len(tagValues)) for i, value := range tagValues { // get key prefix (with full length) and offset where to write the created_at - k, offset := getTagIndexPrefix(value) + k, offset := getTagIndexPrefix(tagKey, value) // remove the last parts part to get just the prefix we want here prefix := k[0:offset] queries[i] = query{i: i, prefix: prefix} - i++ } extraFilter = &nostr.Filter{ diff --git a/eventstore/lmdb/helpers.go b/eventstore/lmdb/helpers.go index b9988f2..dc87f95 100644 --- a/eventstore/lmdb/helpers.go +++ b/eventstore/lmdb/helpers.go @@ -99,14 +99,16 @@ func (b *LMDBBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { // not indexable continue } - firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { return len(t) >= 2 && t[1] == tag[1] }) + firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { + return len(t) >= 2 && t[0] == tag[0] && t[1] == tag[1] + }) if firstIndex != i { // duplicate continue } // get key prefix (with full length) and offset where to write the created_at - dbi, k, offset := b.getTagIndexPrefix(tag[1]) + dbi, k, offset := b.getTagIndexPrefix(tag[0], tag[1]) binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt)) if !yield(key{dbi: dbi, key: k}) { return @@ -136,47 +138,53 @@ func (b *LMDBBackend) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { } } -func (b *LMDBBackend) 
getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) { +func (b *LMDBBackend) getTagIndexPrefix(tagName string, tagValue string) (lmdb.DBI, []byte, int) { var k []byte // the key with full length for created_at and idx at the end, but not filled with these var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start var dbi lmdb.DBI + letterPrefix := byte(int(tagName[0]) % 256) + // if it's 32 bytes as hex, save it as bytes if len(tagValue) == 64 { - // but we actually only use the first 8 bytes - k = make([]byte, 8+4) - if _, err := hex.Decode(k[0:8], []byte(tagValue[0:8*2])); err == nil { - offset = 8 + // but we actually only use the first 8 bytes, with tag name prefix + k = make([]byte, 1+8+4) + if _, err := hex.Decode(k[1:1+8], []byte(tagValue[0:8*2])); err == nil { + k[0] = letterPrefix + offset = 1 + 8 dbi = b.indexTag32 - return dbi, k[0 : 8+4], offset + return dbi, k[0 : 1+8+4], offset } } - // if it looks like an "a" tag, index it in this special format + // if it looks like an "a" tag, index it in this special format (with the tag name prefix in front) spl := strings.Split(tagValue, ":") if len(spl) == 3 && len(spl[1]) == 64 { - k = make([]byte, 2+8+30) - if _, err := hex.Decode(k[2:2+8], []byte(tagValue[0:8*2])); err == nil { + k = make([]byte, 1+2+8+30) + if _, err := hex.Decode(k[1+2:1+2+8], []byte(spl[1][0:8*2])); err == nil { if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil { - k[0] = byte(kind >> 8) - k[1] = byte(kind) + k[0] = letterPrefix + k[1] = byte(kind >> 8) + k[2] = byte(kind) // limit "d" identifier to 30 bytes (so we don't have to grow our byte slice) - n := copy(k[2+8:2+8+30], spl[2]) - offset = 2 + 8 + n + n := copy(k[1+2+8:1+2+8+30], spl[2]) + offset = 1 + 2 + 8 + n + dbi = b.indexTagAddr return dbi, k[0 : offset+4], offset } } } - // index whatever else as a md5 hash of the contents + // index whatever else as a md5 hash of the contents with tag name prefix h := md5.New() 
h.Write([]byte(tagValue)) - k = make([]byte, 0, 16+4) + k = make([]byte, 1, 1+16+4) + k[0] = letterPrefix k = h.Sum(k) - offset = 16 + offset = 1 + 16 dbi = b.indexTag - return dbi, k[0 : 16+4], offset + return dbi, k[0 : 1+16+4], offset } func (b *LMDBBackend) dbiName(dbi lmdb.DBI) string { diff --git a/eventstore/lmdb/migration.go b/eventstore/lmdb/migration.go index 0d477b0..e9304f3 100644 --- a/eventstore/lmdb/migration.go +++ b/eventstore/lmdb/migration.go @@ -3,7 +3,10 @@ package lmdb import ( "encoding/binary" "fmt" + "log" + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/codec/betterbinary" "github.com/PowerDNS/lmdb-go/lmdb" ) @@ -13,36 +16,91 @@ const ( func (b *LMDBBackend) runMigrations() error { return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - var version uint16 - v, err := txn.Get(b.settingsStore, []byte{DB_VERSION}) - if err != nil { - if lmdb.IsNotFound(err) { - version = 0 - } else if v == nil { - return fmt.Errorf("failed to read database version: %w", err) - } - } else { - version = binary.BigEndian.Uint16(v) + val, err := txn.Get(b.settingsStore, []byte("version")) + if err != nil && !lmdb.IsNotFound(err) { + return fmt.Errorf("failed to get db version: %w", err) + } + + var version uint16 = 0 + if err == nil { + version = binary.BigEndian.Uint16(val) } // do the migrations in increasing steps (there is no rollback) // // this is when we reindex everything - if version < 1 { - } + if version < 9 { + log.Println("[lmdb] migration 9: reindex everything") - // bump version - // if err := b.setVersion(txn, 1); err != nil { - // return err - // } + if err := txn.Drop(b.indexId, false); err != nil { + return err + } + if err := txn.Drop(b.indexKind, false); err != nil { + return err + } + if err := txn.Drop(b.indexPubkey, false); err != nil { + return err + } + if err := txn.Drop(b.indexPubkeyKind, false); err != nil { + return err + } + if err := txn.Drop(b.indexTag, false); err != nil { + return err + } + if err := txn.Drop(b.indexTag32, 
false); err != nil { + return err + } + if err := txn.Drop(b.indexTagAddr, false); err != nil { + return err + } + if err := txn.Drop(b.indexPTagKind, false); err != nil { + return err + } + + cursor, err := txn.OpenCursor(b.rawEventStore) + if err != nil { + return fmt.Errorf("failed to open cursor in migration 9: %w", err) + } + defer cursor.Close() + + var idx, val []byte + var evt nostr.Event + + for { + idx, val, err = cursor.Get(nil, nil, lmdb.Next) + if lmdb.IsNotFound(err) { + break + } + if err != nil { + return fmt.Errorf("failed to get next in migration 9: %w", err) + } + + if err := betterbinary.Unmarshal(val, &evt); err != nil { + log.Printf("failed to unmarshal event %x, skipping: %s", idx, err) + continue + } + + for key := range b.getIndexKeysForEvent(evt) { + if err := txn.Put(key.dbi, key.key, idx, 0); err != nil { + return fmt.Errorf("failed to save index %s for event %s (%v) on migration 9: %w", + b.keyName(key), evt.ID, idx, err) + } + } + } + + // bump version + if err := b.setVersion(txn, 9); err != nil { + return err + } + } return nil }) } -func (b *LMDBBackend) setVersion(txn *lmdb.Txn, version uint16) error { - buf, err := txn.PutReserve(b.settingsStore, []byte{DB_VERSION}, 4, 0) - binary.BigEndian.PutUint16(buf, version) - return err +func (b *LMDBBackend) setVersion(txn *lmdb.Txn, v uint16) error { + var newVersion [2]byte + binary.BigEndian.PutUint16(newVersion[:], v) + return txn.Put(b.settingsStore, []byte("version"), newVersion[:], 0) } diff --git a/eventstore/lmdb/query_planner.go b/eventstore/lmdb/query_planner.go index 383c5d8..763ff4c 100644 --- a/eventstore/lmdb/query_planner.go +++ b/eventstore/lmdb/query_planner.go @@ -118,11 +118,10 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( queries = make([]query, len(tagValues)) for i, value := range tagValues { // get key prefix (with full length) and offset where to write the created_at - dbi, k, offset := b.getTagIndexPrefix(value) + dbi, k, offset := 
b.getTagIndexPrefix(tagKey, value) // remove the last parts part to get just the prefix we want here prefix := k[0:offset] queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4, timestampSize: 4} - i++ } // add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index) diff --git a/eventstore/mmm/helpers.go b/eventstore/mmm/helpers.go index 5c10894..4add03a 100644 --- a/eventstore/mmm/helpers.go +++ b/eventstore/mmm/helpers.go @@ -86,14 +86,16 @@ func (il *IndexingLayer) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { // not indexable continue } - firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { return len(t) >= 2 && t[1] == tag[1] }) + firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { + return len(t) >= 2 && t[0] == tag[0] && t[1] == tag[1] + }) if firstIndex != i { // duplicate continue } // get key prefix (with full length) and offset where to write the created_at - dbi, k, offset := il.getTagIndexPrefix(tag[1]) + dbi, k, offset := il.getTagIndexPrefix(tag[0], tag[1]) binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt)) if !yield(key{dbi: dbi, key: k}) { return @@ -123,43 +125,49 @@ func (il *IndexingLayer) getIndexKeysForEvent(evt nostr.Event) iter.Seq[key] { } } -func (il *IndexingLayer) getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) { +func (il *IndexingLayer) getTagIndexPrefix(tagName string, tagValue string) (lmdb.DBI, []byte, int) { var k []byte // the key with full length for created_at and idx at the end, but not filled with these var offset int // the offset -- i.e. 
where the prefix ends and the created_at and idx would start var dbi lmdb.DBI + letterPrefix := byte(int(tagName[0]) % 256) + // if it's 32 bytes as hex, save it as bytes if len(tagValue) == 64 { - // but we actually only use the first 8 bytes - k = make([]byte, 8+4) - if _, err := hex.Decode(k[0:8], []byte(tagValue[0:8*2])); err == nil { - offset = 8 + // but we actually only use the first 8 bytes, with tag name prefix + k = make([]byte, 1+8+4) + if _, err := hex.Decode(k[1:1+8], []byte(tagValue[0:8*2])); err == nil { + k[0] = letterPrefix + offset = 1 + 8 dbi = il.indexTag32 - return dbi, k[0 : 8+4], offset + return dbi, k[0 : 1+8+4], offset } } - // if it looks like an "a" tag, index it in this special format + // if it looks like an "a" tag, index it in this special format (with the tag name prefix in front) spl := strings.Split(tagValue, ":") if len(spl) == 3 && len(spl[1]) == 64 { - k = make([]byte, 2+8+30) - if _, err := hex.Decode(k[2:2+8], []byte(tagValue[0:8*2])); err == nil { + k = make([]byte, 1+2+8+30) + if _, err := hex.Decode(k[1+2:1+2+8], []byte(spl[1][0:8*2])); err == nil { if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil { - k[0] = byte(kind >> 8) - k[1] = byte(kind) + k[0] = letterPrefix + k[1] = byte(kind >> 8) + k[2] = byte(kind) // limit "d" identifier to 30 bytes (so we don't have to grow our byte slice) - n := copy(k[2+8:2+8+30], spl[2]) - offset = 2 + 8 + n + n := copy(k[1+2+8:1+2+8+30], spl[2]) + offset = 1 + 2 + 8 + n + dbi = il.indexTagAddr return dbi, k[0 : offset+4], offset } } } - // index whatever else as utf-8, but limit it to 40 bytes - k = make([]byte, 40+4) - n := copy(k[0:40], tagValue) - offset = n + // index whatever else as utf-8, but limit it to 40 bytes, with tag name prefix + k = make([]byte, 1+40+4) + k[0] = letterPrefix + n := copy(k[1:1+40], tagValue) + offset = 1 + n dbi = il.indexTag - return dbi, k[0 : n+4], offset + return dbi, k[0 : 1+n+4], offset } diff --git a/eventstore/mmm/query_planner.go 
b/eventstore/mmm/query_planner.go index 18379f6..e4d457f 100644 --- a/eventstore/mmm/query_planner.go +++ b/eventstore/mmm/query_planner.go @@ -107,11 +107,10 @@ func (il *IndexingLayer) prepareQueries(filter nostr.Filter) ( queries = make([]query, len(tagValues)) for i, value := range tagValues { // get key prefix (with full length) and offset where to write the created_at - dbi, k, offset := il.getTagIndexPrefix(value) + dbi, k, offset := il.getTagIndexPrefix(tagKey, value) // remove the last parts part to get just the prefix we want here prefix := k[0:offset] queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4, timestampSize: 4} - i++ } // add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index)