From 53e838c61d4892b91252f9f2da5e28eef441b34a Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 14 Oct 2025 00:38:51 +0000 Subject: [PATCH] mmm: freeranges computed on startup and kept only in memory. --- eventstore/mmm/fix.go | 2 +- eventstore/mmm/freeranges.go | 61 ++++++++++++++++++++++++++++-------- eventstore/mmm/mmmm.go | 22 ++----------- eventstore/mmm/purge.go | 2 +- eventstore/mmm/save.go | 4 --- 5 files changed, 52 insertions(+), 39 deletions(-) diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go index 14572ae..08f9252 100644 --- a/eventstore/mmm/fix.go +++ b/eventstore/mmm/fix.go @@ -131,7 +131,7 @@ func (b *MultiMmapManager) Rescan() error { } } - return nil + return b.GatherFreeRanges(mmmtxn) }) } diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index fc3a734..63a9f30 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -1,12 +1,60 @@ package mmm import ( + "cmp" "fmt" "slices" "github.com/PowerDNS/lmdb-go/lmdb" ) +func (b *MultiMmapManager) GatherFreeRanges(txn *lmdb.Txn) error { + cursor, err := txn.OpenCursor(b.indexId) + if err != nil { + return fmt.Errorf("failed to open cursor on indexId: %w", err) + } + defer cursor.Close() + + usedPositions := make([]position, 0, 256) + for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { + pos := positionFromBytes(val[0:12]) + usedPositions = append(usedPositions, pos) + } + + // sort used positions by start + slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) }) + + // calculate free ranges as gaps between used positions + b.freeRanges = make([]position, 0, len(usedPositions)/2) + var currentStart uint64 = 0 + for _, pos := range usedPositions { + if pos.start > currentStart { + // gap from currentStart to pos.start + freeSize := pos.start - currentStart + if freeSize > 0 { + b.freeRanges = append(b.freeRanges, position{ + start: currentStart, + size: uint32(freeSize), + }) + } + } + currentStart = pos.start + uint64(pos.size) + } + + // sort free ranges by size (smallest first, as before) + slices.SortFunc(b.freeRanges, func(a, b position) int { return cmp.Compare(a.size, b.size) }) + + logOp := b.Logger.Debug() + for _, pos := range b.freeRanges { + if pos.size > 20 { + logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size) + } + } + logOp.Msg("calculated free ranges from index scan") + + return nil +} + func (b *MultiMmapManager) mergeNewFreeRange(pos position) (isAtEnd bool) { // before adding check if we can merge this with some other range // (to merge means to delete the previous and add a new one) @@ -53,16 +101,3 @@ func (b *MultiMmapManager) addNewFreeRange(pos position) { }) b.freeRanges = slices.Insert(b.freeRanges, idx, pos) } - -func (b *MultiMmapManager) saveFreeRanges(txn *lmdb.Txn) error { - // save to database - valReserve, err := txn.PutReserve(b.stuff, FREERANGES_KEY, len(b.freeRanges)*12, 0) - if err != nil { - return fmt.Errorf("on put freeranges: %w", err) - } - for f, fr := range b.freeRanges { - bytesFromPosition(valReserve[f*12:], fr) - } - - return nil -} diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index 0ce3780..fd3c34d 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -50,8 +50,6 @@ const ( maxuint32 = 4294967295 ) -var FREERANGES_KEY = []byte{'F'} - func (b *MultiMmapManager) Init() error { // create directory if it doesn't exist dbpath := filepath.Join(b.Dir, "mmmm") @@ -119,24 +117,8 @@ func (b *MultiMmapManager) Init() error { b.indexId = dbi } - // load all free ranges into memory - { - data, err := txn.Get(b.stuff, FREERANGES_KEY) - if err != nil && !lmdb.IsNotFound(err) { - return fmt.Errorf("on freeranges: %w", err) - } - b.freeRanges = make([]position, len(data)/12) - logOp := b.Logger.Debug() - for f := range b.freeRanges { - pos := positionFromBytes(data[f*12 : (f+1)*12]) - b.freeRanges[f] = pos - if pos.size > 20 { - logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size) - } - } - slices.SortFunc(b.freeRanges, func(a, b position) int { return int(a.size - b.size) }) - logOp.Msg("loaded free ranges") - } + // scan index table to calculate free ranges from used positions + b.GatherFreeRanges(txn) return nil }); err != nil { diff --git a/eventstore/mmm/purge.go b/eventstore/mmm/purge.go index 8ce73ff..82daa7d 100644 --- a/eventstore/mmm/purge.go +++ b/eventstore/mmm/purge.go @@ -32,5 +32,5 @@ func (b *MultiMmapManager) purge(txn *lmdb.Txn, idPrefix8 []byte, pos position) copy(b.mmapf[pos.start:], bytes.Repeat([]byte{'!'}, int(pos.size))) } - return b.saveFreeRanges(txn) + return nil } diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go index 23974ac..fa17557 100644 --- a/eventstore/mmm/save.go +++ b/eventstore/mmm/save.go @@ -114,10 +114,6 @@ func (b *MultiMmapManager) storeOn( }) } - if err := b.saveFreeRanges(mmmtxn); err != nil { - return false, fmt.Errorf("failed to save modified free ranges: %w", err) - } - break } }