mmm: stop truncating the mmap file; reorder commits and write operations so that eventual inconsistencies are minimized and less harmful.

fiatjaf
2025-10-16 02:26:07 +00:00
parent 98cbe66e16
commit fc16a36481
15 changed files with 278 additions and 236 deletions
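The pattern adopted throughout this commit: both LMDB transactions are opened up front with nil-guarded deferred aborts, the mmm id-index transaction is committed before the layer transaction, and any mmap range freed by a purge is merged into the in-memory free list only after both commits succeed. Below is a minimal, self-contained sketch of that ordering; txn is a stand-in for *lmdb.Txn and writePath is an illustration distilled from the hunks that follow, not actual mmm code.

package main

import "fmt"

// position mirrors the 12-byte (size, start) record mmm keeps per event.
type position struct {
	start uint64
	size  uint32
}

// txn is a stand-in for *lmdb.Txn, just enough to show the ordering.
type txn struct{ name string }

func (t *txn) Commit() error { fmt.Println("commit", t.name); return nil }
func (t *txn) Abort()        { fmt.Println("abort", t.name) }

// writePath shows the shape every write path takes after this commit:
// nil-guarded deferred aborts, the mmm txn committed before the layer txn,
// freed ranges merged in memory only after both commits succeed.
func writePath(merge func(position)) error {
	mmmtxn, iltxn := &txn{"mmm"}, &txn{"il"}
	defer func() {
		// abort only whatever has not been committed yet
		if mmmtxn != nil {
			mmmtxn.Abort()
		}
		if iltxn != nil {
			iltxn.Abort()
		}
	}()

	var freed *position
	// ... the actual delete/store work happens here; a purge that removed
	// the last layer reference would set:
	freed = &position{start: 0, size: 142}

	// commit in this order to minimize problematic inconsistencies
	if err := mmmtxn.Commit(); err != nil {
		return err
	}
	mmmtxn = nil
	if err := iltxn.Commit(); err != nil {
		return err
	}
	iltxn = nil

	// crashing before this point merely loses the free range until the
	// next startup recalculates it; nothing on disk is inconsistent
	if freed != nil {
		merge(*freed)
	}
	return nil
}

func main() {
	_ = writePath(func(p position) { fmt.Println("merged free range:", p) })
}

The worst case under this ordering is a crash between the commits and the merge: the freed range is simply forgotten until startup recalculates free ranges from the id index, which is harmless compared to the old path that truncated the mmap file from inside the write path.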

View File

@@ -3,6 +3,7 @@ package mmm
 import (
 	"encoding/binary"
 	"fmt"
+	"runtime"
 	"slices"

 	"fiatjaf.com/nostr"
@@ -13,32 +14,84 @@ func (il *IndexingLayer) DeleteEvent(id nostr.ID) error {
 	il.mmmm.writeMutex.Lock()
 	defer il.mmmm.writeMutex.Unlock()

-	return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
-		return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
-			return il.delete(mmmtxn, iltxn, id)
-		})
-	})
+	runtime.LockOSThread()
+	defer runtime.UnlockOSThread()
+
+	// prepare transactions
+	mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if mmmtxn != nil {
+			mmmtxn.Abort()
+		}
+	}()
+	mmmtxn.RawRead = true
+
+	iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if iltxn != nil {
+			iltxn.Abort()
+		}
+	}()
+	iltxn.RawRead = true
+
+	var acquiredFreeRangeFromDelete *position
+	if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, id); err != nil {
+		return fmt.Errorf("failed to delete event %s: %w", id, err)
+	} else if shouldPurge {
+		// purge
+		if err := mmmtxn.Del(il.mmmm.indexId, id[0:8], nil); err != nil {
+			return err
+		}
+		acquiredFreeRangeFromDelete = &pos
+	}
+
+	// commit in this order to minimize problematic inconsistencies
+	if err := mmmtxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit mmmtxn: %w", err)
+	}
+	mmmtxn = nil
+	if err := iltxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit iltxn: %w", err)
+	}
+	iltxn = nil
+
+	// finally merge in the new free range (in this order it makes more sense, the worst that can
+	// happen is that we lose this free range but we'll have it again on the next startup)
+	if acquiredFreeRangeFromDelete != nil {
+		il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete)
+	}
+
+	return nil
 }

-func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) error {
-	zeroRefs := false
-	b := il.mmmm
-
-	b.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting")
+func (il *IndexingLayer) delete(
+	mmmtxn *lmdb.Txn,
+	iltxn *lmdb.Txn,
+	id nostr.ID,
+) (pos position, shouldPurge bool, err error) {
+	il.mmmm.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting")

 	// first in the mmmm txn we check if we have the event still
-	val, err := mmmtxn.Get(b.indexId, id[0:8])
+	val, err := mmmtxn.Get(il.mmmm.indexId, id[0:8])
 	if err != nil {
 		if lmdb.IsNotFound(err) {
 			// we already do not have this anywhere
-			return nil
+			return position{}, false, nil
 		}
-		return fmt.Errorf("failed to check if we have the event %x: %w", id, err)
+		return position{}, false, fmt.Errorf("failed to check if we have the event %x: %w", id, err)
 	}

 	// we have this, but do we have it in the current layer?
 	// val is [posb][il_idx][il_idx...]
-	pos := positionFromBytes(val[0:12])
+	pos = positionFromBytes(val[0:12])

 	// check references
 	currentLayer := binary.BigEndian.AppendUint16(nil, il.id)
@@ -49,12 +102,12 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID)
 			copy(nextval, val[0:i])
 			copy(nextval[i:], val[i+2:])
-			if err := mmmtxn.Put(b.indexId, id[0:8], nextval, 0); err != nil {
-				return fmt.Errorf("failed to update references for %x: %w", id[:], err)
+			if err := mmmtxn.Put(il.mmmm.indexId, id[0:8], nextval, 0); err != nil {
+				return pos, false, fmt.Errorf("failed to update references for %x: %w", id[:], err)
 			}

 			// if there are no more layers we will delete everything later
-			zeroRefs = len(nextval) == 12
+			shouldPurge = len(nextval) == 12
 			break
 		}
 	}
@@ -63,21 +116,14 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID)
 	// load the event so we can compute the indexes
 	var evt nostr.Event
 	if err := il.mmmm.loadEvent(pos, &evt); err != nil {
-		return fmt.Errorf("failed to load event %x when deleting: %w", id[:], err)
+		return pos, false, fmt.Errorf("failed to load event %x when deleting: %w", id[:], err)
 	}

 	if err := il.deleteIndexes(iltxn, evt, val[0:12]); err != nil {
-		return fmt.Errorf("failed to delete indexes for %s=>%v: %w", evt.ID, val[0:12], err)
+		return pos, false, fmt.Errorf("failed to delete indexes for %s=>%v: %w", evt.ID, val[0:12], err)
 	}

-	// if there are no more refs we delete the event from the id index and mmap
-	if zeroRefs {
-		if err := b.purge(mmmtxn, id[0:8], pos); err != nil {
-			panic(err)
-		}
-	}
-
-	return nil
+	return pos, shouldPurge, nil
 }

 func (il *IndexingLayer) deleteIndexes(iltxn *lmdb.Txn, event nostr.Event, posbytes []byte) error {

View File

@@ -3,7 +3,6 @@ package mmm
 import (
 	"bytes"
 	"encoding/binary"
-	"encoding/hex"
 	"slices"

 	"fiatjaf.com/nostr"
@@ -21,12 +20,7 @@ func (b *MultiMmapManager) Rescan() error {
 	}
 	defer cursor.Close()

-	type entry struct {
-		idPrefix []byte
-		pos      position
-	}
-	var toPurge []entry
+	var toPurge [][]byte // a list of idPrefix entries

 	for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
 		pos := positionFromBytes(val[0:12])
@@ -91,7 +85,7 @@
 		}

 		if borked {
-			toPurge = append(toPurge, entry{idPrefix: key, pos: pos})
+			toPurge = append(toPurge, key)
 		} else if len(layersToRemove) > 0 {
 			for s := 12; s < len(val); {
 				if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) {
@@ -108,16 +102,16 @@
 					return err
 				}
 			} else {
-				toPurge = append(toPurge, entry{idPrefix: key, pos: pos})
+				toPurge = append(toPurge, key)
 			}
 		}
 	}

-	for _, entry := range toPurge {
+	for _, idPrefix := range toPurge {
 		// just delete from the ids index,
 		// no need to deal with the freeranges list as it will be recalculated afterwards.
 		// this also ensures any brokenly overlapping overwritten events don't have to be sacrificed.
-		if err := mmmtxn.Del(b.indexId, entry.idPrefix, nil); err != nil {
+		if err := mmmtxn.Del(b.indexId, idPrefix, nil); err != nil {
 			return err
 		}
 	}

View File

@@ -113,7 +113,7 @@ func FuzzBorkedRescan(f *testing.F) {
 			// this won't be erased, just removed from this specific layer
 			layer := layers[rnd.IntN(len(layers))]
 			posb := make([]byte, 12)
-			bytesFromPosition(posb, pos)
+			writeBytesFromPosition(posb, pos)
 			if err := layer.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
 				return layer.deleteIndexes(iltxn, evt, posb)

View File

@@ -24,7 +24,7 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
 	// sort used positions by start
 	slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })

-	// if there is free space at the end (which doesn't happen in normal conditions) do this to simulate it
+	// if there is free space at the end this will simulate it
 	usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0})

 	// calculate free ranges as gaps between used positions
@@ -47,9 +47,7 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
 	return freeRanges, nil
 }

-// this injects the new free range into the list, merging it with existing free ranges if necessary.
-// it also takes a pointer so it can modify it for the caller to use it in setting up the new mmapf.
-func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bool) {
+func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
 	// use binary search to find the insertion point for the new pos
 	idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int {
 		return cmp.Compare(item.start, target)
@@ -86,27 +84,16 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bo
 		}
 	}

-	// when we're at the end of a file we just delete everything and don't add new free ranges
-	// the caller will truncate the mmap file and adjust the position accordingly
-	if newFreeRange.start+uint64(newFreeRange.size) == b.mmapfEnd {
-		if deleting > 0 {
-			b.freeRanges = slices.Delete(b.freeRanges, deleteStart, deleteStart+deleting)
-		}
-		return true
-	}
-
 	switch deleting {
 	case 0:
 		// if we are not deleting anything we must insert the new free range
-		b.freeRanges = slices.Insert(b.freeRanges, idx, *newFreeRange)
+		b.freeRanges = slices.Insert(b.freeRanges, idx, newFreeRange)
 	case 1:
 		// if we're deleting a single range, don't delete it, modify it in-place instead.
-		b.freeRanges[deleteStart] = *newFreeRange
+		b.freeRanges[deleteStart] = newFreeRange
 	case 2:
 		// now if we're deleting two ranges, delete just one instead and modify the other in place
-		b.freeRanges[deleteStart] = *newFreeRange
+		b.freeRanges[deleteStart] = newFreeRange
 		b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1)
 	}
-
-	return false
 }
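The switch above distinguishes how many existing free ranges the newly freed range touches: none (insert it), one (grow that one in place), or two (grow one and delete the other). For intuition, here is an independent, clarity-first illustration of that coalescing behavior on a plain sorted slice; it is not the diff's in-place code, just a sketch assuming the same (start, size) semantics.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

type position struct {
	start uint64
	size  uint32
}

// mergeFree keeps the list sorted by start and coalesces the new range
// with any neighbors it touches. (The real code avoids allocations by
// editing at most two slots in place; this version favors clarity.)
func mergeFree(frs []position, nf position) []position {
	idx, _ := slices.BinarySearchFunc(frs, nf.start, func(it position, t uint64) int {
		return cmp.Compare(it.start, t)
	})
	frs = slices.Insert(frs, idx, nf)
	out := frs[:0:0]
	for _, fr := range frs {
		if n := len(out); n > 0 && out[n-1].start+uint64(out[n-1].size) >= fr.start {
			// touches the previous range: extend it instead of appending
			end := max(out[n-1].start+uint64(out[n-1].size), fr.start+uint64(fr.size))
			out[n-1].size = uint32(end - out[n-1].start)
		} else {
			out = append(out, fr)
		}
	}
	return out
}

func main() {
	free := []position{{start: 0, size: 10}, {start: 30, size: 10}}
	fmt.Println(mergeFree(free, position{start: 15, size: 5}))  // case 0: inserted
	fmt.Println(mergeFree(free, position{start: 10, size: 5}))  // case 1: extends {0,10}
	fmt.Println(mergeFree(free, position{start: 10, size: 20})) // case 2: bridges both
}

Running it prints [{0 10} {15 5} {30 10}], [{0 15} {30 10}] and [{0 40}], matching cases 0, 1 and 2 respectively.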

View File

@@ -45,12 +45,15 @@ func FuzzFreeRanges(f *testing.F) {
 		total := 0
 		for {
-			freeBefore := countUsableFreeRanges(mmmm)
+			freeBefore, spaceBefore := countUsableFreeRanges(mmmm)

+			hasAdded := false
 			for i := range rnd.IntN(40) {
+				hasAdded = true
 				content := "1" // ensure at least one event is as small as it can be
 				if i > 0 {
-					strings.Repeat("z", rnd.IntN(1000))
+					content = strings.Repeat("z", rnd.IntN(1000))
 				}

 				evt := nostr.Event{
@@ -66,9 +69,9 @@ func FuzzFreeRanges(f *testing.F) {
 				total++
 			}

-			freeAfter := countUsableFreeRanges(mmmm)
-			if freeBefore > 0 {
-				require.Lessf(t, freeAfter, freeBefore, "must use some of the existing free ranges when inserting new events")
+			freeAfter, spaceAfter := countUsableFreeRanges(mmmm)
+			if hasAdded && freeBefore > 0 {
+				require.Lessf(t, spaceAfter, spaceBefore, "must use some of the existing free ranges when inserting new events (before: %d, after: %d)", freeBefore, freeAfter)
 			}

 			// delete some events
@@ -96,12 +99,12 @@
 	})
 }

-func countUsableFreeRanges(mmmm *MultiMmapManager) int {
-	count := 0
+func countUsableFreeRanges(mmmm *MultiMmapManager) (count int, space int) {
 	for _, fr := range mmmm.freeRanges {
-		if fr.size > 150 {
+		if fr.size >= 142 {
 			count++
+			space += int(fr.size)
 		}
 	}
-	return count
+	return count, space
 }

View File

@@ -34,8 +34,6 @@ type iterator struct {
 }

 func (it *iterator) pull(n int, since uint32) {
-	query := it.query
-
 	for range n {
 		// in the beginning we already have a k and a v and an err from the cursor setup, so check and use these
 		if it.err != nil {
@@ -43,7 +41,7 @@ func (it *iterator) pull(n int, since uint32) {
 			return
 		}

-		if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) {
+		if len(it.key) != it.query.keySize || !bytes.HasPrefix(it.key, it.query.prefix) {
 			// we reached the end of this prefix
 			it.exhausted = true
 			return

View File

@@ -276,9 +276,11 @@ func (b *MultiMmapManager) removeAllReferencesFromLayer(txn *lmdb.Txn, layerId u
 			posb := val[0:12]
 			pos := positionFromBytes(posb)
-			if err := b.purge(txn, idPrefix8, pos); err != nil {
+			if err := txn.Del(b.indexId, idPrefix8, nil); err != nil {
 				return fmt.Errorf("failed to purge unreferenced event %x: %w", idPrefix8, err)
 			}
+			b.mergeNewFreeRange(pos)
 		} else if update {
 			if err := txn.Put(b.indexId, idPrefix8, val, 0); err != nil {
 				return fmt.Errorf("failed to put updated index+refs: %w", err)

View File

@@ -36,7 +36,7 @@ func positionFromBytes(posb []byte) position {
 	}
 }

-func bytesFromPosition(out []byte, pos position) {
+func writeBytesFromPosition(out []byte, pos position) {
 	binary.BigEndian.PutUint32(out[0:4], pos.size)
 	binary.BigEndian.PutUint64(out[4:12], pos.start)
 }
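For reference, the 12-byte record written here is [size: 4 bytes][start: 8 bytes], both big-endian. A small round-trip sketch follows; positionFromBytes' body is inferred as the inverse of writeBytesFromPosition, and the position struct fields are assumed from their usage in this commit.

package main

import (
	"encoding/binary"
	"fmt"
)

// position describes where an event lives in the mmap file.
type position struct {
	start uint64 // byte offset into the mmap
	size  uint32 // length of the serialized event
}

// writeBytesFromPosition serializes pos into a 12-byte [size][start] record.
func writeBytesFromPosition(out []byte, pos position) {
	binary.BigEndian.PutUint32(out[0:4], pos.size)
	binary.BigEndian.PutUint64(out[4:12], pos.start)
}

// positionFromBytes is the inverse, reading [size:4][start:8].
func positionFromBytes(posb []byte) position {
	return position{
		size:  binary.BigEndian.Uint32(posb[0:4]),
		start: binary.BigEndian.Uint64(posb[4:12]),
	}
}

func main() {
	posb := make([]byte, 12)
	writeBytesFromPosition(posb, position{start: 4096, size: 142})
	fmt.Printf("%x => %+v\n", posb, positionFromBytes(posb))
	// prints: 0000008e0000000000001000 => {start:4096 size:142}
}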

View File

@@ -1,35 +0,0 @@
-package mmm
-
-import (
-	"bytes"
-	"fmt"
-	"os"
-
-	"github.com/PowerDNS/lmdb-go/lmdb"
-)
-
-func (b *MultiMmapManager) purge(txn *lmdb.Txn, idPrefix8 []byte, pos position) error {
-	b.Logger.Debug().Hex("event", idPrefix8).Stringer("pos", pos).Msg("purging")
-
-	// delete from index
-	if err := txn.Del(b.indexId, idPrefix8, nil); err != nil {
-		return err
-	}
-
-	// will add the current range to free ranges, which means it is "deleted" (or merge with existing)
-	isAtEnd := b.mergeNewFreeRange(&pos)
-	if isAtEnd {
-		// when at the end, truncate the mmap
-		// [new_pos_to_be_freed][end_of_file] -> shrink file!
-		pos.size = 0 // so we don't try to add this some lines below
-		if err := os.Truncate(b.mmapfPath, int64(pos.start)); err != nil {
-			panic(fmt.Errorf("error decreasing %s: %w", b.mmapfPath, err))
-		}
-		b.mmapfEnd = pos.start
-	} else {
-		// this is for debugging -------------
-		copy(b.mmapf[pos.start:], bytes.Repeat([]byte{'!'}, int(pos.size)))
-	}
-
-	return nil
-}

View File

@@ -14,7 +14,6 @@ type query struct {
 	i             int
 	dbi           lmdb.DBI
 	prefix        []byte
-	results       chan *nostr.Event
 	keySize       int
 	timestampSize int
 	startingPoint []byte
@@ -46,7 +45,6 @@ func (il *IndexingLayer) prepareQueries(filter nostr.Filter) (
 			sp = sp[0:len(q.prefix)]
 			copy(sp, q.prefix)
 			queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until))
-			queries[i].results = make(chan *nostr.Event, 12)
 		}
 	}()

View File

@@ -7,7 +7,6 @@ import (
 	"fiatjaf.com/nostr"
 	"fiatjaf.com/nostr/eventstore/internal"
-	"github.com/PowerDNS/lmdb-go/lmdb"
 )

 func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
@@ -23,36 +22,80 @@
 		filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
 	}

-	return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
-		mmmtxn.RawRead = true
-
-		return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
-			// now we fetch the past events, whatever they are, delete them and then save the new
-			var err error
-			var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
-				err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
-			}
-			if err != nil {
-				return fmt.Errorf("failed to query past events with %s: %w", filter, err)
-			}
-
-			shouldStore := true
-			for previous := range results {
-				if internal.IsOlder(previous, evt) {
-					if err := il.delete(mmmtxn, iltxn, previous.ID); err != nil {
-						return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
-					}
-				} else {
-					// there is a newer event already stored, so we won't store this
-					shouldStore = false
-				}
-			}
-			if shouldStore {
-				_, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt)
-				return err
-			}
-			return nil
-		})
-	})
+	// prepare transactions
+	mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if mmmtxn != nil {
+			mmmtxn.Abort()
+		}
+	}()
+	mmmtxn.RawRead = true
+
+	iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if iltxn != nil {
+			iltxn.Abort()
+		}
+	}()
+	iltxn.RawRead = true
+
+	// now we fetch the past events, whatever they are, delete them and then save the new
+	var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
+		err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
+	}
+	if err != nil {
+		return fmt.Errorf("failed to query past events with %s: %w", filter, err)
+	}
+
+	var acquiredFreeRangeFromDelete *position
+	shouldStore := true
+	for previous := range results {
+		if internal.IsOlder(previous, evt) {
+			if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, previous.ID); err != nil {
+				return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
+			} else if shouldPurge {
+				// purge
+				if err := mmmtxn.Del(il.mmmm.indexId, previous.ID[0:8], nil); err != nil {
+					return err
+				}
+				acquiredFreeRangeFromDelete = &pos
+			}
+		} else {
+			// there is a newer event already stored, so we won't store this
+			shouldStore = false
+		}
+	}
+
+	if shouldStore {
+		_, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt)
+		if err != nil {
+			return err
+		}
+	}
+
+	// commit in this order to minimize problematic inconsistencies
+	if err := mmmtxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit mmmtxn: %w", err)
+	}
+	mmmtxn = nil
+	if err := iltxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit iltxn: %w", err)
+	}
+	iltxn = nil
+
+	// finally merge in the new free range (in this order it makes more sense, the worst that can
+	// happen is that we lose this free range but we'll have it again on the next startup)
+	if acquiredFreeRangeFromDelete != nil {
+		il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete)
+	}
+
+	return nil
 }

View File

@@ -21,150 +21,150 @@ func (il *IndexingLayer) SaveEvent(evt nostr.Event) error {
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()

-	// do this just so it's cleaner, we're already locking the thread and the mutex anyway
+	// prepare transactions
 	mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
 	if err != nil {
-		return fmt.Errorf("failed to begin global transaction: %w", err)
+		return err
 	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if mmmtxn != nil {
+			mmmtxn.Abort()
+		}
+	}()
 	mmmtxn.RawRead = true

 	iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
 	if err != nil {
-		mmmtxn.Abort()
-		return fmt.Errorf("failed to start txn on %s: %w", il.name, err)
+		return err
 	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if iltxn != nil {
+			iltxn.Abort()
+		}
+	}()
+	iltxn.RawRead = true

-	if _, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt); err != nil {
-		mmmtxn.Abort()
-		if iltxn != nil {
-			iltxn.Abort()
-		}
+	// the actual save operation
+	if _, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt); err != nil {
 		return err
 	}

+	// commit in this order to minimize problematic inconsistencies
 	if err := mmmtxn.Commit(); err != nil {
-		return err
+		return fmt.Errorf("can't commit mmmtxn: %w", err)
 	}
+	mmmtxn = nil

 	if err := iltxn.Commit(); err != nil {
-		return err
+		return fmt.Errorf("can't commit iltxn: %w", err)
 	}
+	iltxn = nil

 	return nil
 }

 func (b *MultiMmapManager) storeOn(
 	mmmtxn *lmdb.Txn,
-	ils []*IndexingLayer,
-	iltxns []*lmdb.Txn,
+	iltxn *lmdb.Txn,
+	il *IndexingLayer,
 	evt nostr.Event,
 ) (stored bool, err error) {
 	// check if we already have this id
+	var pos position
 	val, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
 	if err == nil {
-		// we found the event, now check if it is already indexed by the layers that want to store it
-		for i := len(ils) - 1; i >= 0; i-- {
-			for s := 12; s < len(val); s += 2 {
-				ilid := binary.BigEndian.Uint16(val[s : s+2])
-				if ils[i].id == ilid {
-					// swap delete this il, but keep the deleted ones at the end
-					// (so the caller can successfully finalize the transactions)
-					ils[i], ils[len(ils)-1] = ils[len(ils)-1], ils[i]
-					ils = ils[0 : len(ils)-1]
-					iltxns[i], iltxns[len(iltxns)-1] = iltxns[len(iltxns)-1], iltxns[i]
-					iltxns = iltxns[0 : len(iltxns)-1]
-					break
-				}
+		pos = positionFromBytes(val[0:12])
+
+		// we found the event, now check if it is already indexed by the layer that wants to store it
+		for s := 12; s < len(val); s += 2 {
+			ilid := binary.BigEndian.Uint16(val[s : s+2])
+			if il.id == ilid {
+				// already on the specified layer, we can end here
+				return false, nil
 			}
 		}
 	} else if !lmdb.IsNotFound(err) {
-		// now if we got an error from lmdb we will only proceed if we get a NotFound -- for anything else we will error
+		// if we got an error from lmdb we will only proceed if it's NotFound -- for anything else we will error
 		return false, fmt.Errorf("error checking existence: %w", err)
 	}

-	// if all ils already have this event indexed (or no il was given) we can end here
-	if len(ils) == 0 {
-		return false, nil
-	}
-
-	// get event binary size
-	pos := position{
-		size: uint32(betterbinary.Measure(evt)),
-	}
-	if pos.size >= 1<<16 {
-		return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
-	}
-
-	// find a suitable place for this to be stored in
-	appendToMmap := true
-	for f, fr := range b.freeRanges {
-		if fr.size >= pos.size {
-			// found the smallest possible place that can fit this event
-			appendToMmap = false
-			pos.start = fr.start
-
-			// modify the free ranges we're keeping track of
-			if pos.size == fr.size {
-				// if we've used it entirely just delete it
-				b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
-			} else {
-				// otherwise modify it in place
-				b.freeRanges[f] = position{
-					start: fr.start + uint64(pos.size),
-					size:  fr.size - pos.size,
-				}
-			}
-			break
-		}
-	}
-
-	if appendToMmap {
-		// no free ranges found, so write to the end of the mmap file
-		pos.start = b.mmapfEnd
-		mmapfNewSize := int64(b.mmapfEnd) + int64(pos.size)
-		if err := os.Truncate(b.mmapfPath, mmapfNewSize); err != nil {
-			return false, fmt.Errorf("error increasing %s: %w", b.mmapfPath, err)
-		}
-		b.mmapfEnd = uint64(mmapfNewSize)
-	}
-
-	// write to the mmap
-	if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil {
-		return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err)
-	}
-
-	// prepare value to be saved in the id index (if we didn't have it already)
-	// val: [posb][layerIdRefs...]
+	// ok, now we have to write the event to the mmapped file
+	// unless we already have the event stored, in that case we don't have to write it again, we'll just reuse it
 	if val == nil {
-		val = make([]byte, 12, 12+2*len(b.layers))
-		binary.BigEndian.PutUint32(val[0:4], pos.size)
-		binary.BigEndian.PutUint64(val[4:12], pos.start)
-	}
-
-	// each index that was reserved above for the different layers
-	for i, il := range ils {
-		iltxn := iltxns[i]
-
-		for k := range il.getIndexKeysForEvent(evt) {
-			if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil {
-				b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer")
+		// get event binary size
+		pos = position{
+			size: uint32(betterbinary.Measure(evt)),
+		}
+		if pos.size >= 1<<16 {
+			return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
+		}
+
+		// find a suitable place for this to be stored in
+		appendToMmap := true
+		for f, fr := range b.freeRanges {
+			if fr.size >= pos.size {
+				// found the smallest possible place that can fit this event
+				appendToMmap = false
+				pos.start = fr.start
+
+				// modify the free ranges we're keeping track of
+				// (in case of conflict we lose this free range but it's ok, it will be recovered on the next startup)
+				if pos.size == fr.size {
+					// if we've used it entirely just delete it
+					b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
+				} else {
+					// otherwise modify it in place
+					b.freeRanges[f] = position{
+						start: fr.start + uint64(pos.size),
+						size:  fr.size - pos.size,
+					}
+				}
+				break
 			}
 		}

-		val = binary.BigEndian.AppendUint16(val, il.id)
+		if appendToMmap {
+			// no free ranges found, so write to the end of the mmap file
+			pos.start = b.mmapfEnd
+			mmapfNewSize := int64(b.mmapfEnd) + int64(pos.size)
+			if err := os.Truncate(b.mmapfPath, mmapfNewSize); err != nil {
+				return false, fmt.Errorf("error increasing %s: %w", b.mmapfPath, err)
+			}
+			b.mmapfEnd = uint64(mmapfNewSize)
+		}
+
+		// write to the mmap
+		if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil {
+			return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err)
+		}
+
+		// msync
+		_, _, errno := syscall.Syscall(syscall.SYS_MSYNC,
+			uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC)
+		if errno != 0 {
+			panic(fmt.Errorf("msync failed: %w", syscall.Errno(errno)))
+		}
+
+		// prepare value to be saved in the id index (if we didn't have it already)
+		// val: [posb][layerIdRefs...]
+		val = make([]byte, 12, 12+2) // only reserve room for one layer after the position
+		writeBytesFromPosition(val, pos)
 	}

-	// store the id index with the layer references
+	// generate and save indexes
+	for k := range il.getIndexKeysForEvent(evt) {
+		if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil {
+			b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer")
+		}
+	}
+
+	// add layer to the id index val
+	val = binary.BigEndian.AppendUint16(val, il.id)
+
+	// store the id index with the new layer reference
 	if err := mmmtxn.Put(b.indexId, evt.ID[0:8], val, 0); err != nil {
-		panic(fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err))
+		return false, fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err)
 	}

-	// msync
-	_, _, errno := syscall.Syscall(syscall.SYS_MSYNC,
-		uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC)
-	if errno != 0 {
-		panic(fmt.Errorf("msync failed: %w", syscall.Errno(errno)))
-	}
-
 	return true, nil
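The id index value that storeOn writes is the 12-byte position followed by one 2-byte big-endian id per referencing layer, the "val is [posb][il_idx][il_idx...]" layout noted in the delete hunk above. A tiny self-contained sketch of reading and extending that value follows; the layout is taken from the comments in this commit, the code is illustrative, not mmm's own.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// 12-byte posb: size=142, start=4096, both big-endian
	posb := []byte{0, 0, 0, 142, 0, 0, 0, 0, 0, 0, 16, 0}

	val := append([]byte{}, posb...)
	val = binary.BigEndian.AppendUint16(val, 1) // referenced by layer 1
	val = binary.BigEndian.AppendUint16(val, 7) // and by layer 7

	// checking whether layer 7 already references the event,
	// as storeOn does before writing anything:
	for s := 12; s < len(val); s += 2 {
		if binary.BigEndian.Uint16(val[s:s+2]) == 7 {
			fmt.Println("already indexed by layer 7")
		}
	}

	// len(val) == 12 means no layer references remain, which is the
	// shouldPurge condition in delete
	fmt.Println("refs:", (len(val)-12)/2)
}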

View File

@@ -0,0 +1,2 @@
+go test fuzz v1
+int(-367)

View File

@@ -0,0 +1,2 @@
+go test fuzz v1
+int(91)

View File

@@ -0,0 +1,2 @@
+go test fuzz v1
+int(-188)