eventstore: QueryEvents() now takes a maxLimit param so everything is clearer.

This commit is contained in:
fiatjaf
2025-05-11 09:36:59 -03:00
parent 9118217048
commit f60fc08f8d
40 changed files with 108 additions and 151 deletions

View File

@@ -27,8 +27,6 @@ var _ eventstore.Store = (*BadgerBackend)(nil)
type BadgerBackend struct {
Path string
MaxLimit int
MaxLimitNegentropy int
BadgerOptionsModifier func(badger.Options) badger.Options
// Experimental
@@ -57,15 +55,6 @@ func (b *BadgerBackend) Init() error {
return fmt.Errorf("error running migrations: %w", err)
}
if b.MaxLimit != 0 {
b.MaxLimitNegentropy = b.MaxLimit
} else {
b.MaxLimit = 1000
if b.MaxLimitNegentropy == 0 {
b.MaxLimitNegentropy = 16777216
}
}
if err := b.DB.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{
Prefix: []byte{0},

View File

@@ -16,26 +16,25 @@ import (
var batchFilled = errors.New("batch-filled")
func (b *BadgerBackend) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (b *BadgerBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
if filter.Search != "" {
return
}
// max number of events we'll return
limit := b.MaxLimit / 4
if filter.Limit > 0 && filter.Limit <= b.MaxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero {
return
} else if tlimit > 0 {
limit = tlimit
} else if tlimit < maxLimit {
maxLimit = tlimit
}
if filter.Limit < maxLimit {
maxLimit = filter.Limit
}
// fmt.Println("limit", limit)
b.View(func(txn *badger.Txn) error {
results, err := b.query(txn, filter, limit)
results, err := b.query(txn, filter, maxLimit)
if err != nil {
return err
}

View File

@@ -10,8 +10,15 @@ import (
"github.com/blugelabs/bluge/search"
)
func (b *BlugeBackend) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (b *BlugeBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
limit := maxLimit
if filter.LimitZero {
return
} else if filter.Limit < limit {
limit = filter.Limit
}
if len(filter.Search) < 2 {
return
}
@@ -69,14 +76,6 @@ func (b *BlugeBackend) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
q = complicatedQuery
}
limit := 40
if filter.Limit != 0 {
limit = filter.Limit
if filter.Limit > 150 {
limit = 150
}
}
req := bluge.NewTopNSearch(limit, q)
dmi, err := reader.Search(context.Background(), req)
@@ -92,7 +91,7 @@ func (b *BlugeBackend) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
next.VisitStoredFields(func(field string, value []byte) bool {
id, err := nostr.IDFromHex(string(value))
if err == nil {
for evt := range b.RawEventStore.QueryEvents(nostr.Filter{IDs: []nostr.ID{id}}) {
for evt := range b.RawEventStore.QueryEvents(nostr.Filter{IDs: []nostr.ID{id}}, 1) {
yield(evt)
}
}

View File

@@ -12,13 +12,13 @@ func (b *BlugeBackend) ReplaceEvent(evt nostr.Event) error {
b.Lock()
defer b.Unlock()
filter := nostr.Filter{Limit: 1, Kinds: []nostr.Kind{evt.Kind}, Authors: []nostr.PubKey{evt.PubKey}}
if evt.Kind.IsReplaceable() {
filter := nostr.Filter{Kinds: []nostr.Kind{evt.Kind}, Authors: []nostr.PubKey{evt.PubKey}}
if evt.Kind.IsAddressable() {
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
}
shouldStore := true
for previous := range b.QueryEvents(filter) {
for previous := range b.QueryEvents(filter, 1) {
if internal.IsOlder(previous, evt) {
if err := b.DeleteEvent(previous.ID); err != nil {
return fmt.Errorf("failed to delete event for replacing: %w", err)

View File

@@ -70,9 +70,9 @@ var app = &cli.Command{
switch typ {
case "lmdb":
db = &lmdb.LMDBBackend{Path: path, MaxLimit: 1_000_000}
db = &lmdb.LMDBBackend{Path: path}
case "badger":
db = &badger.BadgerBackend{Path: path, MaxLimit: 1_000_000}
db = &badger.BadgerBackend{Path: path}
case "mmm":
var err error
if db, err = doMmmInit(path); err != nil {

View File

@@ -44,7 +44,7 @@ var neg = &cli.Command{
// create negentropy object and initialize it with events
vec := vector.New()
neg := negentropy.New(vec, frameSizeLimit)
for evt := range db.QueryEvents(filter) {
for evt := range db.QueryEvents(filter, math.MaxInt) {
vec.Insert(evt.CreatedAt, evt.ID)
}

View File

@@ -47,7 +47,7 @@ func doSave(ctx context.Context, line string, evt nostr.Event) error {
}
func doQuery(ctx context.Context, f *nostr.Filter) error {
for evt := range db.QueryEvents(*f) {
for evt := range db.QueryEvents(*f, 1_000_000) {
fmt.Println(evt)
}
return nil

View File

@@ -25,7 +25,7 @@ var query = &cli.Command{
continue
}
for evt := range db.QueryEvents(filter) {
for evt := range db.QueryEvents(filter, 1_000_000) {
fmt.Println(evt)
}
}

View File

@@ -14,10 +14,8 @@ import (
var _ eventstore.Store = (*LMDBBackend)(nil)
type LMDBBackend struct {
Path string
MaxLimit int
MaxLimitNegentropy int
MapSize int64
Path string
MapSize int64
lmdbEnv *lmdb.Env
extraFlags uint // (for debugging and testing)
@@ -41,15 +39,6 @@ type LMDBBackend struct {
}
func (b *LMDBBackend) Init() error {
if b.MaxLimit != 0 {
b.MaxLimitNegentropy = b.MaxLimit
} else {
b.MaxLimit = 1500
if b.MaxLimitNegentropy == 0 {
b.MaxLimitNegentropy = 16777216
}
}
// create directory if it doesn't exist and open it
if err := os.MkdirAll(b.Path, 0755); err != nil {
return err

View File

@@ -14,27 +14,25 @@ import (
"github.com/PowerDNS/lmdb-go/lmdb"
)
func (b *LMDBBackend) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (b *LMDBBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
if filter.Search != "" {
return
}
// max number of events we'll return
var limit int
limit = b.MaxLimit / 4
if filter.Limit > 0 && filter.Limit <= b.MaxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero {
return
} else if tlimit > 0 {
limit = tlimit
} else if tlimit < maxLimit {
maxLimit = tlimit
}
if filter.Limit < maxLimit {
maxLimit = filter.Limit
}
b.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
results, err := b.query(txn, filter, limit)
results, err := b.query(txn, filter, maxLimit)
for _, ie := range results {
if !yield(ie.Event) {

View File

@@ -14,8 +14,7 @@ type IndexingLayer struct {
isInitialized bool
name string
MaxLimit int
mmmm *MultiMmapManager
mmmm *MultiMmapManager
// this is stored in the knownLayers db as a value, and used to keep track of which layer owns each event
id uint16
@@ -53,10 +52,6 @@ func (il *IndexingLayer) Init() error {
path := filepath.Join(il.mmmm.Dir, il.name)
if il.MaxLimit == 0 {
il.MaxLimit = 500
}
// open lmdb
env, err := lmdb.NewEnv()
if err != nil {

View File

@@ -73,32 +73,30 @@ func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr.
})
}
func (il *IndexingLayer) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (il *IndexingLayer) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
if len(filter.IDs) > 0 {
il.mmmm.queryByIDs(yield, filter.IDs, nil)
return
}
if filter.Search != "" {
return
}
// max number of events we'll return
limit := il.MaxLimit / 4
if filter.Limit > 0 && filter.Limit < il.MaxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
if tlimit := filter.GetTheoreticalLimit(); tlimit == 0 || filter.LimitZero {
return
} else if tlimit > 0 {
limit = tlimit
} else if tlimit < maxLimit {
maxLimit = tlimit
}
if filter.Limit < maxLimit {
maxLimit = filter.Limit
}
il.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
results, err := il.query(txn, filter, limit)
results, err := il.query(txn, filter, filter.Limit)
for _, ie := range results {
if !yield(ie.Event) {

View File

@@ -21,7 +21,7 @@ func (b NullStore) DeleteEvent(id nostr.ID) error {
return nil
}
func (b NullStore) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (b NullStore) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {}
}

View File

@@ -18,24 +18,19 @@ var _ eventstore.Store = (*SliceStore)(nil)
type SliceStore struct {
sync.Mutex
internal []nostr.Event
MaxLimit int
}
func (b *SliceStore) Init() error {
b.internal = make([]nostr.Event, 0, 5000)
if b.MaxLimit == 0 {
b.MaxLimit = 500
}
return nil
}
func (b *SliceStore) Close() {}
func (b *SliceStore) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
func (b *SliceStore) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
if filter.Limit > b.MaxLimit || (filter.Limit == 0 && !filter.LimitZero) {
filter.Limit = b.MaxLimit
if filter.Limit > maxLimit || (filter.Limit == 0 && !filter.LimitZero) {
filter.Limit = maxLimit
}
// efficiently determine where to start and end
@@ -136,7 +131,7 @@ func (b *SliceStore) ReplaceEvent(evt nostr.Event) error {
}
shouldStore := true
for previous := range b.QueryEvents(filter) {
for previous := range b.QueryEvents(filter, 1) {
if internal.IsOlder(previous, evt) {
if err := b.delete(previous.ID); err != nil {
return fmt.Errorf("failed to delete event for replacing: %w", err)

View File

@@ -16,7 +16,7 @@ type Store interface {
Close()
// QueryEvents returns events that match the filter
QueryEvents(nostr.Filter) iter.Seq[nostr.Event]
QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event]
// DeleteEvent deletes an event atomically by ID
DeleteEvent(nostr.ID) error

View File

@@ -3,6 +3,7 @@ package wrappers
import (
"context"
"fmt"
"iter"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore"
@@ -12,6 +13,11 @@ var _ nostr.Publisher = StorePublisher{}
type StorePublisher struct {
eventstore.Store
MaxLimit int
}
func (w StorePublisher) QueryEvents(filter nostr.Filter) iter.Seq[nostr.Event] {
return w.Store.QueryEvents(filter, w.MaxLimit)
}
func (w StorePublisher) Publish(ctx context.Context, evt nostr.Event) error {