mmm: small .EnsureLayer() change, fuzz tests for fixing borked databases
This commit is contained in:
@@ -22,9 +22,6 @@ func doMmmInit(path string) (eventstore.Store, error) {
|
|||||||
if err := mmmm.Init(); err != nil {
|
if err := mmmm.Init(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
il := &mmm.IndexingLayer{}
|
|
||||||
if err := mmmm.EnsureLayer(filepath.Base(path), il); err != nil {
|
return mmmm.EnsureLayer(filepath.Base(path))
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return il, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
borked = true
|
borked = true
|
||||||
}
|
}
|
||||||
|
|
||||||
var evt *nostr.Event
|
evt := &nostr.Event{}
|
||||||
var layersToRemove []uint16
|
var layersToRemove []uint16
|
||||||
|
|
||||||
// then for every layer referenced in there we check
|
// then for every layer referenced in there we check
|
||||||
@@ -52,7 +52,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
layer.lmdbEnv.View(func(txn *lmdb.Txn) error {
|
layer.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
txn.RawRead = true
|
txn.RawRead = true
|
||||||
|
|
||||||
if borked {
|
if borked {
|
||||||
@@ -80,8 +80,10 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
|
|
||||||
// act as if it's borked
|
// act as if it's borked
|
||||||
if err := layer.bruteDeleteIndexes(txn, pos); err != nil {
|
if err := layer.bruteDeleteIndexes(txn, pos); err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
} else {
|
} else {
|
||||||
goto haveEvent
|
goto haveEvent
|
||||||
}
|
}
|
||||||
@@ -109,7 +111,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) {
|
if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) {
|
||||||
// swap-delete
|
// swap-delete
|
||||||
copy(val[s:s+2], val[len(val)-2:])
|
copy(val[s:s+2], val[len(val)-2:])
|
||||||
val = val[len(val)-2:]
|
val = val[0 : len(val)-2]
|
||||||
} else {
|
} else {
|
||||||
s += 2
|
s += 2
|
||||||
}
|
}
|
||||||
@@ -191,9 +193,8 @@ func (il *IndexingLayer) bruteDeleteIndexes(iltxn *lmdb.Txn, pos position) error
|
|||||||
} {
|
} {
|
||||||
cursor, err := iltxn.OpenCursor(index)
|
cursor, err := iltxn.OpenCursor(index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
defer cursor.Close()
|
|
||||||
|
|
||||||
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
|
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
|
||||||
if positionFromBytes(val[0:12]) == pos {
|
if positionFromBytes(val[0:12]) == pos {
|
||||||
@@ -201,6 +202,8 @@ func (il *IndexingLayer) bruteDeleteIndexes(iltxn *lmdb.Txn, pos position) error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cursor.Close()
|
||||||
|
|
||||||
for _, entry := range toDelete {
|
for _, entry := range toDelete {
|
||||||
if err := iltxn.Del(index, entry.key, entry.val); err != nil {
|
if err := iltxn.Del(index, entry.key, entry.val); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -3,36 +3,38 @@ package mmm
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"math/rand/v2"
|
"math/rand/v2"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"fiatjaf.com/nostr"
|
"fiatjaf.com/nostr"
|
||||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||||
"github.com/rs/zerolog"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func FuzzRescan(f *testing.F) {
|
func FuzzBorkedRescan(f *testing.F) {
|
||||||
f.Add(0, uint(3), uint(10), uint(2))
|
f.Add(0, uint(3), uint(150), uint(40), uint(30), uint(30))
|
||||||
f.Fuzz(func(t *testing.T, seed int, nlayers, nevents, nbork uint) {
|
f.Fuzz(func(t *testing.T, seed int, nlayers, nevents, layerProbability, inconsistencyProbability, borkProbability uint) {
|
||||||
nlayers = nlayers%5 + 1
|
nlayers = nlayers%20 + 1
|
||||||
nevents = nevents%100 + 1
|
nevents = nevents%100 + 1
|
||||||
nbork = nbork % nevents
|
layerProbability = layerProbability % 100
|
||||||
|
borkProbability = borkProbability % 100
|
||||||
|
inconsistencyProbability = inconsistencyProbability % 100
|
||||||
|
|
||||||
// create a temporary directory for the test
|
// create a temporary directory for the test
|
||||||
tmpDir, err := os.MkdirTemp("", "mmm-rescan-test-*")
|
tmpDir, err := os.MkdirTemp("", "mmm-rescan-test-*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
logger := zerolog.Nop()
|
|
||||||
rnd := rand.New(rand.NewPCG(uint64(seed), 0))
|
rnd := rand.New(rand.NewPCG(uint64(seed), 0))
|
||||||
|
chance := func(n uint) bool {
|
||||||
|
return rnd.UintN(100) < n
|
||||||
|
}
|
||||||
|
|
||||||
// initialize MMM
|
// initialize MMM
|
||||||
mmmm := &MultiMmapManager{
|
mmmm := &MultiMmapManager{Dir: tmpDir}
|
||||||
Dir: tmpDir,
|
|
||||||
Logger: &logger,
|
|
||||||
}
|
|
||||||
|
|
||||||
err = mmmm.Init()
|
err = mmmm.Init()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -41,109 +43,137 @@ func FuzzRescan(f *testing.F) {
|
|||||||
// create layers
|
// create layers
|
||||||
for i := range nlayers {
|
for i := range nlayers {
|
||||||
name := string([]byte{97 + byte(i)})
|
name := string([]byte{97 + byte(i)})
|
||||||
il := &IndexingLayer{}
|
il, err := mmmm.EnsureLayer(name)
|
||||||
err = mmmm.EnsureLayer(name, il)
|
|
||||||
defer il.Close()
|
defer il.Close()
|
||||||
require.NoError(t, err, "layer %s/%d", name, i)
|
require.NoError(t, err, "layer %s/%d", name, i)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create and store events
|
// create and store events
|
||||||
sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb")
|
sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb")
|
||||||
storedEvents := make([]nostr.Event, nevents)
|
storedEvents := make([]nostr.Event, 0, nevents)
|
||||||
|
|
||||||
for i := 0; i < int(nevents); i++ {
|
for i := 0; i < int(nevents); i++ {
|
||||||
tags := nostr.Tags{}
|
|
||||||
// randomly assign to layers
|
|
||||||
for j := range nlayers {
|
|
||||||
if rnd.UintN(2) == 1 {
|
|
||||||
tag := string([]byte{97 + byte(j)})
|
|
||||||
tags = append(tags, nostr.Tag{"t", tag})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(tags) == 0 {
|
|
||||||
// ensure at least one tag
|
|
||||||
tags = append(tags, nostr.Tag{"t", string([]byte{97})})
|
|
||||||
}
|
|
||||||
|
|
||||||
evt := nostr.Event{
|
evt := nostr.Event{
|
||||||
CreatedAt: nostr.Timestamp(i),
|
CreatedAt: nostr.Timestamp(i * 1000),
|
||||||
Kind: nostr.KindTextNote,
|
Kind: nostr.KindTextNote,
|
||||||
Tags: tags,
|
Tags: nostr.Tags{},
|
||||||
Content: fmt.Sprintf("test content %d", i),
|
Content: fmt.Sprintf("test content %d", i),
|
||||||
}
|
}
|
||||||
evt.Sign(sk)
|
evt.Sign(sk)
|
||||||
storedEvents[i] = evt
|
|
||||||
|
|
||||||
// save to appropriate layers
|
// randomly assign to some layers (or none)
|
||||||
for _, layer := range mmmm.layers {
|
for _, layer := range mmmm.layers {
|
||||||
if evt.Tags.FindWithValue("t", layer.name) != nil {
|
if chance(layerProbability) {
|
||||||
err := layer.SaveEvent(evt)
|
err := layer.SaveEvent(evt)
|
||||||
|
storedEvents = append(storedEvents, evt)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// get positions of some events to corrupt
|
// check that all events are still accessible
|
||||||
var positionsToBork []position
|
for _, evt := range storedEvents {
|
||||||
var eventsToBork []nostr.Event
|
// this event should still be accessible
|
||||||
|
gotEvt, layers := mmmm.GetByID(evt.ID)
|
||||||
|
require.NotNil(t, gotEvt, "stored event should still exist")
|
||||||
|
require.NotEmpty(t, layers, "stored event should have layer references")
|
||||||
|
}
|
||||||
|
|
||||||
err = mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
|
// bork some events
|
||||||
|
type entry struct {
|
||||||
|
evt nostr.Event
|
||||||
|
layer *IndexingLayer
|
||||||
|
}
|
||||||
|
var inconsistentEvents []entry
|
||||||
|
var borkedEvents []nostr.Event
|
||||||
|
|
||||||
|
err = mmmm.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
cursor, err := txn.OpenCursor(mmmm.indexId)
|
cursor, err := txn.OpenCursor(mmmm.indexId)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer cursor.Close()
|
defer cursor.Close()
|
||||||
|
|
||||||
i := 0
|
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
|
||||||
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil && i < int(nbork); key, val, err = cursor.Get(key, val, lmdb.Next) {
|
pos := positionFromBytes(val[0:12])
|
||||||
if len(val) >= 12 {
|
|
||||||
pos := positionFromBytes(val[0:12])
|
|
||||||
positionsToBork = append(positionsToBork, pos)
|
|
||||||
|
|
||||||
// find the corresponding event
|
if chance(borkProbability) {
|
||||||
for _, evt := range storedEvents {
|
var evt nostr.Event
|
||||||
if bytes.Equal(evt.ID[0:8], key) {
|
mmmm.loadEvent(pos, &evt)
|
||||||
eventsToBork = append(eventsToBork, evt)
|
borkedEvents = append(borkedEvents, evt)
|
||||||
break
|
|
||||||
}
|
// manually corrupt the mmapped file at these positions
|
||||||
|
copy(mmmm.mmapf[pos.start:], []byte("CORRUPTED_DATA_XXXX"))
|
||||||
|
} else if chance(inconsistencyProbability) {
|
||||||
|
// inconsistently delete from some layers
|
||||||
|
var evt nostr.Event
|
||||||
|
mmmm.loadEvent(pos, &evt)
|
||||||
|
|
||||||
|
// manually delete indexes from some layer
|
||||||
|
_, layers := mmmm.GetByID(evt.ID)
|
||||||
|
|
||||||
|
// this won't be erased, just removed from this specific layer
|
||||||
|
layer := layers[rnd.IntN(len(layers))]
|
||||||
|
posb := make([]byte, 12)
|
||||||
|
bytesFromPosition(posb, pos)
|
||||||
|
|
||||||
|
if err := layer.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
|
||||||
|
return layer.deleteIndexes(iltxn, evt, posb)
|
||||||
|
}); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(layers) == 1 {
|
||||||
|
// this should be completely erased since there is only one layer, so
|
||||||
|
// for checking purposes in this test just treat it as borked
|
||||||
|
borkedEvents = append(borkedEvents, evt)
|
||||||
|
} else {
|
||||||
|
inconsistentEvents = append(inconsistentEvents, entry{evt: evt, layer: layer})
|
||||||
}
|
}
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// manually corrupt the mmapped file at these positions
|
// call Rescan() to remove borked and inconsistent events
|
||||||
for _, pos := range positionsToBork {
|
|
||||||
// write garbage to the position
|
|
||||||
copy(mmmm.mmapf[pos.start:], []byte("CORRUPTED_DATA_XXXX"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// call Rescan and check that borked events are removed
|
|
||||||
err = mmmm.Rescan()
|
err = mmmm.Rescan()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// verify borked events are no longer accessible
|
// verify borked events are no longer accessible
|
||||||
for _, evt := range eventsToBork {
|
for _, evt := range borkedEvents {
|
||||||
gotEvt, layers := mmmm.GetByID(evt.ID)
|
gotEvt, layers := mmmm.GetByID(evt.ID)
|
||||||
require.Nil(t, gotEvt, "borked event should be removed")
|
require.Nilf(t, gotEvt, "borked event %s should have been removed", evt.ID)
|
||||||
require.Empty(t, layers, "borked event should have no layer references")
|
require.Empty(t, layers, "borked event should have no layer references")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that non-borked events are still accessible
|
// check that non-borked events are still accessible
|
||||||
for _, evt := range storedEvents {
|
for _, evt := range storedEvents {
|
||||||
found := false
|
isBorked := slices.ContainsFunc(borkedEvents, func(b nostr.Event) bool {
|
||||||
for _, borkedEvt := range eventsToBork {
|
return bytes.Equal(evt.ID[:], b.ID[:])
|
||||||
if bytes.Equal(evt.ID[:], borkedEvt.ID[:]) {
|
})
|
||||||
found = true
|
if !isBorked {
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
// this event should still be accessible
|
// this event should still be accessible
|
||||||
gotEvt, layers := mmmm.GetByID(evt.ID)
|
gotEvt, layers := mmmm.GetByID(evt.ID)
|
||||||
require.NotNil(t, gotEvt, "non-borked event should still exist")
|
require.NotNilf(t, gotEvt, "non-borked event %s should still exist", evt.ID)
|
||||||
require.NotEmpty(t, layers, "non-borked event should have layer references")
|
require.NotEmpty(t, layers, "non-borked event should have layer references")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check that inconsistent events have been removed from one of their original layers
|
||||||
|
for _, e := range inconsistentEvents {
|
||||||
|
evt := e.evt
|
||||||
|
layer := e.layer
|
||||||
|
|
||||||
|
_, layers := mmmm.GetByID(evt.ID)
|
||||||
|
require.NotContainsf(t, layers, layer, "layers for inconsistent event should not contain %s", layer.name)
|
||||||
|
|
||||||
|
next, done := iter.Pull(layer.QueryEvents(nostr.Filter{
|
||||||
|
Since: evt.CreatedAt - 1,
|
||||||
|
Until: evt.CreatedAt + 1,
|
||||||
|
}, 1))
|
||||||
|
evt, ok := next()
|
||||||
|
done()
|
||||||
|
|
||||||
|
require.False(t, ok, "layer for inconsistent event should not index %s", evt.ID)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,8 +40,7 @@ func FuzzTest(f *testing.F) {
|
|||||||
|
|
||||||
for i := range nlayers {
|
for i := range nlayers {
|
||||||
name := string([]byte{97 + byte(i)})
|
name := string([]byte{97 + byte(i)})
|
||||||
il := &IndexingLayer{}
|
il, err := mmmm.EnsureLayer(name)
|
||||||
err = mmmm.EnsureLayer(name, il)
|
|
||||||
defer il.Close()
|
defer il.Close()
|
||||||
require.NoError(t, err, "layer %s/%d", name, i)
|
require.NoError(t, err, "layer %s/%d", name, i)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,6 +51,11 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (b *MultiMmapManager) Init() error {
|
func (b *MultiMmapManager) Init() error {
|
||||||
|
if b.Logger == nil {
|
||||||
|
nopLogger := zerolog.Nop()
|
||||||
|
b.Logger = &nopLogger
|
||||||
|
}
|
||||||
|
|
||||||
// create directory if it doesn't exist
|
// create directory if it doesn't exist
|
||||||
dbpath := filepath.Join(b.Dir, "mmmm")
|
dbpath := filepath.Join(b.Dir, "mmmm")
|
||||||
if err := os.MkdirAll(dbpath, 0755); err != nil {
|
if err := os.MkdirAll(dbpath, 0755); err != nil {
|
||||||
@@ -128,12 +133,14 @@ func (b *MultiMmapManager) Init() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
|
func (b *MultiMmapManager) EnsureLayer(name string) (*IndexingLayer, error) {
|
||||||
b.writeMutex.Lock()
|
b.writeMutex.Lock()
|
||||||
defer b.writeMutex.Unlock()
|
defer b.writeMutex.Unlock()
|
||||||
|
|
||||||
il.mmmm = b
|
il := &IndexingLayer{
|
||||||
il.name = name
|
mmmm: b,
|
||||||
|
name: name,
|
||||||
|
}
|
||||||
|
|
||||||
err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
txn.RawRead = true
|
txn.RawRead = true
|
||||||
@@ -164,11 +171,11 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b.layers = append(b.layers, il)
|
b.layers = append(b.layers, il)
|
||||||
return nil
|
return il, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) DropLayer(name string) error {
|
func (b *MultiMmapManager) DropLayer(name string) error {
|
||||||
|
|||||||
@@ -68,11 +68,10 @@ func TestMMM(t *testing.T) {
|
|||||||
err := mmmm.Init()
|
err := mmmm.Init()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
il := mmm.IndexingLayer{}
|
il, err := mmmm.EnsureLayer("test")
|
||||||
err = mmmm.EnsureLayer("test", &il)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
test.run(t, &il)
|
test.run(t, il)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user