Files
nostrlib/eventstore/mmm/save.go
2026-01-18 18:02:07 -03:00

177 lines
4.8 KiB
Go

package mmm
import (
"encoding/binary"
"fmt"
"os"
"runtime"
"slices"
"syscall"
"unsafe"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (il *IndexingLayer) SaveEvent(evt nostr.Event) error {
if il.mmmm.ReadOnly {
return ReadOnly
}
il.mmmm.writeMutex.Lock()
defer il.mmmm.writeMutex.Unlock()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// prepare transactions
mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
return err
}
defer func() {
// defer abort but only if we haven't committed (we'll set it to nil after committing)
if mmmtxn != nil {
mmmtxn.Abort()
}
}()
mmmtxn.RawRead = true
iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
return err
}
defer func() {
// defer abort but only if we haven't committed (we'll set it to nil after committing)
if iltxn != nil {
iltxn.Abort()
}
}()
iltxn.RawRead = true
// the actual save operation
if _, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt); err != nil {
return err
}
// commit in this order to minimize problematic inconsistencies
if err := mmmtxn.Commit(); err != nil {
return fmt.Errorf("can't commit mmmtxn: %w", err)
}
mmmtxn = nil
if err := iltxn.Commit(); err != nil {
return fmt.Errorf("can't commit iltxn: %w", err)
}
iltxn = nil
return nil
}
func (b *MultiMmapManager) storeOn(
mmmtxn *lmdb.Txn,
iltxn *lmdb.Txn,
il *IndexingLayer,
evt nostr.Event,
) (stored bool, err error) {
// check if we already have this id
var pos position
val, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
if err == nil {
pos = positionFromBytes(val[0:12])
// we found the event, now check if it is already indexed by the layer that wants to store it
for s := 12; s < len(val); s += 2 {
ilid := binary.BigEndian.Uint16(val[s : s+2])
if il.id == ilid {
// already on the specified layer, we can end here
return false, eventstore.ErrDupEvent
}
}
} else if !lmdb.IsNotFound(err) {
// if we got an error from lmdb we will only proceed if it's NotFound -- for anything else we will error
return false, fmt.Errorf("error checking existence: %w", err)
}
// ok, now we have to write the event to the mmapped file
// unless we already have the event stored, in that case we don't have to write it again, we'll just reuse it
if val == nil {
// get event binary size
pos = position{
size: uint32(betterbinary.Measure(evt)),
}
if pos.size >= 1<<16 {
return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
}
// find a suitable place for this to be stored in
appendToMmap := true
for f, fr := range b.freeRanges {
if fr.size >= pos.size {
// found the smallest possible place that can fit this event
appendToMmap = false
pos.start = fr.start
// modify the free ranges we're keeping track of
// (in case of conflict we lose this free range but it's ok, it will be recovered on the next startup)
if pos.size == fr.size {
// if we've used it entirely just delete it
b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
} else {
// otherwise modify it in place
b.freeRanges[f] = position{
start: fr.start + uint64(pos.size),
size: fr.size - pos.size,
}
}
break
}
}
if appendToMmap {
// no free ranges found, so write to the end of the mmap file
pos.start = b.mmapfEnd
mmapfNewSize := int64(b.mmapfEnd) + int64(pos.size)
if err := os.Truncate(b.mmapfPath, mmapfNewSize); err != nil {
return false, fmt.Errorf("error increasing %s: %w", b.mmapfPath, err)
}
b.mmapfEnd = uint64(mmapfNewSize)
}
// write to the mmap
if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil {
return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err)
}
// msync
_, _, errno := syscall.Syscall(syscall.SYS_MSYNC,
uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC)
if errno != 0 {
panic(fmt.Errorf("msync failed: %w", syscall.Errno(errno)))
}
// prepare value to be saved in the id index (if we didn't have it already)
// val: [posb][layerIdRefs...]
val = make([]byte, 12, 12+2) // only reserve room for one layer after the position
writeBytesFromPosition(val, pos)
}
// generate and save indexes
for k := range il.getIndexKeysForEvent(evt) {
if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil {
b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer")
}
}
// add layer to the id index val
val = binary.BigEndian.AppendUint16(val, il.id)
// store the id index with the new layer reference
if err := mmmtxn.Put(b.indexId, evt.ID[0:8], val, 0); err != nil {
return false, fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err)
}
return true, nil
}