diff --git a/eventstore/mmm/delete.go b/eventstore/mmm/delete.go index 656323e..7188b21 100644 --- a/eventstore/mmm/delete.go +++ b/eventstore/mmm/delete.go @@ -66,11 +66,8 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) return fmt.Errorf("failed to load event %x when deleting: %w", id[:], err) } - // calculate all index keys we have for this event and delete them - for k := range il.getIndexKeysForEvent(evt) { - if err := iltxn.Del(k.dbi, k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) { - return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err) - } + if err := il.deleteIndexes(iltxn, evt, val[0:12]); err != nil { + return fmt.Errorf("failed to delete indexes for %s=>%v: %w", evt.ID, val[0:12], err) } // if there are no more refs we delete the event from the id index and mmap @@ -82,3 +79,14 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) return nil } + +func (il *IndexingLayer) deleteIndexes(iltxn *lmdb.Txn, event nostr.Event, posbytes []byte) error { + // calculate all index keys we have for this event and delete them + for k := range il.getIndexKeysForEvent(event) { + if err := iltxn.Del(k.dbi, k.key, posbytes); err != nil && !lmdb.IsNotFound(err) { + return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err) + } + } + + return nil +} diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go new file mode 100644 index 0000000..14572ae --- /dev/null +++ b/eventstore/mmm/fix.go @@ -0,0 +1,214 @@ +package mmm + +import ( + "encoding/binary" + "slices" + + "fiatjaf.com/nostr" + "github.com/PowerDNS/lmdb-go/lmdb" +) + +func (b *MultiMmapManager) Rescan() error { + b.writeMutex.Lock() + defer b.writeMutex.Unlock() + + return b.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error { + cursor, err := mmmtxn.OpenCursor(b.indexId) + if err != nil { + panic(err) + } + defer cursor.Close() + + type entry struct { + idPrefix []byte + pos position + } + var toPurge []entry + + for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { + pos := positionFromBytes(val[0:12]) + + // for every event in this index + var borked bool + var ts nostr.Timestamp + // we try to load it + if ts_, err := b.loadJustTimestamp(pos); err == nil { + // if we succeed we assume the event is ok for now + borked = false + ts = ts_ + } else { + // otherwise we know it's borked + borked = true + } + + var evt *nostr.Event + var layersToRemove []uint16 + + // then for every layer referenced in there we check + for s := 12; s < len(val); s += 2 { + layerId := binary.BigEndian.Uint16(val[s : s+2]) + layer := b.layers.ByID(layerId) + if layer == nil { + continue + } + + layer.lmdbEnv.View(func(txn *lmdb.Txn) error { + txn.RawRead = true + + if borked { + // for borked events we have to do a bruteforce check + if layer.hasAtPosition(txn, pos) { + // expected -- delete anyway since it's borked + if err := layer.bruteDeleteIndexes(txn, pos); err != nil { + panic(err) + } + } else { + // this stuff is doubly borked -- let's do nothing + return nil + } + } else { + // otherwise we do a more reasonable check + if layer.hasAtTimestampAndPosition(txn, ts, pos) { + // expected, all good + } else { + // can't find it in this layer, so update source reference to remove this + // and clear it from this layer (if any traces remain) + if evt == nil { + if err := b.loadEvent(pos, evt); err != nil { + // can't load event, means it's borked + borked = true + + // act as if it's borked + if err := layer.bruteDeleteIndexes(txn, pos); err != nil { + panic(err) + } + } else { + goto haveEvent + } + } + + haveEvent: + if err := layer.deleteIndexes(txn, *evt, val[0:12]); err != nil { + panic(err) + } + + // we'll remove references to this later + // (no need to do anything in the borked case as everything will be deleted) + layersToRemove = append(layersToRemove, layerId) + } + } + + return nil + }) + } + + if borked { + toPurge = append(toPurge, entry{idPrefix: key, pos: pos}) + } else if len(layersToRemove) > 0 { + for s := 12; s < len(val); { + if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) { + // swap-delete + copy(val[s:s+2], val[len(val)-2:]) + val = val[len(val)-2:] + } else { + s += 2 + } + } + + if len(val) > 12 { + if err := mmmtxn.Put(b.indexId, key, val, 0); err != nil { + panic(err) + } + } else { + toPurge = append(toPurge, entry{idPrefix: key, pos: pos}) + } + } + } + + for _, entry := range toPurge { + if err := b.purge(mmmtxn, entry.idPrefix, entry.pos); err != nil { + panic(err) + } + } + + return nil + }) +} + +func (il *IndexingLayer) hasAtTimestampAndPosition(iltxn *lmdb.Txn, ts nostr.Timestamp, pos position) (exists bool) { + cursor, err := iltxn.OpenCursor(il.indexCreatedAt) + if err != nil { + panic(err) + } + defer cursor.Close() + + key := make([]byte, 4) + binary.BigEndian.PutUint32(key[0:4], uint32(ts)) + + if _, val, err := cursor.Get(key, nil, lmdb.SetKey); err == nil { + if positionFromBytes(val[0:12]) == pos { + exists = true + } + } + + return exists +} + +func (il *IndexingLayer) hasAtPosition(iltxn *lmdb.Txn, pos position) (exists bool) { + cursor, err := iltxn.OpenCursor(il.indexCreatedAt) + if err != nil { + panic(err) + } + defer cursor.Close() + + for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { + if positionFromBytes(val[0:12]) == pos { + exists = true + break + } + } + + return exists +} + +func (il *IndexingLayer) bruteDeleteIndexes(iltxn *lmdb.Txn, pos position) error { + type entry struct { + key []byte + val []byte + } + + toDelete := make([]entry, 0, 8) + + for _, index := range []lmdb.DBI{ + il.indexCreatedAt, + il.indexKind, + il.indexPubkey, + il.indexPubkeyKind, + il.indexPTagKind, + il.indexTag, + il.indexTag32, + il.indexTagAddr, + } { + cursor, err := iltxn.OpenCursor(index) + if err != nil { + panic(err) + } + defer cursor.Close() + + for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { + if positionFromBytes(val[0:12]) == pos { + toDelete = append(toDelete, entry{key, val}) + } + } + + for _, entry := range toDelete { + if err := iltxn.Del(index, entry.key, entry.val); err != nil { + return err + } + } + + toDelete = toDelete[:0] + } + + return nil +} diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index 3c87863..0ce3780 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -293,6 +293,13 @@ func (b *MultiMmapManager) loadEvent(pos position, eventReceiver *nostr.Event) e return betterbinary.Unmarshal(b.mmapf[pos.start:pos.start+uint64(pos.size)], eventReceiver) } +func (b *MultiMmapManager) loadJustTimestamp(pos position) (nostr.Timestamp, error) { + if len(b.mmapf) < int(pos.start+uint64(pos.size)) { + return 0, fmt.Errorf("out of bounds") + } + return betterbinary.GetCreatedAt(b.mmapf[pos.start : pos.start+uint64(pos.size)]), nil +} + // getNextAvailableLayerId iterates through all existing layers to find a vacant id func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error) { cursor, err := txn.OpenCursor(b.knownLayers)