eventstore: replace bluge with bleve.
bluge seems to be abandoned and bleve should work better, who knows.
This commit is contained in:
@@ -33,7 +33,7 @@ type Store interface {
|
||||
|
||||
## Available Implementations
|
||||
|
||||
- **bluge**: Full-text search and indexing using the Bluge search library
|
||||
- **bleve**: Full-text search and indexing using the Bleve search library
|
||||
- **boltdb**: Embedded key-value database using BoltDB
|
||||
- **lmdb**: High-performance embedded database using LMDB
|
||||
- **mmm**: Custom memory-mapped storage with advanced indexing
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package bluge
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"os"
|
||||
@@ -9,16 +9,16 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBlugeFlow(t *testing.T) {
|
||||
os.RemoveAll("/tmp/blugetest-lmdb")
|
||||
os.RemoveAll("/tmp/blugetest-bluge")
|
||||
func TestBleveFlow(t *testing.T) {
|
||||
os.RemoveAll("/tmp/blevetest-lmdb")
|
||||
os.RemoveAll("/tmp/blevetest-bleve")
|
||||
|
||||
bb := &lmdb.LMDBBackend{Path: "/tmp/blugetest-lmdb"}
|
||||
bb := &lmdb.LMDBBackend{Path: "/tmp/blevetest-lmdb"}
|
||||
bb.Init()
|
||||
defer bb.Close()
|
||||
|
||||
bl := BlugeBackend{
|
||||
Path: "/tmp/blugetest-bluge",
|
||||
bl := BleveBackend{
|
||||
Path: "/tmp/blevetest-bleve",
|
||||
RawEventStore: bb,
|
||||
}
|
||||
bl.Init()
|
||||
@@ -46,9 +46,11 @@ func TestBlugeFlow(t *testing.T) {
|
||||
|
||||
{
|
||||
n := 0
|
||||
t.Logf("searching for 'good' (should find 3)")
|
||||
for range bl.QueryEvents(nostr.Filter{Search: "good"}, 400) {
|
||||
n++
|
||||
}
|
||||
t.Logf("found %d results", n)
|
||||
assert.Equal(t, 3, n)
|
||||
}
|
||||
|
||||
@@ -58,12 +60,12 @@ func TestBlugeFlow(t *testing.T) {
|
||||
|
||||
{
|
||||
n := 0
|
||||
for res := range bl.QueryEvents(nostr.Filter{Search: "good"}, 400) {
|
||||
for evt := range bl.QueryEvents(nostr.Filter{Search: "good"}, 400) {
|
||||
n++
|
||||
assert.Equal(t, res.Content, "good night")
|
||||
assert.Equal(t, evt.Content, "good night")
|
||||
assert.Equal(t,
|
||||
nostr.MustPubKeyFromHex("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"),
|
||||
res.PubKey,
|
||||
evt.PubKey,
|
||||
)
|
||||
}
|
||||
assert.Equal(t, 1, n)
|
||||
9
eventstore/bleve/delete.go
Normal file
9
eventstore/bleve/delete.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
func (b *BleveBackend) DeleteEvent(id nostr.ID) error {
|
||||
return b.index.Delete(id.Hex())
|
||||
}
|
||||
9
eventstore/bleve/helpers.go
Normal file
9
eventstore/bleve/helpers.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package bleve
|
||||
|
||||
const (
|
||||
idField = "i"
|
||||
contentField = "c"
|
||||
kindField = "k"
|
||||
createdAtField = "a"
|
||||
pubkeyField = "p"
|
||||
)
|
||||
61
eventstore/bleve/lib.go
Normal file
61
eventstore/bleve/lib.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
bleve "github.com/blevesearch/bleve/v2"
|
||||
bleveMapping "github.com/blevesearch/bleve/v2/mapping"
|
||||
)
|
||||
|
||||
var _ eventstore.Store = (*BleveBackend)(nil)
|
||||
|
||||
type BleveBackend struct {
|
||||
sync.Mutex
|
||||
// Path is where the index will be saved
|
||||
Path string
|
||||
|
||||
// RawEventStore is where we'll fetch the raw events from
|
||||
// bleve will only store ids, so the actual events must be somewhere else
|
||||
RawEventStore eventstore.Store
|
||||
|
||||
index bleve.Index
|
||||
}
|
||||
|
||||
func (b *BleveBackend) Close() {
|
||||
if b.index != nil {
|
||||
b.index.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BleveBackend) Init() error {
|
||||
if b.Path == "" {
|
||||
return fmt.Errorf("missing Path")
|
||||
}
|
||||
if b.RawEventStore == nil {
|
||||
return fmt.Errorf("missing RawEventStore")
|
||||
}
|
||||
|
||||
// try to open existing index
|
||||
index, err := bleve.Open(b.Path)
|
||||
if err == bleve.ErrorIndexPathDoesNotExist {
|
||||
// create new index with default mapping
|
||||
mapping := bleveMapping.NewIndexMapping()
|
||||
index, err = bleve.New(b.Path, mapping)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating index: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("error opening index: %w", err)
|
||||
}
|
||||
|
||||
b.index = index
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BleveBackend) CountEvents(nostr.Filter) (uint32, error) {
|
||||
return 0, errors.New("not supported")
|
||||
}
|
||||
95
eventstore/bleve/query.go
Normal file
95
eventstore/bleve/query.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"strconv"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
bleve "github.com/blevesearch/bleve/v2"
|
||||
"github.com/blevesearch/bleve/v2/search/query"
|
||||
)
|
||||
|
||||
func (b *BleveBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
|
||||
return func(yield func(nostr.Event) bool) {
|
||||
limit := maxLimit
|
||||
if filter.LimitZero {
|
||||
return
|
||||
} else if filter.Limit > 0 && filter.Limit < limit {
|
||||
limit = filter.Limit
|
||||
}
|
||||
|
||||
if len(filter.Search) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
searchQ := bleve.NewMatchQuery(filter.Search)
|
||||
searchQ.SetField(contentField)
|
||||
var q query.Query = searchQ
|
||||
|
||||
conjQueries := []query.Query{searchQ}
|
||||
|
||||
if len(filter.Kinds) > 0 {
|
||||
eitherKind := bleve.NewDisjunctionQuery()
|
||||
for _, kind := range filter.Kinds {
|
||||
kindQ := bleve.NewTermQuery(strconv.Itoa(int(kind)))
|
||||
kindQ.SetField(kindField)
|
||||
eitherKind.AddQuery(kindQ)
|
||||
}
|
||||
conjQueries = append(conjQueries, eitherKind)
|
||||
}
|
||||
|
||||
if len(filter.Authors) > 0 {
|
||||
eitherPubkey := bleve.NewDisjunctionQuery()
|
||||
for _, pubkey := range filter.Authors {
|
||||
if len(pubkey) != 64 {
|
||||
continue
|
||||
}
|
||||
pubkeyQ := bleve.NewTermQuery(pubkey.Hex()[56:])
|
||||
pubkeyQ.SetField(pubkeyField)
|
||||
eitherPubkey.AddQuery(pubkeyQ)
|
||||
}
|
||||
conjQueries = append(conjQueries, eitherPubkey)
|
||||
}
|
||||
|
||||
if filter.Since != 0 || filter.Until != 0 {
|
||||
var min *float64
|
||||
if filter.Since != 0 {
|
||||
minVal := float64(filter.Since)
|
||||
min = &minVal
|
||||
}
|
||||
var max *float64
|
||||
if filter.Until != 0 {
|
||||
maxVal := float64(filter.Until)
|
||||
max = &maxVal
|
||||
}
|
||||
dateRangeQ := bleve.NewNumericRangeInclusiveQuery(min, max, nil, nil)
|
||||
dateRangeQ.SetField(createdAtField)
|
||||
conjQueries = append(conjQueries, dateRangeQ)
|
||||
}
|
||||
|
||||
if len(conjQueries) > 1 {
|
||||
q = bleve.NewConjunctionQuery(conjQueries...)
|
||||
}
|
||||
|
||||
req := bleve.NewSearchRequest(q)
|
||||
req.Size = limit
|
||||
req.From = 0
|
||||
|
||||
result, err := b.index.Search(req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, hit := range result.Hits {
|
||||
id, err := nostr.IDFromHex(hit.ID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for evt := range b.RawEventStore.QueryEvents(nostr.Filter{IDs: []nostr.ID{id}}, 1) {
|
||||
if !yield(evt) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package bluge
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"fiatjaf.com/nostr/eventstore/internal"
|
||||
)
|
||||
|
||||
func (b *BlugeBackend) ReplaceEvent(evt nostr.Event) error {
|
||||
func (b *BleveBackend) ReplaceEvent(evt nostr.Event) error {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
23
eventstore/bleve/save.go
Normal file
23
eventstore/bleve/save.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
func (b *BleveBackend) SaveEvent(evt nostr.Event) error {
|
||||
doc := map[string]interface{}{
|
||||
contentField: evt.Content,
|
||||
kindField: strconv.Itoa(int(evt.Kind)),
|
||||
pubkeyField: evt.PubKey.Hex()[56:],
|
||||
createdAtField: float64(evt.CreatedAt),
|
||||
}
|
||||
|
||||
if err := b.index.Index(evt.ID.Hex(), doc); err != nil {
|
||||
return fmt.Errorf("failed to index '%s' document: %w", evt.ID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,9 +0,0 @@
|
||||
package bluge
|
||||
|
||||
import (
|
||||
"fiatjaf.com/nostr"
|
||||
)
|
||||
|
||||
func (b *BlugeBackend) DeleteEvent(id nostr.ID) error {
|
||||
return b.writer.Delete(eventIdentifier(id))
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
package bluge
|
||||
|
||||
import (
|
||||
"fiatjaf.com/nostr"
|
||||
"github.com/templexxx/xhex"
|
||||
)
|
||||
|
||||
const (
|
||||
contentField = "c"
|
||||
kindField = "k"
|
||||
createdAtField = "a"
|
||||
pubkeyField = "p"
|
||||
)
|
||||
|
||||
type eventIdentifier nostr.ID
|
||||
|
||||
const idField = "i"
|
||||
|
||||
func (id eventIdentifier) Field() string {
|
||||
return idField
|
||||
}
|
||||
|
||||
func (id eventIdentifier) Term() []byte {
|
||||
idhex := make([]byte, 64)
|
||||
xhex.Encode(idhex, id[:])
|
||||
return idhex
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
package bluge
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"github.com/blugelabs/bluge"
|
||||
"github.com/blugelabs/bluge/analysis/token"
|
||||
"golang.org/x/text/unicode/norm"
|
||||
)
|
||||
|
||||
var _ eventstore.Store = (*BlugeBackend)(nil)
|
||||
|
||||
type BlugeBackend struct {
|
||||
sync.Mutex
|
||||
// Path is where the index will be saved
|
||||
Path string
|
||||
|
||||
// RawEventStore is where we'll fetch the raw events from
|
||||
// bluge will only store ids, so the actual events must be somewhere else
|
||||
RawEventStore eventstore.Store
|
||||
|
||||
searchConfig bluge.Config
|
||||
writer *bluge.Writer
|
||||
}
|
||||
|
||||
func (b *BlugeBackend) Close() {
|
||||
defer b.writer.Close()
|
||||
}
|
||||
|
||||
func (b *BlugeBackend) Init() error {
|
||||
if b.Path == "" {
|
||||
return fmt.Errorf("missing Path")
|
||||
}
|
||||
if b.RawEventStore == nil {
|
||||
return fmt.Errorf("missing RawEventStore")
|
||||
}
|
||||
|
||||
b.searchConfig = bluge.DefaultConfig(b.Path)
|
||||
b.searchConfig.DefaultSearchAnalyzer.TokenFilters = append(b.searchConfig.DefaultSearchAnalyzer.TokenFilters,
|
||||
token.NewUnicodeNormalizeFilter(norm.NFKC),
|
||||
)
|
||||
|
||||
var err error
|
||||
b.writer, err = bluge.OpenWriter(b.searchConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening writer: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlugeBackend) CountEvents(nostr.Filter) (uint32, error) {
|
||||
return 0, errors.New("not supported")
|
||||
}
|
||||
@@ -1,105 +0,0 @@
|
||||
package bluge
|
||||
|
||||
import (
|
||||
"context"
|
||||
"iter"
|
||||
"strconv"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"github.com/blugelabs/bluge"
|
||||
"github.com/blugelabs/bluge/search"
|
||||
)
|
||||
|
||||
func (b *BlugeBackend) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] {
|
||||
return func(yield func(nostr.Event) bool) {
|
||||
limit := maxLimit
|
||||
if filter.LimitZero {
|
||||
return
|
||||
} else if filter.Limit < limit {
|
||||
limit = filter.Limit
|
||||
}
|
||||
|
||||
if len(filter.Search) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
reader, err := b.writer.Reader()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
searchQ := bluge.NewMatchQuery(filter.Search)
|
||||
searchQ.SetField(contentField)
|
||||
var q bluge.Query = searchQ
|
||||
|
||||
complicatedQuery := bluge.NewBooleanQuery().AddMust(searchQ)
|
||||
|
||||
if len(filter.Kinds) > 0 {
|
||||
eitherKind := bluge.NewBooleanQuery()
|
||||
eitherKind.SetMinShould(1)
|
||||
for _, kind := range filter.Kinds {
|
||||
kindQ := bluge.NewTermQuery(strconv.Itoa(int(kind)))
|
||||
kindQ.SetField(kindField)
|
||||
eitherKind.AddShould(kindQ)
|
||||
}
|
||||
complicatedQuery.AddMust(eitherKind)
|
||||
q = complicatedQuery
|
||||
}
|
||||
|
||||
if len(filter.Authors) > 0 {
|
||||
eitherPubkey := bluge.NewBooleanQuery()
|
||||
eitherPubkey.SetMinShould(1)
|
||||
for _, pubkey := range filter.Authors {
|
||||
if len(pubkey) != 64 {
|
||||
continue
|
||||
}
|
||||
pubkeyQ := bluge.NewTermQuery(pubkey.Hex()[56:])
|
||||
pubkeyQ.SetField(pubkeyField)
|
||||
eitherPubkey.AddShould(pubkeyQ)
|
||||
}
|
||||
complicatedQuery.AddMust(eitherPubkey)
|
||||
q = complicatedQuery
|
||||
}
|
||||
|
||||
if filter.Since != 0 || filter.Until != 0 {
|
||||
min := 0.0
|
||||
if filter.Since != 0 {
|
||||
min = float64(filter.Since)
|
||||
}
|
||||
max := float64(nostr.Now())
|
||||
if filter.Until != 0 {
|
||||
max = float64(filter.Until)
|
||||
}
|
||||
dateRangeQ := bluge.NewNumericRangeInclusiveQuery(min, max, true, true)
|
||||
dateRangeQ.SetField(createdAtField)
|
||||
complicatedQuery.AddMust(dateRangeQ)
|
||||
q = complicatedQuery
|
||||
}
|
||||
|
||||
req := bluge.NewTopNSearch(limit, q)
|
||||
|
||||
dmi, err := reader.Search(context.Background(), req)
|
||||
if err != nil {
|
||||
reader.Close()
|
||||
return
|
||||
}
|
||||
|
||||
defer reader.Close()
|
||||
|
||||
var next *search.DocumentMatch
|
||||
for next, err = dmi.Next(); next != nil; next, err = dmi.Next() {
|
||||
next.VisitStoredFields(func(field string, value []byte) bool {
|
||||
id, err := nostr.IDFromHex(string(value))
|
||||
if err == nil {
|
||||
for evt := range b.RawEventStore.QueryEvents(nostr.Filter{IDs: []nostr.ID{id}}, 1) {
|
||||
yield(evt)
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
package bluge
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"fiatjaf.com/nostr"
|
||||
"github.com/blugelabs/bluge"
|
||||
)
|
||||
|
||||
func (b *BlugeBackend) SaveEvent(evt nostr.Event) error {
|
||||
id := eventIdentifier(evt.ID)
|
||||
doc := &bluge.Document{
|
||||
bluge.NewKeywordFieldBytes(id.Field(), id.Term()).Sortable().StoreValue(),
|
||||
}
|
||||
|
||||
doc.AddField(bluge.NewTextField(contentField, evt.Content))
|
||||
doc.AddField(bluge.NewTextField(kindField, strconv.Itoa(int(evt.Kind))))
|
||||
doc.AddField(bluge.NewTextField(pubkeyField, evt.PubKey.Hex()[56:]))
|
||||
doc.AddField(bluge.NewNumericField(createdAtField, float64(evt.CreatedAt)))
|
||||
|
||||
if err := b.writer.Update(doc.ID(), doc); err != nil {
|
||||
return fmt.Errorf("failed to write '%s' document: %w", evt.ID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -2,7 +2,7 @@ package checks
|
||||
|
||||
import (
|
||||
"fiatjaf.com/nostr/eventstore"
|
||||
"fiatjaf.com/nostr/eventstore/bluge"
|
||||
"fiatjaf.com/nostr/eventstore/bleve"
|
||||
"fiatjaf.com/nostr/eventstore/boltdb"
|
||||
"fiatjaf.com/nostr/eventstore/lmdb"
|
||||
"fiatjaf.com/nostr/eventstore/mmm"
|
||||
@@ -13,5 +13,5 @@ var (
|
||||
_ eventstore.Store = (*lmdb.LMDBBackend)(nil)
|
||||
_ eventstore.Store = (*mmm.IndexingLayer)(nil)
|
||||
_ eventstore.Store = (*boltdb.BoltBackend)(nil)
|
||||
_ eventstore.Store = (*bluge.BlugeBackend)(nil)
|
||||
_ eventstore.Store = (*bleve.BleveBackend)(nil)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user