diff --git a/eventstore/cmd/eventstore/main_mmm.go b/eventstore/cmd/eventstore/main_mmm.go index 285ec7b..6dbfb1e 100644 --- a/eventstore/cmd/eventstore/main_mmm.go +++ b/eventstore/cmd/eventstore/main_mmm.go @@ -22,9 +22,6 @@ func doMmmInit(path string) (eventstore.Store, error) { if err := mmmm.Init(); err != nil { return nil, err } - il := &mmm.IndexingLayer{} - if err := mmmm.EnsureLayer(filepath.Base(path), il); err != nil { - return nil, err - } - return il, nil + + return mmmm.EnsureLayer(filepath.Base(path)) } diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go index 08f9252..13b4efc 100644 --- a/eventstore/mmm/fix.go +++ b/eventstore/mmm/fix.go @@ -41,7 +41,7 @@ func (b *MultiMmapManager) Rescan() error { borked = true } - var evt *nostr.Event + evt := &nostr.Event{} var layersToRemove []uint16 // then for every layer referenced in there we check @@ -52,7 +52,7 @@ func (b *MultiMmapManager) Rescan() error { continue } - layer.lmdbEnv.View(func(txn *lmdb.Txn) error { + layer.lmdbEnv.Update(func(txn *lmdb.Txn) error { txn.RawRead = true if borked { @@ -80,8 +80,10 @@ func (b *MultiMmapManager) Rescan() error { // act as if it's borked if err := layer.bruteDeleteIndexes(txn, pos); err != nil { - panic(err) + return err } + + return nil } else { goto haveEvent } @@ -109,7 +111,7 @@ func (b *MultiMmapManager) Rescan() error { if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) { // swap-delete copy(val[s:s+2], val[len(val)-2:]) - val = val[len(val)-2:] + val = val[0 : len(val)-2] } else { s += 2 } @@ -191,9 +193,8 @@ func (il *IndexingLayer) bruteDeleteIndexes(iltxn *lmdb.Txn, pos position) error } { cursor, err := iltxn.OpenCursor(index) if err != nil { - panic(err) + return err } - defer cursor.Close() for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { if positionFromBytes(val[0:12]) == pos { @@ -201,6 +202,8 @@ func (il *IndexingLayer) bruteDeleteIndexes(iltxn *lmdb.Txn, pos position) error } } + cursor.Close() + for _, entry := range toDelete { if err := iltxn.Del(index, entry.key, entry.val); err != nil { return err diff --git a/eventstore/mmm/fix_test.go b/eventstore/mmm/fix_test.go index ca46337..66a3ffe 100644 --- a/eventstore/mmm/fix_test.go +++ b/eventstore/mmm/fix_test.go @@ -3,36 +3,38 @@ package mmm import ( "bytes" "fmt" + "iter" "math/rand/v2" "os" + "slices" "testing" "fiatjaf.com/nostr" "github.com/PowerDNS/lmdb-go/lmdb" - "github.com/rs/zerolog" "github.com/stretchr/testify/require" ) -func FuzzRescan(f *testing.F) { - f.Add(0, uint(3), uint(10), uint(2)) - f.Fuzz(func(t *testing.T, seed int, nlayers, nevents, nbork uint) { - nlayers = nlayers%5 + 1 +func FuzzBorkedRescan(f *testing.F) { + f.Add(0, uint(3), uint(150), uint(40), uint(30), uint(30)) + f.Fuzz(func(t *testing.T, seed int, nlayers, nevents, layerProbability, inconsistencyProbability, borkProbability uint) { + nlayers = nlayers%20 + 1 nevents = nevents%100 + 1 - nbork = nbork % nevents + layerProbability = layerProbability % 100 + borkProbability = borkProbability % 100 + inconsistencyProbability = inconsistencyProbability % 100 // create a temporary directory for the test tmpDir, err := os.MkdirTemp("", "mmm-rescan-test-*") require.NoError(t, err) defer os.RemoveAll(tmpDir) - logger := zerolog.Nop() rnd := rand.New(rand.NewPCG(uint64(seed), 0)) + chance := func(n uint) bool { + return rnd.UintN(100) < n + } // initialize MMM - mmmm := &MultiMmapManager{ - Dir: tmpDir, - Logger: &logger, - } + mmmm := &MultiMmapManager{Dir: tmpDir} err = mmmm.Init() require.NoError(t, err) @@ -41,109 +43,137 @@ func FuzzRescan(f *testing.F) { // create layers for i := range nlayers { name := string([]byte{97 + byte(i)}) - il := &IndexingLayer{} - err = mmmm.EnsureLayer(name, il) + il, err := mmmm.EnsureLayer(name) defer il.Close() require.NoError(t, err, "layer %s/%d", name, i) } // create and store events sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb") - storedEvents := make([]nostr.Event, nevents) + storedEvents := make([]nostr.Event, 0, nevents) for i := 0; i < int(nevents); i++ { - tags := nostr.Tags{} - // randomly assign to layers - for j := range nlayers { - if rnd.UintN(2) == 1 { - tag := string([]byte{97 + byte(j)}) - tags = append(tags, nostr.Tag{"t", tag}) - } - } - if len(tags) == 0 { - // ensure at least one tag - tags = append(tags, nostr.Tag{"t", string([]byte{97})}) - } - evt := nostr.Event{ - CreatedAt: nostr.Timestamp(i), + CreatedAt: nostr.Timestamp(i * 1000), Kind: nostr.KindTextNote, - Tags: tags, + Tags: nostr.Tags{}, Content: fmt.Sprintf("test content %d", i), } evt.Sign(sk) - storedEvents[i] = evt - // save to appropriate layers + // randomly assign to some layers (or none) for _, layer := range mmmm.layers { - if evt.Tags.FindWithValue("t", layer.name) != nil { + if chance(layerProbability) { err := layer.SaveEvent(evt) + storedEvents = append(storedEvents, evt) require.NoError(t, err) } } } - // get positions of some events to corrupt - var positionsToBork []position - var eventsToBork []nostr.Event + // check that all events are still accessible + for _, evt := range storedEvents { + // this event should still be accessible + gotEvt, layers := mmmm.GetByID(evt.ID) + require.NotNil(t, gotEvt, "stored event should still exist") + require.NotEmpty(t, layers, "stored event should have layer references") + } - err = mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { + // bork some events + type entry struct { + evt nostr.Event + layer *IndexingLayer + } + var inconsistentEvents []entry + var borkedEvents []nostr.Event + + err = mmmm.lmdbEnv.Update(func(txn *lmdb.Txn) error { cursor, err := txn.OpenCursor(mmmm.indexId) require.NoError(t, err) defer cursor.Close() - i := 0 - for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil && i < int(nbork); key, val, err = cursor.Get(key, val, lmdb.Next) { - if len(val) >= 12 { - pos := positionFromBytes(val[0:12]) - positionsToBork = append(positionsToBork, pos) + for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { + pos := positionFromBytes(val[0:12]) - // find the corresponding event - for _, evt := range storedEvents { - if bytes.Equal(evt.ID[0:8], key) { - eventsToBork = append(eventsToBork, evt) - break - } + if chance(borkProbability) { + var evt nostr.Event + mmmm.loadEvent(pos, &evt) + borkedEvents = append(borkedEvents, evt) + + // manually corrupt the mmapped file at these positions + copy(mmmm.mmapf[pos.start:], []byte("CORRUPTED_DATA_XXXX")) + } else if chance(inconsistencyProbability) { + // inconsistently delete from some layers + var evt nostr.Event + mmmm.loadEvent(pos, &evt) + + // manually delete indexes from some layer + _, layers := mmmm.GetByID(evt.ID) + + // this won't be erased, just removed from this specific layer + layer := layers[rnd.IntN(len(layers))] + posb := make([]byte, 12) + bytesFromPosition(posb, pos) + + if err := layer.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { + return layer.deleteIndexes(iltxn, evt, posb) + }); err != nil { + panic(err) + } + + if len(layers) == 1 { + // this should be completely erased since there is only one layer, so + // for checking purposes in this test just treat it as borked + borkedEvents = append(borkedEvents, evt) + } else { + inconsistentEvents = append(inconsistentEvents, entry{evt: evt, layer: layer}) } - i++ } } return nil }) require.NoError(t, err) - // manually corrupt the mmapped file at these positions - for _, pos := range positionsToBork { - // write garbage to the position - copy(mmmm.mmapf[pos.start:], []byte("CORRUPTED_DATA_XXXX")) - } - - // call Rescan and check that borked events are removed + // call Rescan() to remove borked and inconsistent events err = mmmm.Rescan() require.NoError(t, err) // verify borked events are no longer accessible - for _, evt := range eventsToBork { + for _, evt := range borkedEvents { gotEvt, layers := mmmm.GetByID(evt.ID) - require.Nil(t, gotEvt, "borked event should be removed") + require.Nilf(t, gotEvt, "borked event %s should have been removed", evt.ID) require.Empty(t, layers, "borked event should have no layer references") } - // Test that non-borked events are still accessible + // check that non-borked events are still accessible for _, evt := range storedEvents { - found := false - for _, borkedEvt := range eventsToBork { - if bytes.Equal(evt.ID[:], borkedEvt.ID[:]) { - found = true - break - } - } - if !found { + isBorked := slices.ContainsFunc(borkedEvents, func(b nostr.Event) bool { + return bytes.Equal(evt.ID[:], b.ID[:]) + }) + if !isBorked { // this event should still be accessible gotEvt, layers := mmmm.GetByID(evt.ID) - require.NotNil(t, gotEvt, "non-borked event should still exist") + require.NotNilf(t, gotEvt, "non-borked event %s should still exist", evt.ID) require.NotEmpty(t, layers, "non-borked event should have layer references") } } + + // check that inconsistent events have been removed from one of their original layers + for _, e := range inconsistentEvents { + evt := e.evt + layer := e.layer + + _, layers := mmmm.GetByID(evt.ID) + require.NotContainsf(t, layers, layer, "layers for inconsistent event should not contain %s", layer.name) + + next, done := iter.Pull(layer.QueryEvents(nostr.Filter{ + Since: evt.CreatedAt - 1, + Until: evt.CreatedAt + 1, + }, 1)) + evt, ok := next() + done() + + require.False(t, ok, "layer for inconsistent event should not index %s", evt.ID) + } }) } diff --git a/eventstore/mmm/fuzz_test.go b/eventstore/mmm/fuzz_test.go index 1dcbd48..5d23c83 100644 --- a/eventstore/mmm/fuzz_test.go +++ b/eventstore/mmm/fuzz_test.go @@ -40,8 +40,7 @@ func FuzzTest(f *testing.F) { for i := range nlayers { name := string([]byte{97 + byte(i)}) - il := &IndexingLayer{} - err = mmmm.EnsureLayer(name, il) + il, err := mmmm.EnsureLayer(name) defer il.Close() require.NoError(t, err, "layer %s/%d", name, i) } diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index fd3c34d..f39549a 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -51,6 +51,11 @@ const ( ) func (b *MultiMmapManager) Init() error { + if b.Logger == nil { + nopLogger := zerolog.Nop() + b.Logger = &nopLogger + } + // create directory if it doesn't exist dbpath := filepath.Join(b.Dir, "mmmm") if err := os.MkdirAll(dbpath, 0755); err != nil { @@ -128,12 +133,14 @@ func (b *MultiMmapManager) Init() error { return nil } -func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error { +func (b *MultiMmapManager) EnsureLayer(name string) (*IndexingLayer, error) { b.writeMutex.Lock() defer b.writeMutex.Unlock() - il.mmmm = b - il.name = name + il := &IndexingLayer{ + mmmm: b, + name: name, + } err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error { txn.RawRead = true @@ -164,11 +171,11 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error { } }) if err != nil { - return err + return nil, err } b.layers = append(b.layers, il) - return nil + return il, nil } func (b *MultiMmapManager) DropLayer(name string) error { diff --git a/eventstore/test/db_test.go b/eventstore/test/db_test.go index 6b7bf24..8fbbb59 100644 --- a/eventstore/test/db_test.go +++ b/eventstore/test/db_test.go @@ -68,11 +68,10 @@ func TestMMM(t *testing.T) { err := mmmm.Init() require.NoError(t, err) - il := mmm.IndexingLayer{} - err = mmmm.EnsureLayer("test", &il) + il, err := mmmm.EnsureLayer("test") require.NoError(t, err) - test.run(t, &il) + test.run(t, il) }) } }