eventstore: delete badger, it's too buggy.
This commit is contained in:
@@ -1,61 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestBasicStoreAndQuery exercises the minimal store lifecycle: initialize a
// BadgerBackend on a temporary directory, save one signed event, verify that
// a duplicate save is rejected with eventstore.ErrDupEvent, and query the
// event back by its id.
func TestBasicStoreAndQuery(t *testing.T) {
	// create a temporary directory for the test database
	dir, err := os.MkdirTemp("", "badger-test-*")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// initialize the store
	db := &BadgerBackend{Path: dir}
	err = db.Init()
	require.NoError(t, err)
	defer db.Close()

	// create a test event
	evt := nostr.Event{
		Content:   "hello world",
		CreatedAt: 1000,
		Kind:      1,
		Tags:      nostr.Tags{},
	}
	err = evt.Sign(nostr.Generate())
	require.NoError(t, err)

	// save the event
	err = db.SaveEvent(evt)
	require.NoError(t, err)

	// try to save it again, should fail with ErrDupEvent
	err = db.SaveEvent(evt)
	require.Error(t, err)
	require.Equal(t, eventstore.ErrDupEvent, err)

	// query the event by its ID
	filter := nostr.Filter{
		IDs: []nostr.ID{evt.ID},
	}

	// collect results (500 is the hard cap passed as maxLimit)
	results := make([]nostr.Event, 0)
	for event := range db.QueryEvents(filter, 500) {
		results = append(results, event)
	}

	// verify we got exactly one event and it matches what was stored
	require.Len(t, results, 1)
	require.Equal(t, evt.ID, results[0].ID)
	require.Equal(t, evt.Content, results[0].Content)
	require.Equal(t, evt.CreatedAt, results[0].CreatedAt)
	require.Equal(t, evt.Kind, results[0].Kind)
	require.Equal(t, evt.PubKey, results[0].PubKey)
}
|
||||
@@ -1,167 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
|
||||
"fiatjaf.com/nostr/nip45/hyperloglog"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
// CountEvents returns the number of stored events matching filter.
// It walks the same index keys a query would use; the raw event is only
// fetched and decoded when part of the filter could not be expressed as an
// index prefix (extraFilter != nil).
func (b *BadgerBackend) CountEvents(filter nostr.Filter) (uint32, error) {
	var count uint32 = 0

	queries, extraFilter, since, err := prepareQueries(filter)
	if err != nil {
		return 0, err
	}

	err = b.View(func(txn *badger.Txn) error {
		// iterate only through keys and in reverse order (newest first)
		opts := badger.IteratorOptions{
			Reverse: true,
		}

		// actually iterate
		for _, q := range queries {
			it := txn.NewIterator(opts)
			// NOTE(review): defer inside a loop — every iterator stays open
			// until the View closure returns; acceptable for a handful of
			// queries, but worth keeping in mind.
			defer it.Close()

			for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() {
				item := it.Item()
				key := item.Key()

				idxOffset := len(key) - 4 // this is where the idx actually starts

				// "id" indexes don't contain a timestamp
				if !q.skipTimestamp {
					createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
					if createdAt < since {
						// keys are scanned newest-first, so once we cross
						// `since` this query has nothing more to offer
						break
					}
				}

				// build the raw event store key: prefix byte + 4-byte serial
				idx := make([]byte, 5)
				idx[0] = rawEventStorePrefix
				copy(idx[1:], key[idxOffset:])

				if extraFilter == nil {
					// the index match alone is sufficient
					count++
				} else {
					// fetch actual event
					item, err := txn.Get(idx)
					if err != nil {
						if err == badger.ErrDiscardedTxn {
							return err
						}
						log.Printf("badger: count (%v) failed to get %d from raw event store: %s\n", q, idx, err)
						return err
					}

					err = item.Value(func(bin []byte) error {
						evt := nostr.Event{}
						if err := betterbinary.Unmarshal(bin, &evt); err != nil {
							return err
						}

						// check if this matches the other filters that were not part of the index
						if extraFilter.Matches(evt) {
							count++
						}

						return nil
					})
					if err != nil {
						// read error is logged but does not abort the count
						log.Printf("badger: count value read error: %s\n", err)
					}
				}
			}
		}

		return nil
	})

	return count, err
}
|
||||
|
||||
// CountEventsHLL counts events matching filter and additionally accumulates
// the authors' pubkeys into a HyperLogLog (NIP-45 style) started at the given
// offset. Unlike CountEvents, the event value is always fetched because the
// pubkey is needed even when no extra filtering is required.
func (b *BadgerBackend) CountEventsHLL(filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error) {
	var count uint32 = 0

	queries, extraFilter, since, err := prepareQueries(filter)
	if err != nil {
		return 0, nil, err
	}

	hll := hyperloglog.New(offset)

	err = b.View(func(txn *badger.Txn) error {
		// iterate only through keys and in reverse order (newest first)
		opts := badger.IteratorOptions{
			Reverse: true,
		}

		// actually iterate
		for _, q := range queries {
			it := txn.NewIterator(opts)
			// NOTE(review): defer inside a loop — iterators stay open until
			// the View closure returns
			defer it.Close()

			for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() {
				item := it.Item()
				key := item.Key()

				idxOffset := len(key) - 4 // this is where the idx actually starts

				// "id" indexes don't contain a timestamp
				if !q.skipTimestamp {
					createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
					if createdAt < since {
						break
					}
				}

				// build the raw event store key: prefix byte + 4-byte serial
				idx := make([]byte, 5)
				idx[0] = rawEventStorePrefix
				copy(idx[1:], key[idxOffset:])

				// fetch actual event
				item, err := txn.Get(idx)
				if err != nil {
					if err == badger.ErrDiscardedTxn {
						return err
					}
					log.Printf("badger: count (%v) failed to get %d from raw event store: %s\n", q, idx, err)
					return err
				}

				err = item.Value(func(bin []byte) error {
					if extraFilter == nil {
						// fast path: read the pubkey straight from the
						// binary encoding without decoding the whole event
						hll.AddBytes(betterbinary.GetPubKey(bin))
						count++
						return nil
					}

					// slow path: decode and apply the remaining filter
					evt := nostr.Event{}
					if err := betterbinary.Unmarshal(bin, &evt); err != nil {
						return err
					}
					if extraFilter.Matches(evt) {
						hll.Add(evt.PubKey)
						count++
						return nil
					}

					return nil
				})
				if err != nil {
					// read error is logged but does not abort the count
					log.Printf("badger: count value read error: %s\n", err)
				}
			}
		}

		return nil
	})

	return count, hll, err
}
|
||||
@@ -1,83 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
	"fmt"
	"log"
	"sync/atomic"

	"fiatjaf.com/nostr"
	"fiatjaf.com/nostr/eventstore/codec/betterbinary"
	"github.com/dgraph-io/badger/v4"
)
|
||||
|
||||
var serialDelete uint32 = 0
|
||||
|
||||
func (b *BadgerBackend) DeleteEvent(id nostr.ID) error {
|
||||
deletionHappened := false
|
||||
|
||||
err := b.Update(func(txn *badger.Txn) error {
|
||||
var err error
|
||||
deletionHappened, err = b.delete(txn, id)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// after deleting, run garbage collector (sometimes)
|
||||
if deletionHappened {
|
||||
serialDelete = (serialDelete + 1) % 256
|
||||
if serialDelete == 0 {
|
||||
if err := b.RunValueLogGC(0.8); err != nil && err != badger.ErrNoRewrite {
|
||||
log.Println("badger gc errored:" + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BadgerBackend) delete(txn *badger.Txn, id nostr.ID) (bool, error) {
|
||||
idx := make([]byte, 1, 5)
|
||||
idx[0] = rawEventStorePrefix
|
||||
|
||||
// query event by id to get its idx
|
||||
prefix := make([]byte, 1+8)
|
||||
prefix[0] = indexIdPrefix
|
||||
copy(prefix[1:], id[0:8])
|
||||
opts := badger.IteratorOptions{
|
||||
PrefetchValues: false,
|
||||
}
|
||||
|
||||
// also grab the actual event so we can calculate its indexes
|
||||
var evt nostr.Event
|
||||
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
it.Seek(prefix)
|
||||
if it.ValidForPrefix(prefix) {
|
||||
idx = append(idx, it.Item().Key()[1+8:1+8+4]...)
|
||||
item, err := txn.Get(idx)
|
||||
if err == badger.ErrKeyNotFound {
|
||||
// this event doesn't exist or is already deleted
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
return false, fmt.Errorf("failed to fetch event %x to delete: %w", id[:], err)
|
||||
} else {
|
||||
if err := item.Value(func(bin []byte) error {
|
||||
return betterbinary.Unmarshal(bin, &evt)
|
||||
}); err != nil {
|
||||
return false, fmt.Errorf("failed to unmarshal event %x to delete: %w", id[:], err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculate all index keys we have for this event and delete them
|
||||
for k := range b.getIndexKeysForEvent(evt, idx[1:]) {
|
||||
if err := txn.Delete(k); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// delete the raw event
|
||||
return true, txn.Delete(idx)
|
||||
}
|
||||
@@ -1,150 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// FuzzQuery stores a fuzz-controlled population of signed events in an
// in-memory badger instance and checks that QueryEvents returns exactly the
// events the filter matches, newest-first, within a time budget.
func FuzzQuery(f *testing.F) {
	f.Add(uint(200), uint(50), uint(13), uint(2), uint(2), uint(0), uint(1))
	f.Fuzz(func(t *testing.T, total, limit, authors, timestampAuthorFactor, seedFactor, kinds, kindFactor uint) {
		// normalize the fuzz inputs so they are usable (no zeroes where a
		// zero would make the setup degenerate)
		total++
		authors++
		seedFactor++
		kindFactor++
		if kinds == 1 {
			kinds++
		}
		if limit == 0 {
			return
		}

		// ~ setup db

		// in-memory badger instead of BadgerBackend.Init (no temp dirs)
		bdb, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
		if err != nil {
			t.Fatalf("failed to create database: %s", err)
			return
		}
		db := &BadgerBackend{}
		db.DB = bdb

		if err := db.migrate(); err != nil {
			t.Fatalf("error: %s", err)
			return
		}

		// replicate Init's serial bootstrap: reverse-iterate the raw event
		// store (prefix 0), seeking from {1} so the first valid key is the
		// highest serial in use
		if err := db.DB.View(func(txn *badger.Txn) error {
			it := txn.NewIterator(badger.IteratorOptions{
				Prefix:  []byte{0},
				Reverse: true,
			})
			it.Seek([]byte{1})
			if it.Valid() {
				key := it.Item().Key()
				idx := key[1:]
				serial := binary.BigEndian.Uint32(idx)
				db.serial.Store(serial)
			}
			it.Close()
			return nil
		}); err != nil {
			t.Fatalf("failed to initialize serial: %s", err)
			return
		}

		defer db.Close()

		// ~ start actual test

		filter := nostr.Filter{
			Authors: make([]nostr.PubKey, authors),
			Limit:   int(limit),
		}
		// when kinds are requested, spread them out by kindFactor and keep
		// track of the largest so generated events stay within range
		var maxKind nostr.Kind = 1
		if kinds > 0 {
			filter.Kinds = make([]nostr.Kind, kinds)
			for i := range filter.Kinds {
				filter.Kinds[i] = nostr.Kind(kindFactor) * nostr.Kind(i)
			}
			maxKind = filter.Kinds[len(filter.Kinds)-1]
		}

		// derive deterministic author keys from small integer seeds
		for i := 0; i < int(authors); i++ {
			sk := nostr.SecretKey{}
			binary.BigEndian.PutUint32(sk[:], uint32(i%int(authors*seedFactor))+1)
			pk := nostr.GetPublicKey(sk)
			filter.Authors[i] = pk
		}

		// generate, sign and store `total` events, remembering which ones the
		// filter should return
		expected := make([]nostr.Event, 0, total)
		for i := 0; i < int(total); i++ {
			skseed := uint32(i%int(authors*seedFactor)) + 1
			sk := nostr.SecretKey{}
			binary.BigEndian.PutUint32(sk[:], skseed)

			evt := nostr.Event{
				CreatedAt: nostr.Timestamp(skseed)*nostr.Timestamp(timestampAuthorFactor) + nostr.Timestamp(i),
				Content:   fmt.Sprintf("unbalanced %d", i),
				Tags:      nostr.Tags{},
				Kind:      nostr.Kind(i) % maxKind,
			}
			err := evt.Sign(sk)
			require.NoError(t, err)

			err = db.SaveEvent(evt)
			require.NoError(t, err)

			if filter.Matches(evt) {
				expected = append(expected, evt)
			}
		}

		// compute the reference result: newest-first, capped at `limit`
		slices.SortFunc(expected, nostr.CompareEventReverse)
		if len(expected) > int(limit) {
			expected = expected[0:limit]
		}

		start := time.Now()
		// fmt.Println(filter)
		res := slices.Collect(db.QueryEvents(filter, 500))
		end := time.Now()

		require.NoError(t, err)
		require.Equal(t, len(expected), len(res), "number of results is different than expected")

		require.Less(t, end.Sub(start).Milliseconds(), int64(1500), "query took too long")
		require.True(t, slices.IsSortedFunc(res, func(a, b nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted")

		nresults := len(expected)

		// compare by timestamps: ids are random, created_at sequences are the
		// stable part of the ordering contract
		getTimestamps := func(events []nostr.Event) []nostr.Timestamp {
			res := make([]nostr.Timestamp, len(events))
			for i, evt := range events {
				res[i] = evt.CreatedAt
			}
			return res
		}

		// fmt.Println("  expected   result")
		// for i := range expected {
		// 	fmt.Println("  ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i)
		// }

		require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
		require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result is wrong")
		require.Equal(t, getTimestamps(expected), getTimestamps(res))

		for _, evt := range res {
			require.True(t, filter.Matches(evt), "event %s doesn't match filter %s", evt, filter)
		}
	})
}
|
||||
@@ -1,166 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"iter"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
func getTagIndexPrefix(tagName string, tagValue string) ([]byte, int) {
|
||||
var k []byte // the key with full length for created_at and idx at the end, but not filled with these
|
||||
var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
|
||||
|
||||
letterPrefix := byte(int(tagName[0]) % 256)
|
||||
|
||||
if kind, pkb, d := getAddrTagElements(tagValue); len(pkb) == 32 {
|
||||
// store value in the new special "a"-style tag index
|
||||
k = make([]byte, 1+1+2+8+len(d)+4+4)
|
||||
k[0] = indexTagAddrPrefix
|
||||
k[1] = letterPrefix
|
||||
binary.BigEndian.PutUint16(k[1+1:], uint16(kind))
|
||||
copy(k[1+1+2:], pkb[0:8])
|
||||
copy(k[1+1+2+8:], d)
|
||||
offset = 1 + 1 + 2 + 8 + len(d)
|
||||
} else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 {
|
||||
// store value as bytes with tag name prefix
|
||||
k = make([]byte, 1+1+8+4+4)
|
||||
k[0] = indexTag32Prefix
|
||||
k[1] = letterPrefix
|
||||
copy(k[2:], vb[0:8])
|
||||
offset = 1 + 1 + 8
|
||||
} else {
|
||||
// store whatever as utf-8 with tag name prefix
|
||||
k = make([]byte, 1+1+len(tagValue)+4+4)
|
||||
k[0] = indexTagPrefix
|
||||
k[1] = letterPrefix
|
||||
copy(k[2:], tagValue)
|
||||
offset = 1 + 1 + len(tagValue)
|
||||
}
|
||||
|
||||
return k, offset
|
||||
}
|
||||
|
||||
func (b *BadgerBackend) getIndexKeysForEvent(evt nostr.Event, idx []byte) iter.Seq[[]byte] {
|
||||
return func(yield func([]byte) bool) {
|
||||
{
|
||||
// ~ by id
|
||||
k := make([]byte, 1+8+4)
|
||||
k[0] = indexIdPrefix
|
||||
copy(k[1:], evt.ID[0:8])
|
||||
copy(k[1+8:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+date
|
||||
k := make([]byte, 1+8+4+4)
|
||||
k[0] = indexPubkeyPrefix
|
||||
copy(k[1:], evt.PubKey[0:8])
|
||||
binary.BigEndian.PutUint32(k[1+8:], uint32(evt.CreatedAt))
|
||||
copy(k[1+8+4:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by kind+date
|
||||
k := make([]byte, 1+2+4+4)
|
||||
k[0] = indexKindPrefix
|
||||
binary.BigEndian.PutUint16(k[1:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[1+2:], uint32(evt.CreatedAt))
|
||||
copy(k[1+2+4:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+kind+date
|
||||
k := make([]byte, 1+8+2+4+4)
|
||||
k[0] = indexPubkeyKindPrefix
|
||||
copy(k[1:], evt.PubKey[0:8])
|
||||
binary.BigEndian.PutUint16(k[1+8:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[1+8+2:], uint32(evt.CreatedAt))
|
||||
copy(k[1+8+2+4:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ~ by tagvalue+date
|
||||
customIndex := b.IndexLongerTag != nil
|
||||
customSkip := b.SkipIndexingTag != nil
|
||||
|
||||
for i, tag := range evt.Tags {
|
||||
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
|
||||
if !customIndex || !b.IndexLongerTag(evt, tag[0], tag[1]) {
|
||||
// not indexable
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool {
|
||||
return len(t) >= 2 && t[0] == tag[0] && t[1] == tag[1]
|
||||
})
|
||||
if firstIndex != i {
|
||||
// duplicate
|
||||
continue
|
||||
}
|
||||
|
||||
if customSkip && b.SkipIndexingTag(evt, tag[0], tag[1]) {
|
||||
// purposefully skipped
|
||||
continue
|
||||
}
|
||||
|
||||
// get key prefix (with full length) and offset where to write the last parts
|
||||
k, offset := getTagIndexPrefix(tag[0], tag[1])
|
||||
|
||||
// write the last parts (created_at and idx)
|
||||
binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt))
|
||||
copy(k[offset+4:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by date only
|
||||
k := make([]byte, 1+4+4)
|
||||
k[0] = indexCreatedAtPrefix
|
||||
binary.BigEndian.PutUint32(k[1:], uint32(evt.CreatedAt))
|
||||
copy(k[1+4:], idx)
|
||||
if !yield(k) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getAddrTagElements(tagValue string) (kind nostr.Kind, pkb []byte, d string) {
|
||||
spl := strings.Split(tagValue, ":")
|
||||
if len(spl) == 3 {
|
||||
if pkb, _ := hex.DecodeString(spl[1]); len(pkb) == 32 {
|
||||
if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil {
|
||||
return nostr.Kind(kind), pkb, spl[2]
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0, nil, ""
|
||||
}
|
||||
|
||||
func filterMatchesTags(ef nostr.Filter, event nostr.Event) bool {
|
||||
for f, v := range ef.Tags {
|
||||
if v != nil && !event.Tags.ContainsAny(f, v) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
// Key prefixes partitioning the keyspace of the single badger database.
// Layouts below are those produced by getIndexKeysForEvent / getTagIndexPrefix;
// "serial" is the 4-byte big-endian id handed out by Serial().
const (
	// NOTE(review): dbVersionKey appears unused — migrate() stores the schema
	// version under the literal key "version" instead; confirm which is intended.
	dbVersionKey          byte = 255
	rawEventStorePrefix   byte = 0 // prefix + serial -> raw binary event
	indexCreatedAtPrefix  byte = 1 // prefix + created_at(4) + serial
	indexIdPrefix         byte = 2 // prefix + id[0:8] + serial (no timestamp)
	indexKindPrefix       byte = 3 // prefix + kind(2) + created_at(4) + serial
	indexPubkeyPrefix     byte = 4 // prefix + pubkey[0:8] + created_at(4) + serial
	indexPubkeyKindPrefix byte = 5 // prefix + pubkey[0:8] + kind(2) + created_at(4) + serial
	indexTagPrefix        byte = 6 // prefix + letter + utf-8 value + created_at(4) + serial
	indexTag32Prefix      byte = 7 // prefix + letter + value[0:8] + created_at(4) + serial
	indexTagAddrPrefix    byte = 8 // prefix + letter + kind(2) + pk[0:8] + d-tag + created_at(4) + serial
)
|
||||
|
||||
// compile-time check that BadgerBackend satisfies the eventstore.Store interface
var _ eventstore.Store = (*BadgerBackend)(nil)

// BadgerBackend is an eventstore.Store implementation backed by a badger
// key-value database. The zero value is not usable: set Path and call Init.
type BadgerBackend struct {
	// Path is the directory where the badger database lives.
	Path string
	// BadgerOptionsModifier, when set, lets callers tweak the badger options
	// Init uses to open the database.
	BadgerOptionsModifier func(badger.Options) badger.Options

	// Experimental: when set, tags for which this returns true are not indexed.
	SkipIndexingTag func(event nostr.Event, tagName string, tagValue string) bool
	// Experimental: when set, allows indexing tags that would otherwise be
	// skipped (e.g. names longer than one character or very long values).
	IndexLongerTag func(event nostr.Event, tagName string, tagValue string) bool

	*badger.DB

	// serial is the monotonically increasing id assigned to stored raw events;
	// bootstrapped by Init from the highest serial already on disk.
	serial atomic.Uint32
}
|
||||
|
||||
// Init opens (or creates) the badger database at b.Path, runs pending
// migrations, and initializes the in-memory serial counter from the highest
// serial already present in the raw event store.
func (b *BadgerBackend) Init() error {
	opts := badger.DefaultOptions(b.Path)
	if b.BadgerOptionsModifier != nil {
		opts = b.BadgerOptionsModifier(opts)
	}

	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	b.DB = db

	if err := b.migrate(); err != nil {
		return fmt.Errorf("error running migrations: %w", err)
	}

	// find the largest serial in use: iterate the raw event store (prefix 0)
	// in reverse, seeking from {1} so the first valid key is the highest one
	if err := b.DB.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.IteratorOptions{
			Prefix:  []byte{0},
			Reverse: true,
		})
		it.Seek([]byte{1})
		if it.Valid() {
			key := it.Item().Key()
			idx := key[1:] // the 4-byte big-endian serial follows the prefix byte
			serial := binary.BigEndian.Uint32(idx)
			b.serial.Store(serial)
		}
		it.Close()
		return nil
	}); err != nil {
		return fmt.Errorf("error initializing serial: %w", err)
	}

	return nil
}
|
||||
|
||||
// Close closes the underlying badger database.
// NOTE(review): badger's Close error is discarded — presumably the
// eventstore.Store interface requires a bare Close(); confirm.
func (b *BadgerBackend) Close() {
	b.DB.Close()
}
|
||||
|
||||
func (b *BadgerBackend) Serial() []byte {
|
||||
next := b.serial.Add(1)
|
||||
vb := make([]byte, 5)
|
||||
vb[0] = rawEventStorePrefix
|
||||
binary.BigEndian.PutUint32(vb[1:], next)
|
||||
return vb
|
||||
}
|
||||
@@ -1,115 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
// migrate brings the database up to the current schema version inside a
// single update transaction. The version is stored under the literal key
// "version" as a big-endian uint16; a missing key is treated as version 1.
// NOTE(review): the dbVersionKey constant (255) is not used here — confirm
// whether "version" is the intended key.
func (b *BadgerBackend) migrate() error {
	return b.Update(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte("version"))
		if err != nil && err != badger.ErrKeyNotFound {
			return fmt.Errorf("failed to get db version: %w", err)
		}

		// fresh database (no version key) counts as version 1
		var version uint16 = 1
		if err == nil {
			err = item.Value(func(val []byte) error {
				version = binary.BigEndian.Uint16(val)
				return nil
			})
			if err != nil {
				return fmt.Errorf("failed to read db version: %w", err)
			}
		}

		const target = 2

		// do the migrations in increasing steps (there is no rollback)
		if version < target {
			log.Printf("[badger] migration %d: delete all indexes and recreate them\n", target)

			// delete all index entries
			prefixes := []byte{
				indexIdPrefix,
				indexCreatedAtPrefix,
				indexKindPrefix,
				indexPubkeyPrefix,
				indexPubkeyKindPrefix,
				indexTagPrefix,
				indexTag32Prefix,
				indexTagAddrPrefix,
			}

			for _, prefix := range prefixes {
				it := txn.NewIterator(badger.IteratorOptions{
					PrefetchValues: false,
					Prefix:         []byte{prefix},
				})
				// NOTE(review): defer in a loop — all 8 iterators stay open
				// until the transaction closure returns
				defer it.Close()

				// collect keys first, delete after: avoids mutating the
				// keyspace while the iterator is still walking it
				var keysToDelete [][]byte
				for it.Seek([]byte{prefix}); it.ValidForPrefix([]byte{prefix}); it.Next() {
					key := it.Item().Key()
					// Key() is only valid until Next(); copy it out
					keyCopy := make([]byte, len(key))
					copy(keyCopy, key)
					keysToDelete = append(keysToDelete, keyCopy)
				}

				for _, key := range keysToDelete {
					if err := txn.Delete(key); err != nil {
						return fmt.Errorf("failed to delete index key %x: %w", key, err)
					}
				}
			}

			// iterate through all events and recreate indexes
			it := txn.NewIterator(badger.IteratorOptions{
				PrefetchValues: true,
				Prefix:         []byte{rawEventStorePrefix},
			})
			defer it.Close()

			for it.Seek([]byte{rawEventStorePrefix}); it.ValidForPrefix([]byte{rawEventStorePrefix}); it.Next() {
				item := it.Item()
				idx := item.Key()

				err := item.Value(func(val []byte) error {
					evt := nostr.Event{}
					if err := betterbinary.Unmarshal(val, &evt); err != nil {
						return fmt.Errorf("error decoding event %x on migration %d: %w", idx, target, err)
					}

					// index entries carry nil values; only the key matters
					for key := range b.getIndexKeysForEvent(evt, idx[1:]) {
						if err := txn.Set(key, nil); err != nil {
							return fmt.Errorf("failed to save index for event %s on migration %d: %w", evt.ID, target, err)
						}
					}

					return nil
				})
				if err != nil {
					return err
				}
			}

			// bump version
			if err := b.bumpVersion(txn, target); err != nil {
				return err
			}
		}

		return nil
	})
}
|
||||
|
||||
func (b *BadgerBackend) bumpVersion(txn *badger.Txn, v uint16) error {
|
||||
var newVersion [2]byte
|
||||
binary.BigEndian.PutUint16(newVersion[:], v)
|
||||
return txn.Set([]byte("version"), newVersion[:])
|
||||
}
|
||||
@@ -1,416 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"log"
|
||||
"slices"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
|
||||
"fiatjaf.com/nostr/eventstore/internal"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
var batchFilled = errors.New("batch-filled")
|
||||
|
||||
func (b *BadgerBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
|
||||
return func(yield func(nostr.Event) bool) {
|
||||
if filter.Search != "" {
|
||||
return
|
||||
}
|
||||
|
||||
// max number of events we'll return
|
||||
if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero {
|
||||
return
|
||||
} else if tlimit < maxLimit {
|
||||
maxLimit = tlimit
|
||||
}
|
||||
if filter.Limit > 0 && filter.Limit < maxLimit {
|
||||
maxLimit = filter.Limit
|
||||
}
|
||||
|
||||
b.View(func(txn *badger.Txn) error {
|
||||
results, err := b.query(txn, filter, maxLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, evt := range results {
|
||||
if !yield(evt.Event) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BadgerBackend) query(txn *badger.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) {
|
||||
queries, extraFilter, since, err := prepareQueries(filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
iterators := make([]*badger.Iterator, len(queries))
|
||||
exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
|
||||
results := make([][]internal.IterEvent, len(queries))
|
||||
pulledPerQuery := make([]int, len(queries))
|
||||
|
||||
// these are kept updated so we never pull from the iterator that is at further distance
|
||||
// (i.e. the one that has the oldest event among all)
|
||||
// we will continue to pull from it as soon as some other iterator takes the position
|
||||
oldest := internal.IterEvent{Q: -1}
|
||||
|
||||
sndPhase := false // after we have gathered enough events we will change the way we iterate
|
||||
secondBatch := make([][]internal.IterEvent, 0, len(queries)+1)
|
||||
sndPhaseParticipants := make([]int, 0, len(queries)+1)
|
||||
|
||||
// while merging results in the second phase we will alternate between these two lists
|
||||
// to avoid having to create new lists all the time
|
||||
var sndPhaseResultsA []internal.IterEvent
|
||||
var sndPhaseResultsB []internal.IterEvent
|
||||
var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
|
||||
var sndPhaseHasResultsPending bool
|
||||
|
||||
remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
|
||||
batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
|
||||
firstPhaseTotalPulled := 0
|
||||
|
||||
exhaust := func(q int) {
|
||||
exhausted[q] = true
|
||||
remainingUnexhausted--
|
||||
if q == oldest.Q {
|
||||
oldest = internal.IterEvent{Q: -1}
|
||||
}
|
||||
}
|
||||
|
||||
var firstPhaseResults []internal.IterEvent
|
||||
|
||||
for q := range queries {
|
||||
iterators[q] = txn.NewIterator(badger.IteratorOptions{
|
||||
Reverse: true,
|
||||
PrefetchValues: false, // we don't even have values, only keys
|
||||
Prefix: queries[q].prefix,
|
||||
})
|
||||
defer iterators[q].Close()
|
||||
iterators[q].Seek(queries[q].startingPoint)
|
||||
results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2)
|
||||
}
|
||||
|
||||
// we will reuse this throughout the iteration
|
||||
valIdx := make([]byte, 5)
|
||||
|
||||
// fmt.Println("queries", len(queries))
|
||||
|
||||
for c := 0; ; c++ {
|
||||
batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
|
||||
|
||||
// fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery)
|
||||
// we will go through all the iterators in batches until we have pulled all the required results
|
||||
for q, query := range queries {
|
||||
if exhausted[q] {
|
||||
continue
|
||||
}
|
||||
if oldest.Q == q && remainingUnexhausted > 1 {
|
||||
continue
|
||||
}
|
||||
// fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q]))
|
||||
|
||||
it := iterators[q]
|
||||
pulledThisIteration := 0
|
||||
|
||||
for {
|
||||
if !it.Valid() {
|
||||
// fmt.Println(" reached end")
|
||||
exhaust(q)
|
||||
break
|
||||
}
|
||||
|
||||
item := it.Item()
|
||||
key := item.Key()
|
||||
|
||||
idxOffset := len(key) - 4 // this is where the idx actually starts
|
||||
|
||||
// "id" indexes don't contain a timestamp
|
||||
if !query.skipTimestamp {
|
||||
createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
|
||||
if createdAt < since {
|
||||
// fmt.Println(" reached since", createdAt, "<", since)
|
||||
exhaust(q)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
valIdx[0] = rawEventStorePrefix
|
||||
copy(valIdx[1:], key[idxOffset:])
|
||||
|
||||
// fetch actual event
|
||||
item, err := txn.Get(valIdx)
|
||||
if err != nil {
|
||||
if err == badger.ErrDiscardedTxn {
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("badger: failed to get %x based on prefix %x, index key %x from raw event store: %s\n",
|
||||
valIdx, query.prefix, key, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := item.Value(func(bin []byte) error {
|
||||
// fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin))
|
||||
|
||||
// check it against pubkeys without decoding the entire thing
|
||||
if extraFilter != nil && extraFilter.Authors != nil &&
|
||||
!nostr.ContainsPubKey(extraFilter.Authors, betterbinary.GetPubKey(bin)) {
|
||||
// fmt.Println(" skipped (authors)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// check it against kinds without decoding the entire thing
|
||||
if extraFilter != nil && extraFilter.Kinds != nil &&
|
||||
!slices.Contains(extraFilter.Kinds, betterbinary.GetKind(bin)) {
|
||||
// fmt.Println(" skipped (kinds)")
|
||||
return nil
|
||||
}
|
||||
|
||||
event := nostr.Event{}
|
||||
if err := betterbinary.Unmarshal(bin, &event); err != nil {
|
||||
log.Printf("badger: value read error (id %x): %s\n", betterbinary.GetID(bin), err)
|
||||
return err
|
||||
}
|
||||
|
||||
// check if this matches the other filters that were not part of the index
|
||||
if extraFilter != nil && !filterMatchesTags(*extraFilter, event) {
|
||||
// fmt.Println(" skipped (filter)", extraFilter, event)
|
||||
return nil
|
||||
}
|
||||
|
||||
// this event is good to be used
|
||||
evt := internal.IterEvent{Event: event, Q: q}
|
||||
//
|
||||
//
|
||||
if sndPhase {
|
||||
// do the process described below at HIWAWVRTP.
|
||||
// if we've reached here this means we've already passed the `since` check.
|
||||
// now we have to eliminate the event currently at the `since` threshold.
|
||||
nextThreshold := firstPhaseResults[len(firstPhaseResults)-2]
|
||||
if oldest.Event.ID == nostr.ZeroID {
|
||||
// fmt.Println(" b1")
|
||||
// BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE)
|
||||
// when we don't have the oldest set, we will keep the results
|
||||
// and not change the cutting point -- it's bad, but hopefully not that bad.
|
||||
results[q] = append(results[q], evt)
|
||||
sndPhaseHasResultsPending = true
|
||||
} else if nextThreshold.CreatedAt > oldest.CreatedAt {
|
||||
// fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt)
|
||||
// one of the events we have stored is the actual next threshold
|
||||
// eliminate last, update since with oldest
|
||||
firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
|
||||
since = uint32(oldest.CreatedAt)
|
||||
// fmt.Println(" new since", since)
|
||||
// we null the oldest Event as we can't rely on it anymore
|
||||
// (we'll fall under BWWDHTOE above) until we have a new oldest set.
|
||||
oldest = internal.IterEvent{Q: -1}
|
||||
// anything we got that would be above this won't trigger an update to
|
||||
// the oldest anyway, because it will be discarded as being after the limit.
|
||||
//
|
||||
// finally
|
||||
// add this to the results to be merged later
|
||||
results[q] = append(results[q], evt)
|
||||
sndPhaseHasResultsPending = true
|
||||
} else if nextThreshold.CreatedAt < evt.CreatedAt {
|
||||
// the next last event in the firstPhaseResults is the next threshold
|
||||
// fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt)
|
||||
// eliminate last, update since with the antelast
|
||||
firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
|
||||
since = uint32(nextThreshold.CreatedAt)
|
||||
// fmt.Println(" new since", since)
|
||||
// add this to the results to be merged later
|
||||
results[q] = append(results[q], evt)
|
||||
sndPhaseHasResultsPending = true
|
||||
// update the oldest event
|
||||
if evt.CreatedAt < oldest.CreatedAt {
|
||||
oldest = evt
|
||||
}
|
||||
} else {
|
||||
// fmt.Println(" b4")
|
||||
// oops, _we_ are the next `since` threshold
|
||||
firstPhaseResults[len(firstPhaseResults)-1] = evt
|
||||
since = uint32(evt.CreatedAt)
|
||||
// fmt.Println(" new since", since)
|
||||
// do not add us to the results to be merged later
|
||||
// as we're already inhabiting the firstPhaseResults slice
|
||||
}
|
||||
} else {
|
||||
results[q] = append(results[q], evt)
|
||||
firstPhaseTotalPulled++
|
||||
|
||||
// update the oldest event
|
||||
if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt {
|
||||
oldest = evt
|
||||
}
|
||||
}
|
||||
|
||||
pulledPerQuery[q]++
|
||||
pulledThisIteration++
|
||||
if pulledThisIteration > batchSizePerQuery {
|
||||
return batchFilled
|
||||
}
|
||||
if pulledPerQuery[q] >= limit {
|
||||
exhaust(q)
|
||||
return batchFilled
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err == batchFilled {
|
||||
// fmt.Println(" #")
|
||||
it.Next()
|
||||
break
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("iteration error: %w", err)
|
||||
}
|
||||
|
||||
it.Next()
|
||||
}
|
||||
}
|
||||
|
||||
// we will do this check if we don't accumulated the requested number of events yet
|
||||
// fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
|
||||
if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) {
|
||||
// fmt.Println("second phase aggregation!")
|
||||
// when we are in the second phase we will aggressively aggregate results on every iteration
|
||||
//
|
||||
secondBatch = secondBatch[:0]
|
||||
for s := 0; s < len(sndPhaseParticipants); s++ {
|
||||
q := sndPhaseParticipants[s]
|
||||
|
||||
if len(results[q]) > 0 {
|
||||
secondBatch = append(secondBatch, results[q])
|
||||
}
|
||||
|
||||
if exhausted[q] {
|
||||
sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s)
|
||||
s--
|
||||
}
|
||||
}
|
||||
|
||||
// every time we get here we will alternate between these A and B lists
|
||||
// combining everything we have into a new partial results list.
|
||||
// after we've done that we can again set the oldest.
|
||||
// fmt.Println(" xxx", sndPhaseResultsToggle)
|
||||
if sndPhaseResultsToggle {
|
||||
secondBatch = append(secondBatch, sndPhaseResultsB)
|
||||
sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA)
|
||||
oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1]
|
||||
// fmt.Println(" new aggregated a", len(sndPhaseResultsB))
|
||||
} else {
|
||||
secondBatch = append(secondBatch, sndPhaseResultsA)
|
||||
sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB)
|
||||
oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1]
|
||||
// fmt.Println(" new aggregated b", len(sndPhaseResultsB))
|
||||
}
|
||||
sndPhaseResultsToggle = !sndPhaseResultsToggle
|
||||
|
||||
since = uint32(oldest.CreatedAt)
|
||||
// fmt.Println(" new since", since)
|
||||
|
||||
// reset the `results` list so we can keep using it
|
||||
results = results[:len(queries)]
|
||||
for _, q := range sndPhaseParticipants {
|
||||
results[q] = results[q][:0]
|
||||
}
|
||||
} else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
|
||||
// fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted)
|
||||
|
||||
// we will exclude this oldest number as it is not relevant anymore
|
||||
// (we now want to keep track only of the oldest among the remaining iterators)
|
||||
oldest = internal.IterEvent{Q: -1}
|
||||
|
||||
// HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP)
|
||||
// now we can combine the results we have and check what is our current oldest event.
|
||||
// we also discard anything that is after the current cutting point (`limit`).
|
||||
// so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total
|
||||
// we can just keep [1,2,3,7,10,15] and discard [20, 21, 49],
|
||||
// and also adjust our `since` parameter to `15`, discarding anything we get after it
|
||||
// and immediately declaring that iterator exhausted.
|
||||
// also every time we get result that is more recent than this updated `since` we can
|
||||
// keep it but also discard the previous since, moving the needle one back -- for example,
|
||||
// if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15`
|
||||
// in the process.
|
||||
all := make([][]internal.IterEvent, len(results))
|
||||
copy(all, results) // we have to use this otherwise mergeSortMultiple will scramble our results slice
|
||||
firstPhaseResults = internal.MergeSortMultiple(all, limit, nil)
|
||||
oldest = firstPhaseResults[limit-1]
|
||||
since = uint32(oldest.CreatedAt)
|
||||
// fmt.Println("new since", since)
|
||||
|
||||
for q := range queries {
|
||||
if exhausted[q] {
|
||||
continue
|
||||
}
|
||||
|
||||
// we also automatically exhaust any of the iterators that have already passed the
|
||||
// cutting point (`since`)
|
||||
if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt {
|
||||
exhausted[q] = true
|
||||
remainingUnexhausted--
|
||||
continue
|
||||
}
|
||||
|
||||
// for all the remaining iterators,
|
||||
// since we have merged all the events in this `firstPhaseResults` slice, we can empty the
|
||||
// current `results` slices and reuse them.
|
||||
results[q] = results[q][:0]
|
||||
|
||||
// build this index of indexes with everybody who remains
|
||||
sndPhaseParticipants = append(sndPhaseParticipants, q)
|
||||
}
|
||||
|
||||
// we create these two lists and alternate between them so we don't have to create a
|
||||
// a new one every time
|
||||
sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
|
||||
sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2)
|
||||
|
||||
// from now on we won't run this block anymore
|
||||
sndPhase = true
|
||||
}
|
||||
|
||||
// fmt.Println("remaining", remainingUnexhausted)
|
||||
if remainingUnexhausted == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// fmt.Println("is sndPhase?", sndPhase)
|
||||
|
||||
var combinedResults []internal.IterEvent
|
||||
|
||||
if sndPhase {
|
||||
// fmt.Println("ending second phase")
|
||||
// when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff,
|
||||
// the other will be empty
|
||||
var sndPhaseResults []internal.IterEvent
|
||||
// fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB))
|
||||
if sndPhaseResultsToggle {
|
||||
sndPhaseResults = sndPhaseResultsB
|
||||
combinedResults = sndPhaseResultsA[0:limit] // reuse this
|
||||
// fmt.Println(" using b", len(sndPhaseResultsA))
|
||||
} else {
|
||||
sndPhaseResults = sndPhaseResultsA
|
||||
combinedResults = sndPhaseResultsB[0:limit] // reuse this
|
||||
// fmt.Println(" using a", len(sndPhaseResultsA))
|
||||
}
|
||||
|
||||
all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults}
|
||||
combinedResults = internal.MergeSortMultiple(all, limit, combinedResults)
|
||||
// fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
|
||||
} else {
|
||||
combinedResults = make([]internal.IterEvent, limit)
|
||||
combinedResults = internal.MergeSortMultiple(results, limit, combinedResults)
|
||||
}
|
||||
|
||||
return combinedResults, nil
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/internal"
|
||||
)
|
||||
|
||||
type query struct {
|
||||
i int
|
||||
prefix []byte
|
||||
startingPoint []byte
|
||||
skipTimestamp bool
|
||||
}
|
||||
|
||||
func prepareQueries(filter nostr.Filter) (
|
||||
queries []query,
|
||||
extraFilter *nostr.Filter,
|
||||
since uint32,
|
||||
err error,
|
||||
) {
|
||||
// these things have to run for every result we return
|
||||
defer func() {
|
||||
if queries == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var until uint32 = 4294967295
|
||||
if filter.Until != 0 {
|
||||
if fu := uint32(filter.Until); fu < until {
|
||||
until = fu + 1
|
||||
}
|
||||
}
|
||||
|
||||
for i, q := range queries {
|
||||
queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, uint32(until))
|
||||
}
|
||||
|
||||
// this is where we'll end the iteration
|
||||
if filter.Since != 0 {
|
||||
if fs := uint32(filter.Since); fs > since {
|
||||
since = fs
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var index byte
|
||||
|
||||
if len(filter.IDs) > 0 {
|
||||
queries = make([]query, len(filter.IDs))
|
||||
for i, id := range filter.IDs {
|
||||
prefix := make([]byte, 1+8)
|
||||
prefix[0] = indexIdPrefix
|
||||
copy(prefix[1:1+8], id[0:8])
|
||||
queries[i] = query{i: i, prefix: prefix, skipTimestamp: true}
|
||||
}
|
||||
|
||||
return queries, extraFilter, since, nil
|
||||
}
|
||||
|
||||
if len(filter.Tags) > 0 {
|
||||
// we will select ONE tag to query with
|
||||
tagKey, tagValues, goodness := internal.ChooseNarrowestTag(filter)
|
||||
|
||||
// we won't use a tag index for this as long as we have something else to match with
|
||||
if goodness < 3 && (len(filter.Authors) > 0 || len(filter.Kinds) > 0) {
|
||||
goto pubkeyMatching
|
||||
}
|
||||
|
||||
queries = make([]query, len(tagValues))
|
||||
for i, value := range tagValues {
|
||||
// get key prefix (with full length) and offset where to write the created_at
|
||||
k, offset := getTagIndexPrefix(tagKey, value)
|
||||
// remove the last parts part to get just the prefix we want here
|
||||
prefix := k[0:offset]
|
||||
queries[i] = query{i: i, prefix: prefix}
|
||||
}
|
||||
|
||||
extraFilter = &nostr.Filter{
|
||||
Kinds: filter.Kinds,
|
||||
Authors: filter.Authors,
|
||||
Tags: internal.CopyMapWithoutKey(filter.Tags, tagKey),
|
||||
}
|
||||
|
||||
return queries, extraFilter, since, nil
|
||||
}
|
||||
|
||||
pubkeyMatching:
|
||||
if len(filter.Authors) > 0 {
|
||||
if len(filter.Kinds) == 0 {
|
||||
queries = make([]query, len(filter.Authors))
|
||||
for i, pk := range filter.Authors {
|
||||
prefix := make([]byte, 1+8)
|
||||
prefix[0] = indexPubkeyPrefix
|
||||
copy(prefix[1:1+8], pk[0:8])
|
||||
queries[i] = query{i: i, prefix: prefix}
|
||||
}
|
||||
} else {
|
||||
queries = make([]query, len(filter.Authors)*len(filter.Kinds))
|
||||
i := 0
|
||||
for _, pk := range filter.Authors {
|
||||
for _, kind := range filter.Kinds {
|
||||
prefix := make([]byte, 1+8+2)
|
||||
prefix[0] = indexPubkeyKindPrefix
|
||||
copy(prefix[1:1+8], pk[0:8])
|
||||
binary.BigEndian.PutUint16(prefix[1+8:], uint16(kind))
|
||||
queries[i] = query{i: i, prefix: prefix}
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
extraFilter = &nostr.Filter{Tags: filter.Tags}
|
||||
} else if len(filter.Kinds) > 0 {
|
||||
index = indexKindPrefix
|
||||
queries = make([]query, len(filter.Kinds))
|
||||
for i, kind := range filter.Kinds {
|
||||
prefix := make([]byte, 1+2)
|
||||
prefix[0] = index
|
||||
binary.BigEndian.PutUint16(prefix[1:], uint16(kind))
|
||||
queries[i] = query{i: i, prefix: prefix}
|
||||
}
|
||||
extraFilter = &nostr.Filter{Tags: filter.Tags}
|
||||
} else {
|
||||
index = indexCreatedAtPrefix
|
||||
queries = make([]query, 1)
|
||||
prefix := make([]byte, 1)
|
||||
prefix[0] = index
|
||||
queries[0] = query{i: 0, prefix: prefix}
|
||||
extraFilter = nil
|
||||
}
|
||||
|
||||
return queries, extraFilter, since, nil
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/internal"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
func (b *BadgerBackend) ReplaceEvent(evt nostr.Event) error {
|
||||
// sanity checking
|
||||
if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 {
|
||||
return fmt.Errorf("event with values out of expected boundaries")
|
||||
}
|
||||
|
||||
return b.Update(func(txn *badger.Txn) error {
|
||||
filter := nostr.Filter{Limit: 1, Kinds: []nostr.Kind{evt.Kind}, Authors: []nostr.PubKey{evt.PubKey}}
|
||||
if evt.Kind.IsAddressable() {
|
||||
// when addressable, add the "d" tag to the filter
|
||||
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
|
||||
}
|
||||
|
||||
// now we fetch the past events, whatever they are, delete them and then save the new
|
||||
results, err := b.query(txn, filter, 10) // in theory limit could be just 1 and this should work
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query past events with %s: %w", filter, err)
|
||||
}
|
||||
|
||||
shouldStore := true
|
||||
for _, previous := range results {
|
||||
if internal.IsOlder(previous.Event, evt) {
|
||||
if _, err := b.delete(txn, previous.Event.ID); err != nil {
|
||||
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err)
|
||||
}
|
||||
} else {
|
||||
// there is a newer event already stored, so we won't store this
|
||||
shouldStore = false
|
||||
}
|
||||
}
|
||||
if shouldStore {
|
||||
return b.save(txn, evt)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package badger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
func (b *BadgerBackend) SaveEvent(evt nostr.Event) error {
|
||||
// sanity checking
|
||||
if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 {
|
||||
return fmt.Errorf("event with values out of expected boundaries")
|
||||
}
|
||||
|
||||
return b.Update(func(txn *badger.Txn) error {
|
||||
// query event by id to ensure we don't save duplicates
|
||||
prefix := make([]byte, 1+8)
|
||||
prefix[0] = indexIdPrefix
|
||||
copy(prefix[1:], evt.ID[0:8])
|
||||
it := txn.NewIterator(badger.IteratorOptions{})
|
||||
defer it.Close()
|
||||
it.Seek(prefix)
|
||||
if it.ValidForPrefix(prefix) {
|
||||
// event exists
|
||||
return eventstore.ErrDupEvent
|
||||
}
|
||||
|
||||
return b.save(txn, evt)
|
||||
})
|
||||
}
|
||||
|
||||
func (b *BadgerBackend) save(txn *badger.Txn, evt nostr.Event) error {
|
||||
// encode to binary
|
||||
buf := make([]byte, betterbinary.Measure(evt))
|
||||
if err := betterbinary.Marshal(evt, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idx := b.Serial()
|
||||
// raw event store
|
||||
if err := txn.Set(idx, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k := range b.getIndexKeysForEvent(evt, idx[1:]) {
|
||||
if err := txn.Set(k, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
1
eventstore/badger/testdata/fuzz/FuzzQuery
vendored
1
eventstore/badger/testdata/fuzz/FuzzQuery
vendored
@@ -1 +0,0 @@
|
||||
../../../internal/testdata/fuzz/FuzzQuery
|
||||
@@ -5,15 +5,15 @@ import (
|
||||
"testing"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBlugeFlow(t *testing.T) {
|
||||
os.RemoveAll("/tmp/blugetest-badger")
|
||||
os.RemoveAll("/tmp/blugetest-lmdb")
|
||||
os.RemoveAll("/tmp/blugetest-bluge")
|
||||
|
||||
bb := &badger.BadgerBackend{Path: "/tmp/blugetest-badger"}
|
||||
bb := &lmdb.LMDBBackend{Path: "/tmp/blugetest-lmdb"}
|
||||
bb.Init()
|
||||
defer bb.Close()
|
||||
|
||||
|
||||
@@ -49,14 +49,14 @@ func detect(dir string) (string, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if strings.HasSuffix(entry.Name(), ".mdb") {
|
||||
return "lmdb", nil
|
||||
}
|
||||
if strings.HasSuffix(entry.Name(), ".vlog") {
|
||||
return "badger", nil
|
||||
}
|
||||
}
|
||||
// for _, entry := range entries {
|
||||
// if strings.HasSuffix(entry.Name(), ".mdb") {
|
||||
// return "lmdb", nil
|
||||
// }
|
||||
// if strings.HasSuffix(entry.Name(), ".vlog") {
|
||||
// return "badger", nil
|
||||
// }
|
||||
// }
|
||||
|
||||
return "", fmt.Errorf("undetected")
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/slicestore"
|
||||
"github.com/urfave/cli/v3"
|
||||
@@ -33,7 +32,7 @@ var app = &cli.Command{
|
||||
&cli.StringFlag{
|
||||
Name: "type",
|
||||
Aliases: []string{"t"},
|
||||
Usage: "store type ('lmdb', 'badger', 'mmm')",
|
||||
Usage: "store type ('lmdb', 'mmm')",
|
||||
},
|
||||
},
|
||||
Before: func(ctx context.Context, c *cli.Command) (context.Context, error) {
|
||||
@@ -71,8 +70,6 @@ var app = &cli.Command{
|
||||
switch typ {
|
||||
case "lmdb":
|
||||
db = &lmdb.LMDBBackend{Path: path}
|
||||
case "badger":
|
||||
db = &badger.BadgerBackend{Path: path}
|
||||
case "mmm":
|
||||
var err error
|
||||
if db, err = doMmmInit(path); err != nil {
|
||||
|
||||
@@ -2,7 +2,6 @@ package checks
|
||||
|
||||
import (
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/bluge"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/mmm"
|
||||
@@ -10,7 +9,6 @@ import (
|
||||
|
||||
// compile-time checks to ensure all backends implement Store
|
||||
var (
|
||||
_ eventstore.Store = (*badger.BadgerBackend)(nil)
|
||||
_ eventstore.Store = (*lmdb.LMDBBackend)(nil)
|
||||
_ eventstore.Store = (*mmm.IndexingLayer)(nil)
|
||||
_ eventstore.Store = (*bluge.BlugeBackend)(nil)
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/slicestore"
|
||||
)
|
||||
@@ -28,12 +27,6 @@ func BenchmarkLMDB(b *testing.B) {
|
||||
runBenchmarkOn(b, l)
|
||||
}
|
||||
|
||||
func BenchmarkBadger(b *testing.B) {
|
||||
d := &badger.BadgerBackend{Path: dbpath + "badger"}
|
||||
d.Init()
|
||||
runBenchmarkOn(b, d)
|
||||
}
|
||||
|
||||
func runBenchmarkOn(b *testing.B, db eventstore.Store) {
|
||||
for i := 0; i < 10000; i++ {
|
||||
eTag := make([]byte, 32)
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/slicestore"
|
||||
)
|
||||
@@ -43,10 +42,3 @@ func TestLMDB(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) { test.run(t, &lmdb.LMDBBackend{Path: dbpath + "lmdb"}) })
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadger(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
os.RemoveAll(dbpath + "badger")
|
||||
t.Run(test.name, func(t *testing.T) { test.run(t, &badger.BadgerBackend{Path: dbpath + "badger"}) })
|
||||
}
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -91,5 +91,3 @@ require (
|
||||
google.golang.org/protobuf v1.36.2 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
replace github.com/coder/websocket => /tmp/websocket
|
||||
|
||||
2
go.sum
2
go.sum
@@ -97,6 +97,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
|
||||
github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e h1:0XBUw73chJ1VYSsfvcPvVT7auykAJce9FpRr10L6Qhw=
|
||||
github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:P13beTBKr5Q18lJe1rIoLUqjM+CB1zYrRg44ZqGuQSA=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
|
||||
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
|
||||
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
|
||||
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
|
||||
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/khatru"
|
||||
)
|
||||
|
||||
func main() {
|
||||
relay := khatru.NewRelay()
|
||||
|
||||
db := &badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"}
|
||||
if err := db.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
relay.UseEventstore(db, 400)
|
||||
|
||||
relay.Negentropy = true
|
||||
|
||||
fmt.Println("running on :3334")
|
||||
http.ListenAndServe(":3334", relay)
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/khatru"
|
||||
"fiatjaf.com/nostr/khatru/blossom"
|
||||
)
|
||||
@@ -16,14 +16,14 @@ import (
|
||||
func main() {
|
||||
relay := khatru.NewRelay()
|
||||
|
||||
db := &badger.BadgerBackend{Path: "/tmp/khatru-badger-tmp"}
|
||||
db := &lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"}
|
||||
if err := db.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
relay.UseEventstore(db, 400)
|
||||
|
||||
bdb := &badger.BadgerBackend{Path: "/tmp/khatru-badger-blossom-tmp"}
|
||||
bdb := &lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-blossom-tmp"}
|
||||
if err := bdb.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"slices"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore/badger"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/slicestore"
|
||||
"fiatjaf.com/nostr/khatru"
|
||||
)
|
||||
@@ -17,7 +17,7 @@ func main() {
|
||||
r1 := khatru.NewRelay()
|
||||
r1.UseEventstore(db1, 400)
|
||||
|
||||
db2 := &badger.BadgerBackend{Path: "/tmp/t"}
|
||||
db2 := &lmdb.LMDBBackend{Path: "/tmp/t"}
|
||||
db2.Init()
|
||||
r2 := khatru.NewRelay()
|
||||
r2.UseEventstore(db2, 400)
|
||||
|
||||
Reference in New Issue
Block a user