diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go index 13b4efc..1e2e820 100644 --- a/eventstore/mmm/fix.go +++ b/eventstore/mmm/fix.go @@ -133,7 +133,12 @@ func (b *MultiMmapManager) Rescan() error { } } - return b.GatherFreeRanges(mmmtxn) + b.freeRanges, err = b.gatherFreeRanges(mmmtxn) + if err != nil { + return err + } + + return nil }) } diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index 63a9f30..045cad0 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -8,14 +8,14 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -func (b *MultiMmapManager) GatherFreeRanges(txn *lmdb.Txn) error { +func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) { cursor, err := txn.OpenCursor(b.indexId) if err != nil { - return fmt.Errorf("failed to open cursor on indexId: %w", err) + return nil, fmt.Errorf("failed to open cursor on indexId: %w", err) } defer cursor.Close() - usedPositions := make([]position, 0, 256) + usedPositions := make(positions, 0, 256) for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { pos := positionFromBytes(val[0:12]) usedPositions = append(usedPositions, pos) @@ -25,79 +25,85 @@ func (b *MultiMmapManager) GatherFreeRanges(txn *lmdb.Txn) error { slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) }) // calculate free ranges as gaps between used positions - b.freeRanges = make([]position, 0, len(usedPositions)/2) + freeRanges := make(positions, 0, len(usedPositions)/2) var currentStart uint64 = 0 - for _, pos := range usedPositions { - if pos.start > currentStart { + for _, used := range usedPositions { + if used.start > currentStart { // gap from currentStart to pos.start - freeSize := pos.start - currentStart + freeSize := used.start - currentStart if freeSize > 0 { - b.freeRanges = append(b.freeRanges, position{ + freeRanges = append(freeRanges, position{ start: currentStart, size: uint32(freeSize), }) } } - currentStart = pos.start + uint64(pos.size) + currentStart = used.start + uint64(used.size) } - // sort free ranges by size (smallest first, as before) - slices.SortFunc(b.freeRanges, func(a, b position) int { return cmp.Compare(a.size, b.size) }) - - logOp := b.Logger.Debug() - for _, pos := range b.freeRanges { - if pos.size > 20 { - logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size) - } - } - logOp.Msg("calculated free ranges from index scan") - - return nil + return freeRanges, nil } -func (b *MultiMmapManager) mergeNewFreeRange(pos position) (isAtEnd bool) { - // before adding check if we can merge this with some other range - // (to merge means to delete the previous and add a new one) - toDelete := make([]int, 0, 2) - for f, fr := range b.freeRanges { - if pos.start+uint64(pos.size) == fr.start { - // [new_pos_to_be_freed][existing_fr] -> merge! - toDelete = append(toDelete, f) - pos.size = pos.size + fr.size - } else if fr.start+uint64(fr.size) == pos.start { - // [existing_fr][new_pos_to_be_freed] -> merge! - toDelete = append(toDelete, f) - pos.start = fr.start - pos.size = fr.size + pos.size +// this injects the new free range into the list, merging it with existing free ranges if necessary. +// it also takes a pointer so it can modify it for the caller to use it in setting up the new mmapf. +func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bool) { + // use binary search to find the insertion point for the new pos + idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int { + return cmp.Compare(item.start, target) + }) + + if exists { + panic(fmt.Errorf("can't add free range that already exists: %s", newFreeRange)) + } + + deleteStart := -1 + deleting := 0 + + // check the range immediately before + if idx > 0 { + before := b.freeRanges[idx-1] + if before.start+uint64(before.size) == newFreeRange.start { + deleteStart = idx - 1 + deleting++ + newFreeRange.start = before.start + newFreeRange.size = before.size + newFreeRange.size } } - slices.SortFunc(toDelete, func(a, b int) int { return b - a }) - for _, idx := range toDelete { - b.freeRanges = slices.Delete(b.freeRanges, idx, idx+1) + + // check the range immediately after + if idx < len(b.freeRanges) { + after := b.freeRanges[idx] + if newFreeRange.start+uint64(newFreeRange.size) == after.start { + if deleteStart == -1 { + deleteStart = idx + } + deleting++ + + newFreeRange.size = newFreeRange.size + after.size + } } // when we're at the end of a file we just delete everything and don't add new free ranges // the caller will truncate the mmap file and adjust the position accordingly - if pos.start+uint64(pos.size) == b.mmapfEnd { + if newFreeRange.start+uint64(newFreeRange.size) == b.mmapfEnd { + if deleting > 0 { + b.freeRanges = slices.Delete(b.freeRanges, deleteStart, deleteStart+deleting) + } return true } - b.addNewFreeRange(pos) + switch deleting { + case 0: + // if we are not deleting anything we must insert the new free range + b.freeRanges = slices.Insert(b.freeRanges, idx, *newFreeRange) + case 1: + // if we're deleting a single range, don't delete it, modify it in-place instead. + b.freeRanges[deleteStart] = *newFreeRange + case 2: + // now if we're deleting two ranges, delete just one instead and modify the other in place + b.freeRanges[deleteStart] = *newFreeRange + b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1) + } + return false } - -func (b *MultiMmapManager) addNewFreeRange(pos position) { - // update freeranges slice in memory - idx, _ := slices.BinarySearchFunc(b.freeRanges, pos, func(item, target position) int { - if item.size > target.size { - return 1 - } else if target.size > item.size { - return -1 - } else if item.start > target.start { - return 1 - } else { - return -1 - } - }) - b.freeRanges = slices.Insert(b.freeRanges, idx, pos) -} diff --git a/eventstore/mmm/freeranges_test.go b/eventstore/mmm/freeranges_test.go new file mode 100644 index 0000000..9886dc0 --- /dev/null +++ b/eventstore/mmm/freeranges_test.go @@ -0,0 +1,85 @@ +package mmm + +import ( + "math/rand/v2" + "os" + "strings" + "testing" + + "fiatjaf.com/nostr" + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func FuzzFreeRanges(f *testing.F) { + f.Add(0) + f.Fuzz(func(t *testing.T, seed int) { + // create a temporary directory for the test + tmpDir, err := os.MkdirTemp("", "mmm-freeranges-test-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + logger := zerolog.Nop() + rnd := rand.New(rand.NewPCG(uint64(seed), 0)) + chance := func(n uint) bool { + return rnd.UintN(100) < n + } + + // initialize MMM + mmmm := &MultiMmapManager{ + Dir: tmpDir, + Logger: &logger, + } + + err = mmmm.Init() + require.NoError(t, err) + defer mmmm.Close() + + // create a single layer + il, err := mmmm.EnsureLayer("a") + require.NoError(t, err) + defer il.Close() + + sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb") + + total := 0 + for { + for range rnd.IntN(40) { + evt := nostr.Event{ + CreatedAt: nostr.Timestamp(rnd.Uint32()), + Kind: 1, + Content: strings.Repeat("z", rnd.IntN(1000)), + } + evt.Sign(sk) + err := il.SaveEvent(evt) + require.NoError(t, err) + + total++ + } + + // delete some events + if total > 0 { + for range rnd.IntN(total) { + for evt := range il.QueryEvents(nostr.Filter{}, 1) { + err := il.DeleteEvent(evt.ID) + require.NoError(t, err) + + total-- + } + } + } + + mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { + expectedFreeRanges, _ := mmmm.gatherFreeRanges(txn) + require.Equalf(t, expectedFreeRanges, mmmm.freeRanges, "expected %s, got %s", expectedFreeRanges, mmmm.freeRanges) + return nil + }) + t.Logf("loop -- current %d", total) + + if chance(30) { + break + } + } + }) +} diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index f39549a..a30d728 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -37,7 +37,7 @@ type MultiMmapManager struct { knownLayers lmdb.DBI indexId lmdb.DBI - freeRanges []position + freeRanges positions } func (b *MultiMmapManager) String() string { @@ -123,7 +123,18 @@ func (b *MultiMmapManager) Init() error { } // scan index table to calculate free ranges from used positions - b.GatherFreeRanges(txn) + b.freeRanges, err = b.gatherFreeRanges(txn) + if err != nil { + return err + } + + logOp := b.Logger.Debug() + for _, pos := range b.freeRanges { + if pos.size > 20 { + logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size) + } + } + logOp.Msg("calculated free ranges from index scan") return nil }); err != nil { diff --git a/eventstore/mmm/position.go b/eventstore/mmm/position.go index bcb34b1..d692300 100644 --- a/eventstore/mmm/position.go +++ b/eventstore/mmm/position.go @@ -3,8 +3,23 @@ package mmm import ( "encoding/binary" "fmt" + "strings" ) +type positions []position + +func (poss positions) String() string { + str := strings.Builder{} + str.Grow(10 + 20*len(poss)) + str.WriteString("positions:[") + for _, pos := range poss { + str.WriteByte(' ') + str.WriteString(pos.String()) + } + str.WriteString(" ]") + return str.String() +} + type position struct { start uint64 size uint32 diff --git a/eventstore/mmm/purge.go b/eventstore/mmm/purge.go index 82daa7d..bb6b293 100644 --- a/eventstore/mmm/purge.go +++ b/eventstore/mmm/purge.go @@ -17,8 +17,7 @@ func (b *MultiMmapManager) purge(txn *lmdb.Txn, idPrefix8 []byte, pos position) } // will add the current range to free ranges, which means it is "deleted" (or merge with existing) - isAtEnd := b.mergeNewFreeRange(pos) - + isAtEnd := b.mergeNewFreeRange(&pos) if isAtEnd { // when at the end, truncate the mmap // [new_pos_to_be_freed][end_of_file] -> shrink file! diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go index fa17557..9f70512 100644 --- a/eventstore/mmm/save.go +++ b/eventstore/mmm/save.go @@ -104,14 +104,15 @@ func (b *MultiMmapManager) storeOn( pos.start = fr.start // modify the free ranges we're keeping track of - // (i.e. delete the current and add a new freerange with the remaining space) - b.freeRanges = slices.Delete(b.freeRanges, f, f+1) - - if pos.size != fr.size { - b.addNewFreeRange(position{ + if pos.size == fr.size { + // if we've used it entirely just delete it + b.freeRanges = slices.Delete(b.freeRanges, f, f+1) + } else { + // otherwise modify it in place + b.freeRanges[f] = position{ start: fr.start + uint64(pos.size), size: fr.size - pos.size, - }) + } } break diff --git a/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/479307f78037b7d5 b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/479307f78037b7d5 new file mode 100644 index 0000000..8ed7006 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/479307f78037b7d5 @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-185) diff --git a/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/e574b3a75c531fc8 b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/e574b3a75c531fc8 new file mode 100644 index 0000000..c56e669 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/e574b3a75c531fc8 @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-92)