mmm: fixes to rescan logic.
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
package mmm
|
package mmm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
"fiatjaf.com/nostr"
|
"fiatjaf.com/nostr"
|
||||||
@@ -15,7 +17,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
return b.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
|
return b.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
|
||||||
cursor, err := mmmtxn.OpenCursor(b.indexId)
|
cursor, err := mmmtxn.OpenCursor(b.indexId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
defer cursor.Close()
|
defer cursor.Close()
|
||||||
|
|
||||||
@@ -30,18 +32,17 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
|
|
||||||
// for every event in this index
|
// for every event in this index
|
||||||
var borked bool
|
var borked bool
|
||||||
var ts nostr.Timestamp
|
|
||||||
// we try to load it
|
// we try to load it
|
||||||
if ts_, err := b.loadJustTimestamp(pos); err == nil {
|
var evt nostr.Event
|
||||||
// if we succeed we assume the event is ok for now
|
if err := b.loadEvent(pos, &evt); err == nil && bytes.Equal(evt.ID[0:8], key) {
|
||||||
|
// all good
|
||||||
borked = false
|
borked = false
|
||||||
ts = ts_
|
|
||||||
} else {
|
} else {
|
||||||
// otherwise we know it's borked
|
// it's borked
|
||||||
borked = true
|
borked = true
|
||||||
}
|
}
|
||||||
|
|
||||||
evt := &nostr.Event{}
|
|
||||||
var layersToRemove []uint16
|
var layersToRemove []uint16
|
||||||
|
|
||||||
// then for every layer referenced in there we check
|
// then for every layer referenced in there we check
|
||||||
@@ -52,7 +53,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
layer.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
if err := layer.lmdbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
txn.RawRead = true
|
txn.RawRead = true
|
||||||
|
|
||||||
if borked {
|
if borked {
|
||||||
@@ -60,7 +61,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
if layer.hasAtPosition(txn, pos) {
|
if layer.hasAtPosition(txn, pos) {
|
||||||
// expected -- delete anyway since it's borked
|
// expected -- delete anyway since it's borked
|
||||||
if err := layer.bruteDeleteIndexes(txn, pos); err != nil {
|
if err := layer.bruteDeleteIndexes(txn, pos); err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// this stuff is doubly borked -- let's do nothing
|
// this stuff is doubly borked -- let's do nothing
|
||||||
@@ -68,32 +69,15 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// otherwise we do a more reasonable check
|
// otherwise we do a more reasonable check
|
||||||
if layer.hasAtTimestampAndPosition(txn, ts, pos) {
|
if layer.hasAtTimestampAndPosition(txn, evt.CreatedAt, pos) {
|
||||||
// expected, all good
|
// expected, all good
|
||||||
} else {
|
} else {
|
||||||
// can't find it in this layer, so update source reference to remove this
|
// can't find it in this layer, so update source reference to remove this
|
||||||
// and clear it from this layer (if any traces remain)
|
// and clear it from this layer (if any traces remain)
|
||||||
if evt == nil {
|
if err := layer.deleteIndexes(txn, evt, val[0:12]); err != nil {
|
||||||
if err := b.loadEvent(pos, evt); err != nil {
|
|
||||||
// can't load event, means it's borked
|
|
||||||
borked = true
|
|
||||||
|
|
||||||
// act as if it's borked
|
|
||||||
if err := layer.bruteDeleteIndexes(txn, pos); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
goto haveEvent
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
haveEvent:
|
|
||||||
if err := layer.deleteIndexes(txn, *evt, val[0:12]); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// we'll remove references to this later
|
// we'll remove references to this later
|
||||||
// (no need to do anything in the borked case as everything will be deleted)
|
// (no need to do anything in the borked case as everything will be deleted)
|
||||||
layersToRemove = append(layersToRemove, layerId)
|
layersToRemove = append(layersToRemove, layerId)
|
||||||
@@ -101,7 +85,9 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if borked {
|
if borked {
|
||||||
@@ -119,7 +105,7 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
|
|
||||||
if len(val) > 12 {
|
if len(val) > 12 {
|
||||||
if err := mmmtxn.Put(b.indexId, key, val, 0); err != nil {
|
if err := mmmtxn.Put(b.indexId, key, val, 0); err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
toPurge = append(toPurge, entry{idPrefix: key, pos: pos})
|
toPurge = append(toPurge, entry{idPrefix: key, pos: pos})
|
||||||
@@ -128,8 +114,11 @@ func (b *MultiMmapManager) Rescan() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range toPurge {
|
for _, entry := range toPurge {
|
||||||
if err := b.purge(mmmtxn, entry.idPrefix, entry.pos); err != nil {
|
// just delete from the ids index,
|
||||||
panic(err)
|
// no need to deal with the freeranges list as it will be recalculated afterwards.
|
||||||
|
// this also ensures that other events whose data brokenly overlaps with the purged entry don't have to be sacrificed.
|
||||||
|
if err := mmmtxn.Del(b.indexId, entry.idPrefix, nil); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ func FuzzBorkedRescan(f *testing.F) {
|
|||||||
borkedEvents = append(borkedEvents, evt)
|
borkedEvents = append(borkedEvents, evt)
|
||||||
|
|
||||||
// manually corrupt the mmapped file at these positions
|
// manually corrupt the mmapped file at these positions
|
||||||
copy(mmmm.mmapf[pos.start:], []byte("CORRUPTED_DATA_XXXX"))
|
copy(mmmm.mmapf[pos.start:], []byte("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"))
|
||||||
} else if chance(inconsistencyProbability) {
|
} else if chance(inconsistencyProbability) {
|
||||||
// inconsistently delete from some layers
|
// inconsistently delete from some layers
|
||||||
var evt nostr.Event
|
var evt nostr.Event
|
||||||
|
|||||||
@@ -24,6 +24,9 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
|
|||||||
// sort used positions by start
|
// sort used positions by start
|
||||||
slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })
|
slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })
|
||||||
|
|
||||||
|
// append a zero-size used position at b.mmapfEnd so that free space at the end (which doesn't happen in normal conditions) is also counted as a gap
|
||||||
|
usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0})
|
||||||
|
|
||||||
// calculate free ranges as gaps between used positions
|
// calculate free ranges as gaps between used positions
|
||||||
freeRanges := make(positions, 0, len(usedPositions)/2)
|
freeRanges := make(positions, 0, len(usedPositions)/2)
|
||||||
var currentStart uint64 = 0
|
var currentStart uint64 = 0
|
||||||
|
|||||||
@@ -293,13 +293,6 @@ func (b *MultiMmapManager) loadEvent(pos position, eventReceiver *nostr.Event) e
|
|||||||
return betterbinary.Unmarshal(b.mmapf[pos.start:pos.start+uint64(pos.size)], eventReceiver)
|
return betterbinary.Unmarshal(b.mmapf[pos.start:pos.start+uint64(pos.size)], eventReceiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) loadJustTimestamp(pos position) (nostr.Timestamp, error) {
|
|
||||||
if len(b.mmapf) < int(pos.start+uint64(pos.size)) {
|
|
||||||
return 0, fmt.Errorf("out of bounds")
|
|
||||||
}
|
|
||||||
return betterbinary.GetCreatedAt(b.mmapf[pos.start : pos.start+uint64(pos.size)]), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getNextAvailableLayerId iterates through all existing layers to find a vacant id
|
// getNextAvailableLayerId iterates through all existing layers to find a vacant id
|
||||||
func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error) {
|
func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error) {
|
||||||
cursor, err := txn.OpenCursor(b.knownLayers)
|
cursor, err := txn.OpenCursor(b.knownLayers)
|
||||||
|
|||||||
Reference in New Issue
Block a user