mmm: freeranges tests and fixes.
This commit is contained in:
@@ -133,7 +133,12 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.GatherFreeRanges(mmmtxn)
|
b.freeRanges, err = b.gatherFreeRanges(mmmtxn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,14 +8,14 @@ import (
|
|||||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *MultiMmapManager) GatherFreeRanges(txn *lmdb.Txn) error {
|
func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
|
||||||
cursor, err := txn.OpenCursor(b.indexId)
|
cursor, err := txn.OpenCursor(b.indexId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open cursor on indexId: %w", err)
|
return nil, fmt.Errorf("failed to open cursor on indexId: %w", err)
|
||||||
}
|
}
|
||||||
defer cursor.Close()
|
defer cursor.Close()
|
||||||
|
|
||||||
usedPositions := make([]position, 0, 256)
|
usedPositions := make(positions, 0, 256)
|
||||||
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
|
for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
|
||||||
pos := positionFromBytes(val[0:12])
|
pos := positionFromBytes(val[0:12])
|
||||||
usedPositions = append(usedPositions, pos)
|
usedPositions = append(usedPositions, pos)
|
||||||
@@ -25,79 +25,85 @@ func (b *MultiMmapManager) GatherFreeRanges(txn *lmdb.Txn) error {
|
|||||||
slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })
|
slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })
|
||||||
|
|
||||||
// calculate free ranges as gaps between used positions
|
// calculate free ranges as gaps between used positions
|
||||||
b.freeRanges = make([]position, 0, len(usedPositions)/2)
|
freeRanges := make(positions, 0, len(usedPositions)/2)
|
||||||
var currentStart uint64 = 0
|
var currentStart uint64 = 0
|
||||||
for _, pos := range usedPositions {
|
for _, used := range usedPositions {
|
||||||
if pos.start > currentStart {
|
if used.start > currentStart {
|
||||||
// gap from currentStart to pos.start
|
// gap from currentStart to pos.start
|
||||||
freeSize := pos.start - currentStart
|
freeSize := used.start - currentStart
|
||||||
if freeSize > 0 {
|
if freeSize > 0 {
|
||||||
b.freeRanges = append(b.freeRanges, position{
|
freeRanges = append(freeRanges, position{
|
||||||
start: currentStart,
|
start: currentStart,
|
||||||
size: uint32(freeSize),
|
size: uint32(freeSize),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
currentStart = pos.start + uint64(pos.size)
|
currentStart = used.start + uint64(used.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sort free ranges by size (smallest first, as before)
|
return freeRanges, nil
|
||||||
slices.SortFunc(b.freeRanges, func(a, b position) int { return cmp.Compare(a.size, b.size) })
|
|
||||||
|
|
||||||
logOp := b.Logger.Debug()
|
|
||||||
for _, pos := range b.freeRanges {
|
|
||||||
if pos.size > 20 {
|
|
||||||
logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logOp.Msg("calculated free ranges from index scan")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) mergeNewFreeRange(pos position) (isAtEnd bool) {
|
// this injects the new free range into the list, merging it with existing free ranges if necessary.
|
||||||
// before adding check if we can merge this with some other range
|
// it also takes a pointer so it can modify it for the caller to use it in setting up the new mmapf.
|
||||||
// (to merge means to delete the previous and add a new one)
|
func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bool) {
|
||||||
toDelete := make([]int, 0, 2)
|
// use binary search to find the insertion point for the new pos
|
||||||
for f, fr := range b.freeRanges {
|
idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int {
|
||||||
if pos.start+uint64(pos.size) == fr.start {
|
return cmp.Compare(item.start, target)
|
||||||
// [new_pos_to_be_freed][existing_fr] -> merge!
|
})
|
||||||
toDelete = append(toDelete, f)
|
|
||||||
pos.size = pos.size + fr.size
|
if exists {
|
||||||
} else if fr.start+uint64(fr.size) == pos.start {
|
panic(fmt.Errorf("can't add free range that already exists: %s", newFreeRange))
|
||||||
// [existing_fr][new_pos_to_be_freed] -> merge!
|
}
|
||||||
toDelete = append(toDelete, f)
|
|
||||||
pos.start = fr.start
|
deleteStart := -1
|
||||||
pos.size = fr.size + pos.size
|
deleting := 0
|
||||||
|
|
||||||
|
// check the range immediately before
|
||||||
|
if idx > 0 {
|
||||||
|
before := b.freeRanges[idx-1]
|
||||||
|
if before.start+uint64(before.size) == newFreeRange.start {
|
||||||
|
deleteStart = idx - 1
|
||||||
|
deleting++
|
||||||
|
newFreeRange.start = before.start
|
||||||
|
newFreeRange.size = before.size + newFreeRange.size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
slices.SortFunc(toDelete, func(a, b int) int { return b - a })
|
|
||||||
for _, idx := range toDelete {
|
// check the range immediately after
|
||||||
b.freeRanges = slices.Delete(b.freeRanges, idx, idx+1)
|
if idx < len(b.freeRanges) {
|
||||||
|
after := b.freeRanges[idx]
|
||||||
|
if newFreeRange.start+uint64(newFreeRange.size) == after.start {
|
||||||
|
if deleteStart == -1 {
|
||||||
|
deleteStart = idx
|
||||||
|
}
|
||||||
|
deleting++
|
||||||
|
|
||||||
|
newFreeRange.size = newFreeRange.size + after.size
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// when we're at the end of a file we just delete everything and don't add new free ranges
|
// when we're at the end of a file we just delete everything and don't add new free ranges
|
||||||
// the caller will truncate the mmap file and adjust the position accordingly
|
// the caller will truncate the mmap file and adjust the position accordingly
|
||||||
if pos.start+uint64(pos.size) == b.mmapfEnd {
|
if newFreeRange.start+uint64(newFreeRange.size) == b.mmapfEnd {
|
||||||
|
if deleting > 0 {
|
||||||
|
b.freeRanges = slices.Delete(b.freeRanges, deleteStart, deleteStart+deleting)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
b.addNewFreeRange(pos)
|
switch deleting {
|
||||||
|
case 0:
|
||||||
|
// if we are not deleting anything we must insert the new free range
|
||||||
|
b.freeRanges = slices.Insert(b.freeRanges, idx, *newFreeRange)
|
||||||
|
case 1:
|
||||||
|
// if we're deleting a single range, don't delete it, modify it in-place instead.
|
||||||
|
b.freeRanges[deleteStart] = *newFreeRange
|
||||||
|
case 2:
|
||||||
|
// now if we're deleting two ranges, delete just one instead and modify the other in place
|
||||||
|
b.freeRanges[deleteStart] = *newFreeRange
|
||||||
|
b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1)
|
||||||
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) addNewFreeRange(pos position) {
|
|
||||||
// update freeranges slice in memory
|
|
||||||
idx, _ := slices.BinarySearchFunc(b.freeRanges, pos, func(item, target position) int {
|
|
||||||
if item.size > target.size {
|
|
||||||
return 1
|
|
||||||
} else if target.size > item.size {
|
|
||||||
return -1
|
|
||||||
} else if item.start > target.start {
|
|
||||||
return 1
|
|
||||||
} else {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
})
|
|
||||||
b.freeRanges = slices.Insert(b.freeRanges, idx, pos)
|
|
||||||
}
|
|
||||||
|
|||||||
85
eventstore/mmm/freeranges_test.go
Normal file
85
eventstore/mmm/freeranges_test.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package mmm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand/v2"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"fiatjaf.com/nostr"
|
||||||
|
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FuzzFreeRanges(f *testing.F) {
|
||||||
|
f.Add(0)
|
||||||
|
f.Fuzz(func(t *testing.T, seed int) {
|
||||||
|
// create a temporary directory for the test
|
||||||
|
tmpDir, err := os.MkdirTemp("", "mmm-freeranges-test-*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
logger := zerolog.Nop()
|
||||||
|
rnd := rand.New(rand.NewPCG(uint64(seed), 0))
|
||||||
|
chance := func(n uint) bool {
|
||||||
|
return rnd.UintN(100) < n
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize MMM
|
||||||
|
mmmm := &MultiMmapManager{
|
||||||
|
Dir: tmpDir,
|
||||||
|
Logger: &logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = mmmm.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer mmmm.Close()
|
||||||
|
|
||||||
|
// create a single layer
|
||||||
|
il, err := mmmm.EnsureLayer("a")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer il.Close()
|
||||||
|
|
||||||
|
sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb")
|
||||||
|
|
||||||
|
total := 0
|
||||||
|
for {
|
||||||
|
for range rnd.IntN(40) {
|
||||||
|
evt := nostr.Event{
|
||||||
|
CreatedAt: nostr.Timestamp(rnd.Uint32()),
|
||||||
|
Kind: 1,
|
||||||
|
Content: strings.Repeat("z", rnd.IntN(1000)),
|
||||||
|
}
|
||||||
|
evt.Sign(sk)
|
||||||
|
err := il.SaveEvent(evt)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
total++
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete some events
|
||||||
|
if total > 0 {
|
||||||
|
for range rnd.IntN(total) {
|
||||||
|
for evt := range il.QueryEvents(nostr.Filter{}, 1) {
|
||||||
|
err := il.DeleteEvent(evt.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
total--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
|
expectedFreeRanges, _ := mmmm.gatherFreeRanges(txn)
|
||||||
|
require.Equalf(t, expectedFreeRanges, mmmm.freeRanges, "expected %s, got %s", expectedFreeRanges, mmmm.freeRanges)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
t.Logf("loop -- current %d", total)
|
||||||
|
|
||||||
|
if chance(30) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -37,7 +37,7 @@ type MultiMmapManager struct {
|
|||||||
knownLayers lmdb.DBI
|
knownLayers lmdb.DBI
|
||||||
indexId lmdb.DBI
|
indexId lmdb.DBI
|
||||||
|
|
||||||
freeRanges []position
|
freeRanges positions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) String() string {
|
func (b *MultiMmapManager) String() string {
|
||||||
@@ -123,7 +123,18 @@ func (b *MultiMmapManager) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// scan index table to calculate free ranges from used positions
|
// scan index table to calculate free ranges from used positions
|
||||||
b.GatherFreeRanges(txn)
|
b.freeRanges, err = b.gatherFreeRanges(txn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logOp := b.Logger.Debug()
|
||||||
|
for _, pos := range b.freeRanges {
|
||||||
|
if pos.size > 20 {
|
||||||
|
logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logOp.Msg("calculated free ranges from index scan")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
|||||||
@@ -3,8 +3,23 @@ package mmm
|
|||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type positions []position
|
||||||
|
|
||||||
|
func (poss positions) String() string {
|
||||||
|
str := strings.Builder{}
|
||||||
|
str.Grow(10 + 20*len(poss))
|
||||||
|
str.WriteString("positions:[")
|
||||||
|
for _, pos := range poss {
|
||||||
|
str.WriteByte(' ')
|
||||||
|
str.WriteString(pos.String())
|
||||||
|
}
|
||||||
|
str.WriteString(" ]")
|
||||||
|
return str.String()
|
||||||
|
}
|
||||||
|
|
||||||
type position struct {
|
type position struct {
|
||||||
start uint64
|
start uint64
|
||||||
size uint32
|
size uint32
|
||||||
|
|||||||
@@ -17,8 +17,7 @@ func (b *MultiMmapManager) purge(txn *lmdb.Txn, idPrefix8 []byte, pos position)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// will add the current range to free ranges, which means it is "deleted" (or merge with existing)
|
// will add the current range to free ranges, which means it is "deleted" (or merge with existing)
|
||||||
isAtEnd := b.mergeNewFreeRange(pos)
|
isAtEnd := b.mergeNewFreeRange(&pos)
|
||||||
|
|
||||||
if isAtEnd {
|
if isAtEnd {
|
||||||
// when at the end, truncate the mmap
|
// when at the end, truncate the mmap
|
||||||
// [new_pos_to_be_freed][end_of_file] -> shrink file!
|
// [new_pos_to_be_freed][end_of_file] -> shrink file!
|
||||||
|
|||||||
@@ -104,14 +104,15 @@ func (b *MultiMmapManager) storeOn(
|
|||||||
pos.start = fr.start
|
pos.start = fr.start
|
||||||
|
|
||||||
// modify the free ranges we're keeping track of
|
// modify the free ranges we're keeping track of
|
||||||
// (i.e. delete the current and add a new freerange with the remaining space)
|
if pos.size == fr.size {
|
||||||
b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
|
// if we've used it entirely just delete it
|
||||||
|
b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
|
||||||
if pos.size != fr.size {
|
} else {
|
||||||
b.addNewFreeRange(position{
|
// otherwise modify it in place
|
||||||
|
b.freeRanges[f] = position{
|
||||||
start: fr.start + uint64(pos.size),
|
start: fr.start + uint64(pos.size),
|
||||||
size: fr.size - pos.size,
|
size: fr.size - pos.size,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|||||||
2
eventstore/mmm/testdata/fuzz/FuzzFreeRanges/479307f78037b7d5
vendored
Normal file
2
eventstore/mmm/testdata/fuzz/FuzzFreeRanges/479307f78037b7d5
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
go test fuzz v1
|
||||||
|
int(-185)
|
||||||
2
eventstore/mmm/testdata/fuzz/FuzzFreeRanges/e574b3a75c531fc8
vendored
Normal file
2
eventstore/mmm/testdata/fuzz/FuzzFreeRanges/e574b3a75c531fc8
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
go test fuzz v1
|
||||||
|
int(-92)
|
||||||
Reference in New Issue
Block a user