a bunch of conversions and api tweaks on khatru and eventstore.

This commit is contained in:
fiatjaf
2025-04-17 00:08:36 -03:00
parent f7884cea4f
commit a7be696243
48 changed files with 450 additions and 576 deletions

View File

@@ -24,7 +24,7 @@ type Store interface {
}
```
[![Go Reference](https://pkg.go.dev/badge/github.com/fiatjaf/eventstore.svg)](https://pkg.go.dev/github.com/fiatjaf/eventstore) [![Run Tests](https://github.com/fiatjaf/eventstore/actions/workflows/test.yml/badge.svg)](https://github.com/fiatjaf/eventstore/actions/workflows/test.yml)
[![Go Reference](https://pkg.go.dev/badge/fiatjaf.com/nostr/eventstore.svg)](https://pkg.go.dev/fiatjaf.com/nostr/eventstore) [![Run Tests](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml/badge.svg)](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml)
## command-line tool

View File

@@ -1,7 +1,7 @@
# eventstore command-line tool
```
go install github.com/fiatjaf/eventstore/cmd/eventstore@latest
go install fiatjaf.com/nostr/eventstore/cmd/eventstore@latest
```
## Usage

View File

@@ -3,13 +3,11 @@
package main
import (
"context"
"os"
"path/filepath"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr/eventstore/mmm"
"fiatjaf.com/nostr"
"github.com/rs/zerolog"
)
@@ -24,9 +22,7 @@ func doMmmInit(path string) (eventstore.Store, error) {
if err := mmmm.Init(); err != nil {
return nil, err
}
il := &mmm.IndexingLayer{
ShouldIndex: func(ctx context.Context, e *nostr.Event) bool { return false },
}
il := &mmm.IndexingLayer{}
if err := mmmm.EnsureLayer(filepath.Base(path), il); err != nil {
return nil, err
}

View File

@@ -8,11 +8,11 @@ import (
"os"
"sync"
"github.com/urfave/cli/v3"
"github.com/mailru/easyjson"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/nip77/negentropy"
"fiatjaf.com/nostr/nip77/negentropy/storage/vector"
"github.com/mailru/easyjson"
"github.com/urfave/cli/v3"
)
var neg = &cli.Command{
@@ -44,11 +44,7 @@ var neg = &cli.Command{
// create negentropy object and initialize it with events
vec := vector.New()
neg := negentropy.New(vec, frameSizeLimit)
ch, err := db.QueryEvents(ctx, filter)
if err != nil {
return fmt.Errorf("error querying: %s\n", err)
}
for evt := range ch {
for evt := range db.QueryEvents(filter) {
vec.Insert(evt.CreatedAt, evt.ID)
}

View File

@@ -5,8 +5,8 @@ import (
"fmt"
"os"
"github.com/mailru/easyjson"
"fiatjaf.com/nostr"
"github.com/mailru/easyjson"
"github.com/urfave/cli/v3"
)
@@ -25,14 +25,7 @@ var query = &cli.Command{
continue
}
ch, err := db.QueryEvents(ctx, filter)
if err != nil {
fmt.Fprintf(os.Stderr, "error querying: %s\n", err)
hasError = true
continue
}
for evt := range ch {
for evt := range db.QueryEvents(filter) {
fmt.Println(evt)
}
}

View File

@@ -5,9 +5,9 @@ import (
"fmt"
"os"
"github.com/urfave/cli/v3"
"github.com/mailru/easyjson"
"fiatjaf.com/nostr"
"github.com/mailru/easyjson"
"github.com/urfave/cli/v3"
)
var save = &cli.Command{
@@ -25,7 +25,7 @@ var save = &cli.Command{
continue
}
if err := db.SaveEvent(ctx, &event); err != nil {
if err := db.SaveEvent(event); err != nil {
fmt.Fprintf(os.Stderr, "failed to save event '%s': %s\n", line, err)
hasError = true
continue

2
go.mod
View File

@@ -19,7 +19,6 @@ require (
github.com/dgraph-io/ristretto v1.0.0
github.com/elnosh/gonuts v0.3.1-0.20250123162555-7c0381a585e3
github.com/fasthttp/websocket v1.5.12
github.com/fiatjaf/eventstore v0.16.2
github.com/fiatjaf/khatru v0.17.4
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62
github.com/json-iterator/go v1.1.12
@@ -70,6 +69,7 @@ require (
github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fiatjaf/eventstore v0.16.2 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/golang/snappy v0.0.4 // indirect

View File

@@ -115,7 +115,7 @@ func main() {
### But I don't want to write my own database!
Fear no more. Using the https://github.com/fiatjaf/eventstore module you get a bunch of compatible databases out of the box and you can just plug them into your relay. For example, [sqlite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3):
Fear no more. Using the https://fiatjaf.com/nostr/eventstore module you get a bunch of compatible databases out of the box and you can just plug them into your relay. For example, [sqlite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3):
```go
db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"}

View File

@@ -5,16 +5,12 @@ import (
"errors"
"fmt"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore"
)
// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket.
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
if evt == nil {
return false, errors.New("error: event is nil")
}
func (rl *Relay) AddEvent(ctx context.Context, evt nostr.Event) (skipBroadcast bool, writeError error) {
if nostr.IsEphemeralKind(evt.Kind) {
return false, rl.handleEphemeral(ctx, evt)
} else {
@@ -22,9 +18,9 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast
}
}
func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
for _, reject := range rl.RejectEvent {
if reject, msg := reject(ctx, evt); reject {
func (rl *Relay) handleNormal(ctx context.Context, evt nostr.Event) (skipBroadcast bool, writeError error) {
if nil != rl.OnEvent {
if reject, msg := rl.OnEvent(ctx, evt); reject {
if msg == "" {
return true, errors.New("blocked: no reason")
} else {
@@ -36,8 +32,8 @@ func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadc
// will store
// regular kinds are just saved directly
if nostr.IsRegularKind(evt.Kind) {
for _, store := range rl.StoreEvent {
if err := store(ctx, evt); err != nil {
if nil != rl.StoreEvent {
if err := rl.StoreEvent(ctx, evt); err != nil {
switch err {
case eventstore.ErrDupEvent:
return true, nil
@@ -47,63 +43,21 @@ func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadc
}
}
} else {
// otherwise it's a replaceable -- so we'll use the replacer functions if we have any
if len(rl.ReplaceEvent) > 0 {
for _, repl := range rl.ReplaceEvent {
if err := repl(ctx, evt); err != nil {
switch err {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
}
}
}
} else {
// otherwise do it the manual way
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
if nostr.IsAddressableKind(evt.Kind) {
// when addressable, add the "d" tag to the filter
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
}
// now we fetch old events and delete them
shouldStore := true
for _, query := range rl.QueryEvents {
ch, err := query(ctx, filter)
if err != nil {
continue
}
for previous := range ch {
if isOlder(previous, evt) {
for _, del := range rl.DeleteEvent {
del(ctx, previous)
}
} else {
// we found a more recent event, so we won't delete it and also will not store this new one
shouldStore = false
}
}
}
// store
if shouldStore {
for _, store := range rl.StoreEvent {
if saveErr := store(ctx, evt); saveErr != nil {
switch saveErr {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error"))
}
}
// otherwise it's a replaceable
if nil != rl.ReplaceEvent {
if err := rl.ReplaceEvent(ctx, evt); err != nil {
switch err {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
}
}
}
}
for _, ons := range rl.OnEventSaved {
ons(ctx, evt)
if nil != rl.OnEventSaved {
rl.OnEventSaved(ctx, evt)
}
// track event expiration if applicable

View File

@@ -7,8 +7,8 @@ import (
"strconv"
"strings"
"github.com/mailru/easyjson"
"fiatjaf.com/nostr"
"github.com/mailru/easyjson"
)
func readAuthorization(r *http.Request) (*nostr.Event, error) {
@@ -28,7 +28,7 @@ func readAuthorization(r *http.Request) (*nostr.Event, error) {
if evt.Kind != 24242 || !evt.CheckID() {
return nil, fmt.Errorf("invalid event")
}
if ok, _ := evt.CheckSignature(); !ok {
if !evt.VerifySignature() {
return nil, fmt.Errorf("invalid signature")
}

View File

@@ -2,6 +2,7 @@ package blossom
import (
"context"
"iter"
"fiatjaf.com/nostr"
)
@@ -13,14 +14,14 @@ type BlobDescriptor struct {
Type string `json:"type"`
Uploaded nostr.Timestamp `json:"uploaded"`
Owner string `json:"-"`
Owner nostr.PubKey `json:"-"`
}
type BlobIndex interface {
Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error
List(ctx context.Context, pubkey string) (chan BlobDescriptor, error)
Keep(ctx context.Context, blob BlobDescriptor, pubkey nostr.PubKey) error
List(ctx context.Context, pubkey nostr.PubKey) iter.Seq[BlobDescriptor]
Get(ctx context.Context, sha256 string) (*BlobDescriptor, error)
Delete(ctx context.Context, sha256 string, pubkey string) error
Delete(ctx context.Context, sha256 string, pubkey nostr.PubKey) error
}
var _ BlobIndex = (*EventStoreBlobIndexWrapper)(nil)

View File

@@ -2,10 +2,11 @@ package blossom
import (
"context"
"iter"
"strconv"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore"
)
// EventStoreBlobIndexWrapper uses fake events to keep track of what blobs we have stored and who owns them
@@ -15,15 +16,15 @@ type EventStoreBlobIndexWrapper struct {
ServiceURL string
}
func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error {
ch, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}, Tags: nostr.TagMap{"x": []string{blob.SHA256}}})
if err != nil {
return err
}
func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescriptor, pubkey nostr.PubKey) error {
next, stop := iter.Pull(
es.Store.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pubkey}, Kinds: []uint16{24242}, Tags: nostr.TagMap{"x": []string{blob.SHA256}}}),
)
defer stop()
if <-ch == nil {
if _, exists := next(); !exists {
// doesn't exist, save
evt := &nostr.Event{
evt := nostr.Event{
PubKey: pubkey,
Kind: 24242,
Tags: nostr.Tags{
@@ -34,38 +35,31 @@ func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescript
CreatedAt: blob.Uploaded,
}
evt.ID = evt.GetID()
es.Store.SaveEvent(ctx, evt)
es.Store.SaveEvent(evt)
}
return nil
}
func (es EventStoreBlobIndexWrapper) List(ctx context.Context, pubkey string) (chan BlobDescriptor, error) {
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}})
if err != nil {
return nil, err
}
ch := make(chan BlobDescriptor)
go func() {
for evt := range ech {
ch <- es.parseEvent(evt)
func (es EventStoreBlobIndexWrapper) List(ctx context.Context, pubkey nostr.PubKey) iter.Seq[BlobDescriptor] {
return func(yield func(BlobDescriptor) bool) {
for evt := range es.Store.QueryEvents(nostr.Filter{
Authors: []nostr.PubKey{pubkey},
Kinds: []uint16{24242},
}) {
yield(es.parseEvent(evt))
}
close(ch)
}()
return ch, nil
}
}
func (es EventStoreBlobIndexWrapper) Get(ctx context.Context, sha256 string) (*BlobDescriptor, error) {
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1})
if err != nil {
return nil, err
}
next, stop := iter.Pull(
es.Store.QueryEvents(nostr.Filter{Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []uint16{24242}, Limit: 1}),
)
evt := <-ech
if evt != nil {
defer stop()
if evt, found := next(); found {
bd := es.parseEvent(evt)
return &bd, nil
}
@@ -73,21 +67,27 @@ func (es EventStoreBlobIndexWrapper) Get(ctx context.Context, sha256 string) (*B
return nil, nil
}
func (es EventStoreBlobIndexWrapper) Delete(ctx context.Context, sha256 string, pubkey string) error {
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1})
if err != nil {
return err
}
func (es EventStoreBlobIndexWrapper) Delete(ctx context.Context, sha256 string, pubkey nostr.PubKey) error {
next, stop := iter.Pull(
es.Store.QueryEvents(nostr.Filter{
Authors: []nostr.PubKey{pubkey},
Tags: nostr.TagMap{"x": []string{sha256}},
Kinds: []uint16{24242},
Limit: 1,
},
),
)
evt := <-ech
if evt != nil {
return es.Store.DeleteEvent(ctx, evt)
defer stop()
if evt, found := next(); found {
return es.Store.DeleteEvent(evt.ID)
}
return nil
}
func (es EventStoreBlobIndexWrapper) parseEvent(evt *nostr.Event) BlobDescriptor {
func (es EventStoreBlobIndexWrapper) parseEvent(evt nostr.Event) BlobDescriptor {
hhash := evt.Tags[0][1]
mimetype := evt.Tags[1][1]
ext := getExtension(mimetype)

View File

@@ -87,8 +87,8 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
}
// run the reject hooks
for _, ru := range bs.RejectUpload {
reject, reason, code := ru(r.Context(), auth, size, ext)
if nil != bs.RejectUpload {
reject, reason, code := bs.RejectUpload(r.Context(), auth, size, ext)
if reject {
blossomError(w, reason, code)
return
@@ -134,8 +134,8 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
}
// save actual blob
for _, sb := range bs.StoreBlob {
if err := sb(r.Context(), hhash, b); err != nil {
if nil != bs.StoreBlob {
if err := bs.StoreBlob(r.Context(), hhash, b); err != nil {
blossomError(w, "failed to save: "+err.Error(), 500)
return
}
@@ -175,8 +175,8 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) {
}
}
for _, rg := range bs.RejectGet {
reject, reason, code := rg(r.Context(), auth, hhash)
if nil != bs.RejectGet {
reject, reason, code := bs.RejectGet(r.Context(), auth, hhash)
if reject {
blossomError(w, reason, code)
return
@@ -188,8 +188,8 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) {
ext = "." + spl[1]
}
for _, lb := range bs.LoadBlob {
reader, _ := lb(r.Context(), hhash)
if nil != bs.LoadBlob {
reader, _ := bs.LoadBlob(r.Context(), hhash)
if reader != nil {
// use unix epoch as the time if we can't find the descriptor
// as described in the http.ServeContent documentation
@@ -245,26 +245,20 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) {
}
}
pubkey := r.URL.Path[6:]
pubkey, err := nostr.PubKeyFromHex(r.URL.Path[6:])
for _, rl := range bs.RejectList {
reject, reason, code := rl(r.Context(), auth, pubkey)
if nil != bs.RejectList {
reject, reason, code := bs.RejectList(r.Context(), auth, pubkey)
if reject {
blossomError(w, reason, code)
return
}
}
ch, err := bs.Store.List(r.Context(), pubkey)
if err != nil {
blossomError(w, "failed to query: "+err.Error(), 500)
return
}
w.Write([]byte{'['})
enc := json.NewEncoder(w)
first := true
for bd := range ch {
for bd := range bs.Store.List(r.Context(), pubkey) {
if !first {
w.Write([]byte{','})
} else {
@@ -303,8 +297,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
}
// should we accept this delete?
for _, rd := range bs.RejectDelete {
reject, reason, code := rd(r.Context(), auth, hhash)
if nil != bs.RejectDelete {
reject, reason, code := bs.RejectDelete(r.Context(), auth, hhash)
if reject {
blossomError(w, reason, code)
return
@@ -319,8 +313,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
// we will actually only delete the file if no one else owns it
if bd, err := bs.Store.Get(r.Context(), hhash); err == nil && bd == nil {
for _, del := range bs.DeleteBlob {
if err := del(r.Context(), hhash); err != nil {
if nil != bs.DeleteBlob {
if err := bs.DeleteBlob(r.Context(), hhash); err != nil {
blossomError(w, "failed to delete blob: "+err.Error(), 500)
return
}

View File

@@ -21,7 +21,7 @@ type BlossomServer struct {
RejectUpload func(ctx context.Context, auth *nostr.Event, size int, ext string) (bool, string, int)
RejectGet func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string, int)
RejectList func(ctx context.Context, auth *nostr.Event, pubkey string) (bool, string, int)
RejectList func(ctx context.Context, auth *nostr.Event, pubkey nostr.PubKey) (bool, string, int)
RejectDelete func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string, int)
}

View File

@@ -6,6 +6,6 @@ import (
// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt *nostr.Event) int {
func (rl *Relay) BroadcastEvent(evt nostr.Event) int {
return rl.notifyListeners(evt)
}

View File

@@ -9,7 +9,7 @@ import (
"fiatjaf.com/nostr"
)
func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) error {
func (rl *Relay) handleDeleteRequest(ctx context.Context, evt nostr.Event) error {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 {
@@ -17,7 +17,11 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro
switch tag[0] {
case "e":
f = nostr.Filter{IDs: []string{tag[1]}}
id, err := nostr.IDFromHex(tag[1])
if err != nil {
return fmt.Errorf("invalid 'e' tag '%s': %w", tag[1], err)
}
f = nostr.Filter{IDs: []nostr.ID{id}}
case "a":
spl := strings.Split(tag[1], ":")
if len(spl) != 3 {
@@ -27,11 +31,15 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro
if err != nil {
continue
}
author := spl[1]
author, err := nostr.PubKeyFromHex(spl[1])
if err != nil {
continue
}
identifier := spl[2]
f = nostr.Filter{
Kinds: []int{kind},
Authors: []string{author},
Kinds: []uint16{uint16(kind)},
Authors: []nostr.PubKey{author},
Tags: nostr.TagMap{"d": []string{identifier}},
Until: &evt.CreatedAt,
}
@@ -40,39 +48,30 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) erro
}
ctx := context.WithValue(ctx, internalCallKey, struct{}{})
for _, query := range rl.QueryEvents {
ch, err := query(ctx, f)
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
// got the event, now check if the user can delete it
acceptDeletion := target.PubKey == evt.PubKey
var msg string
if !acceptDeletion {
msg = "you are not the author of this event"
}
// but if we have a function to overwrite this outcome, use that instead
for _, odo := range rl.OverwriteDeletionOutcome {
acceptDeletion, msg = odo(ctx, target, evt)
}
if acceptDeletion {
// delete it
for _, del := range rl.DeleteEvent {
if err := del(ctx, target); err != nil {
return err
}
if nil != rl.QueryStored {
for target := range rl.QueryStored(ctx, f) {
// got the event, now check if the user can delete it
acceptDeletion := target.PubKey == evt.PubKey
var msg string
if !acceptDeletion {
msg = "you are not the author of this event"
}
// if it was tracked to be expired that is not needed anymore
rl.expirationManager.removeEvent(target.ID)
} else {
// fail and stop here
return fmt.Errorf("blocked: %s", msg)
if acceptDeletion {
// delete it
if nil != rl.DeleteEvent {
if err := rl.DeleteEvent(ctx, target.ID); err != nil {
return err
}
}
// if it was tracked to be expired that is not needed anymore
rl.expirationManager.removeEvent(target.ID)
} else {
// fail and stop here
return fmt.Errorf("blocked: %s", msg)
}
}
// don't try to query this same event again

View File

@@ -6,7 +6,7 @@ outline: deep
The [`nostr.Filter` type](https://pkg.go.dev/github.com/nbd-wtf/go-nostr#Filter) has a `Search` field, so you basically just has to handle that if it's present.
It can be tricky to implement fulltext search properly though, so some [eventstores](../core/eventstore) implement it natively, such as [Bluge](https://pkg.go.dev/github.com/fiatjaf/eventstore/bluge), [OpenSearch](https://pkg.go.dev/github.com/fiatjaf/eventstore/opensearch) and [ElasticSearch](https://pkg.go.dev/github.com/fiatjaf/eventstore/elasticsearch) (although for the last two you'll need an instance of these database servers running, while with Bluge it's embedded).
It can be tricky to implement fulltext search properly though, so some [eventstores](../core/eventstore) implement it natively, such as [Bluge](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/bluge), [OpenSearch](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/opensearch) and [ElasticSearch](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/elasticsearch) (although for the last two you'll need an instance of these database servers running, while with Bluge it's embedded).
If you have any of these you can just use them just like any other eventstore:
@@ -33,9 +33,9 @@ func main () {
}
```
Note that in this case we're using the [LMDB](https://pkg.go.dev/github.com/fiatjaf/eventstore/lmdb) adapter for normal queries and it explicitly rejects any filter that contains a `Search` field, while [Bluge](https://pkg.go.dev/github.com/fiatjaf/eventstore/bluge) rejects any filter _without_ a `Search` value, which make them pair well together.
Note that in this case we're using the [LMDB](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/lmdb) adapter for normal queries and it explicitly rejects any filter that contains a `Search` field, while [Bluge](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/bluge) rejects any filter _without_ a `Search` value, which make them pair well together.
Other adapters, like [SQLite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3), implement search functionality on their own, so if you don't want to use that you would have to have a middleware between, like:
Other adapters, like [SQLite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3), implement search functionality on their own, so if you don't want to use that you would have to have a middleware between, like:
```go
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent, search.SaveEvent)

View File

@@ -6,7 +6,7 @@ outline: deep
Khatru doesn't make any assumptions about how you'll want to store events. Any function can be plugged in to the `StoreEvent`, `DeleteEvent`, `ReplaceEvent` and `QueryEvents` hooks.
However the [`eventstore`](https://github.com/fiatjaf/eventstore) library has adapters that you can easily plug into `khatru`'s hooks.
However the [`eventstore`](https://fiatjaf.com/nostr/eventstore) library has adapters that you can easily plug into `khatru`'s hooks.
# Using the `eventstore` library
@@ -14,7 +14,7 @@ The library includes many different adapters -- often called "backends" --, writ
For all of them you start by instantiating a struct containing some basic options and a pointer (a file path for local databases, a connection string for remote databases) to the data. Then you call `.Init()` and if all is well you're ready to start storing, querying and deleting events, so you can pass the respective functions to their `khatru` counterparts. These eventstores also expose a `.Close()` function that must be called if you're going to stop using that store and keep your application open.
Here's an example with the [Badger](https://pkg.go.dev/github.com/fiatjaf/eventstore/badger) adapter, made for the [Badger](https://github.com/dgraph-io/badger) embedded key-value database:
Here's an example with the [Badger](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/badger) adapter, made for the [Badger](https://github.com/dgraph-io/badger) embedded key-value database:
```go
package main
@@ -23,7 +23,7 @@ import (
"fmt"
"net/http"
"github.com/fiatjaf/eventstore/badger"
"fiatjaf.com/nostr/eventstore/badger"
"github.com/fiatjaf/khatru"
)
@@ -46,11 +46,11 @@ func main() {
}
```
[LMDB](https://pkg.go.dev/github.com/fiatjaf/eventstore/lmdb) works the same way.
[LMDB](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/lmdb) works the same way.
[SQLite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3) also stores things locally so it only needs a `Path`.
[SQLite](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/sqlite3) also stores things locally so it only needs a `Path`.
[PostgreSQL](https://pkg.go.dev/github.com/fiatjaf/eventstore/postgresql) and [MySQL](https://pkg.go.dev/github.com/fiatjaf/eventstore/mysql) use remote connections to database servers, so they take a `DatabaseURL` parameter, but after that it's the same.
[PostgreSQL](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/postgresql) and [MySQL](https://pkg.go.dev/fiatjaf.com/nostr/eventstore/mysql) use remote connections to database servers, so they take a `DatabaseURL` parameter, but after that it's the same.
## Using two at a time

View File

@@ -31,7 +31,7 @@ relay.Info.Description = "this is my custom relay"
relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images"
```
Now we must set up the basic functions for accepting events and answering queries. We could make our own querying engine from scratch, but we can also use [eventstore](https://github.com/fiatjaf/eventstore). In this example we'll use the SQLite adapter:
Now we must set up the basic functions for accepting events and answering queries. We could make our own querying engine from scratch, but we can also use [eventstore](https://fiatjaf.com/nostr/eventstore). In this example we'll use the SQLite adapter:
```go
db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"}

View File

@@ -7,9 +7,9 @@ import (
"fiatjaf.com/nostr"
)
func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error {
for _, reject := range rl.RejectEvent {
if reject, msg := reject(ctx, evt); reject {
func (rl *Relay) handleEphemeral(ctx context.Context, evt nostr.Event) error {
if nil != rl.OnEvent {
if reject, msg := rl.OnEvent(ctx, evt); reject {
if msg == "" {
return errors.New("blocked: no reason")
} else {
@@ -18,8 +18,8 @@ func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error {
}
}
for _, oee := range rl.OnEphemeralEvent {
oee(ctx, evt)
if nil != rl.OnEphemeralEvent {
rl.OnEphemeralEvent(ctx, evt)
}
return nil

View File

@@ -11,16 +11,13 @@ import (
func main() {
relay := khatru.NewRelay()
db := badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"}
db := &badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"}
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent)
relay.UseEventstore(db)
relay.Negentropy = true
fmt.Println("running on :3334")

View File

@@ -12,17 +12,13 @@ import (
func main() {
relay := khatru.NewRelay()
db := lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"}
db := &lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"}
os.MkdirAll(db.Path, 0o755)
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent)
relay.UseEventstore(db)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)

View File

@@ -19,11 +19,8 @@ func main() {
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.ReplaceEvent = append(relay.ReplaceEvent, db.ReplaceEvent)
relay.UseEventstore(db)
bdb := &badger.BadgerBackend{Path: "/tmp/khatru-badger-blossom-tmp"}
if err := bdb.Init(); err != nil {
@@ -31,15 +28,15 @@ func main() {
}
bl := blossom.New(relay, "http://localhost:3334")
bl.Store = blossom.EventStoreBlobIndexWrapper{Store: bdb, ServiceURL: bl.ServiceURL}
bl.StoreBlob = append(bl.StoreBlob, func(ctx context.Context, sha256 string, body []byte) error {
bl.StoreBlob = func(ctx context.Context, sha256 string, body []byte) error {
fmt.Println("storing", sha256, len(body))
return nil
})
bl.LoadBlob = append(bl.LoadBlob, func(ctx context.Context, sha256 string) (io.ReadSeeker, error) {
}
bl.LoadBlob = func(ctx context.Context, sha256 string) (io.ReadSeeker, error) {
fmt.Println("loading", sha256)
blob := strings.NewReader("aaaaa")
return blob, nil
})
}
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)

View File

@@ -6,31 +6,28 @@ import (
"net/http"
"os"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/lmdb"
"fiatjaf.com/nostr/khatru"
"fiatjaf.com/nostr/khatru/policies"
"fiatjaf.com/nostr"
)
func main() {
relay := khatru.NewRelay()
db := lmdb.LMDBBackend{Path: "/tmp/exclusive"}
db := &lmdb.LMDBBackend{Path: "/tmp/exclusive"}
os.MkdirAll(db.Path, 0o755)
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.UseEventstore(db)
relay.RejectEvent = append(relay.RejectEvent, policies.PreventTooManyIndexableTags(10, nil, nil))
relay.RejectFilter = append(relay.RejectFilter, policies.NoComplexFilters)
relay.OnEvent = policies.PreventTooManyIndexableTags(10, nil, nil)
relay.OnRequest = policies.NoComplexFilters
relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) {
})
relay.OnEventSaved = func(ctx context.Context, event nostr.Event) {
}
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"iter"
"log"
"net/http"
@@ -22,45 +23,36 @@ func main() {
relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images"
// you must bring your own storage scheme -- if you want to have any
store := make(map[string]*nostr.Event, 120)
store := make(map[nostr.ID]nostr.Event, 120)
// set up the basic relay functions
relay.StoreEvent = append(relay.StoreEvent,
func(ctx context.Context, event *nostr.Event) error {
store[event.ID] = event
return nil
},
)
relay.QueryEvents = append(relay.QueryEvents,
func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
go func() {
for _, evt := range store {
if filter.Matches(evt) {
ch <- evt
}
relay.StoreEvent = func(ctx context.Context, event nostr.Event) error {
store[event.ID] = event
return nil
}
relay.QueryStored = func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] {
return func(yield func(nostr.Event) bool) {
for _, evt := range store {
if filter.Matches(evt) {
yield(evt)
}
close(ch)
}()
return ch, nil
},
)
relay.DeleteEvent = append(relay.DeleteEvent,
func(ctx context.Context, event *nostr.Event) error {
delete(store, event.ID)
return nil
},
)
}
}
}
relay.DeleteEvent = func(ctx context.Context, id nostr.ID) error {
delete(store, id)
return nil
}
// there are many other configurable things you can set
relay.RejectEvent = append(relay.RejectEvent,
relay.OnEvent = policies.SeqEvent(
// built-in policies
policies.ValidateKind,
policies.PreventLargeTags(100),
// define your own policies
policies.PreventLargeTags(100),
func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
if event.PubKey == "fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52" {
func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if event.PubKey == nostr.MustPubKeyFromHex("fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52") {
return true, "we don't allow this person to write here"
}
return false, "" // anyone else can
@@ -68,13 +60,13 @@ func main() {
)
// you can request auth by rejecting an event or a request with the prefix "auth-required: "
relay.RejectFilter = append(relay.RejectFilter,
relay.OnRequest = policies.SeqRequest(
// built-in policies
policies.NoComplexFilters,
// define your own policies
func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
if pubkey := khatru.GetAuthed(ctx); pubkey != "" {
if pubkey, isAuthed := khatru.GetAuthed(ctx); !isAuthed {
log.Printf("request from %s\n", pubkey)
return false, ""
}

View File

@@ -12,29 +12,20 @@ import (
)
func main() {
db1 := slicestore.SliceStore{}
db1 := &slicestore.SliceStore{}
db1.Init()
r1 := khatru.NewRelay()
r1.StoreEvent = append(r1.StoreEvent, db1.SaveEvent)
r1.QueryEvents = append(r1.QueryEvents, db1.QueryEvents)
r1.CountEvents = append(r1.CountEvents, db1.CountEvents)
r1.DeleteEvent = append(r1.DeleteEvent, db1.DeleteEvent)
r1.UseEventstore(db1)
db2 := badger.BadgerBackend{DatabaseURL: "/tmp/t"}
db2 := &badger.BadgerBackend{Path: "/tmp/t"}
db2.Init()
r2 := khatru.NewRelay()
r2.StoreEvent = append(r2.StoreEvent, db2.SaveEvent)
r2.QueryEvents = append(r2.QueryEvents, db2.QueryEvents)
r2.CountEvents = append(r2.CountEvents, db2.CountEvents)
r2.DeleteEvent = append(r2.DeleteEvent, db2.DeleteEvent)
r2.UseEventstore(db2)
db3 := slicestore.SliceStore{}
db3 := &slicestore.SliceStore{}
db3.Init()
r3 := khatru.NewRelay()
r3.StoreEvent = append(r3.StoreEvent, db3.SaveEvent)
r3.QueryEvents = append(r3.QueryEvents, db3.QueryEvents)
r3.CountEvents = append(r3.CountEvents, db3.CountEvents)
r3.DeleteEvent = append(r3.DeleteEvent, db3.DeleteEvent)
r3.UseEventstore(db3)
router := khatru.NewRouter()

View File

@@ -11,7 +11,7 @@ import (
)
type expiringEvent struct {
id string
id nostr.ID
expiresAt nostr.Timestamp
}
@@ -74,13 +74,8 @@ func (em *expirationManager) initialScan(ctx context.Context) {
// query all events
ctx = context.WithValue(ctx, internalCallKey, struct{}{})
for _, query := range em.relay.QueryEvents {
ch, err := query(ctx, nostr.Filter{})
if err != nil {
continue
}
for evt := range ch {
if nil != em.relay.QueryStored {
for evt := range em.relay.QueryStored(ctx, nostr.Filter{}) {
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
heap.Push(&em.events, expiringEvent{
id: evt.ID,
@@ -109,23 +104,13 @@ func (em *expirationManager) checkExpiredEvents(ctx context.Context) {
heap.Pop(&em.events)
ctx := context.WithValue(ctx, internalCallKey, struct{}{})
for _, query := range em.relay.QueryEvents {
ch, err := query(ctx, nostr.Filter{IDs: []string{next.id}})
if err != nil {
continue
}
if evt := <-ch; evt != nil {
for _, del := range em.relay.DeleteEvent {
del(ctx, evt)
}
}
break
if nil != em.relay.DeleteEvent {
em.relay.DeleteEvent(ctx, next.id)
}
}
}
func (em *expirationManager) trackEvent(evt *nostr.Event) {
func (em *expirationManager) trackEvent(evt nostr.Event) {
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
em.mu.Lock()
heap.Push(&em.events, expiringEvent{
@@ -136,7 +121,7 @@ func (em *expirationManager) trackEvent(evt *nostr.Event) {
}
}
func (em *expirationManager) removeEvent(id string) {
func (em *expirationManager) removeEvent(id nostr.ID) {
em.mu.Lock()
defer em.mu.Unlock()

View File

@@ -176,8 +176,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
// check NIP-70 protected
if nip70.IsProtected(env.Event) {
authed := GetAuthed(ctx)
if authed == "" {
authed, isAuthed := GetAuthed(ctx)
if isAuthed {
RequestAuth(ctx)
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
@@ -213,20 +213,20 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
if env.Event.Kind == 5 {
// this always returns "blocked: " whenever it returns an error
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
writeErr = srl.handleDeleteRequest(ctx, env.Event)
} else if nostr.IsEphemeralKind(env.Event.Kind) {
// this will also always return a prefixed reason
writeErr = srl.handleEphemeral(ctx, &env.Event)
writeErr = srl.handleEphemeral(ctx, env.Event)
} else {
// this will also always return a prefixed reason
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event)
}
var reason string
if writeErr == nil {
ok = true
if !skipBroadcast {
n := srl.notifyListeners(&env.Event)
n := srl.notifyListeners(env.Event)
// the number of notified listeners matters in ephemeral events
if nostr.IsEphemeralKind(env.Event.Kind) {
@@ -247,12 +247,12 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
}
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: ok, Reason: reason})
case *nostr.CountEnvelope:
if rl.CountEvents == nil && rl.CountEventsHLL == nil {
if rl.Count == nil && rl.CountHLL == nil {
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"})
return
}
var total int64
var total uint32
var hll *hyperloglog.HyperLogLog
srl := rl
@@ -278,7 +278,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
case *nostr.ReqEnvelope:
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
eose.Add(1)
// a context just for the "stored events" request handler
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
@@ -287,24 +287,22 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID)
// handle each filter separately -- dispatching events as they're loaded from databases
for _, filter := range env.Filters {
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil {
// fail everything if any filter is rejected
reason := err.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(env.Filter)
}
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, env.Filter)
if err != nil {
// fail everything if any filter is rejected
reason := err.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
rl.addListener(ws, env.SubscriptionID, srl, env.Filter, cancelReqCtx)
}
go func() {
@@ -317,7 +315,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
rl.removeListenerId(ws, id)
case *nostr.AuthEnvelope:
wsBaseUrl := strings.Replace(rl.getBaseURL(r), "http", "ws", 1)
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
if pubkey, ok := nip42.ValidateAuthEvent(env.Event, ws.Challenge, wsBaseUrl); ok {
ws.AuthedPublicKey = pubkey
ws.authLock.Lock()
if ws.Authed != nil {

View File

@@ -1,6 +1,7 @@
package khatru
import (
"bytes"
"net"
"net/http"
"strings"
@@ -10,7 +11,7 @@ import (
func isOlder(previous, next *nostr.Event) bool {
return previous.CreatedAt < next.CreatedAt ||
(previous.CreatedAt == next.CreatedAt && previous.ID > next.ID)
(previous.CreatedAt == next.CreatedAt && bytes.Compare(previous.ID[:], next.ID[:]) == 1)
}
var privateMasks = func() []net.IPNet {

View File

@@ -133,17 +133,17 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
}
// returns how many listeners were notified
func (rl *Relay) notifyListeners(event *nostr.Event) int {
func (rl *Relay) notifyListeners(event nostr.Event) int {
count := 0
listenersloop:
for _, listener := range rl.listeners {
if listener.filter.Matches(event) {
for _, pb := range rl.PreventBroadcast {
if pb(listener.ws, event) {
if nil != rl.PreventBroadcast {
if rl.PreventBroadcast(listener.ws, event) {
continue listenersloop
}
}
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event})
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: event})
count++
}
}

View File

@@ -17,7 +17,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, totalWebsockets*baseSubs)
@@ -71,7 +71,7 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, totalWebsockets)
@@ -150,7 +150,7 @@ func FuzzRouterListenersPabloCrash(f *testing.F) {
rl.clients[ws] = make([]listenerSpec, 0, subIterations)
}
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
type wsid struct {

View File

@@ -29,9 +29,9 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) {
ws1 := &WebSocket{}
ws2 := &WebSocket{}
f1 := nostr.Filter{Kinds: []int{1}}
f2 := nostr.Filter{Kinds: []int{2}}
f3 := nostr.Filter{Kinds: []int{3}}
f1 := nostr.Filter{Kinds: []uint16{1}}
f2 := nostr.Filter{Kinds: []uint16{2}}
f3 := nostr.Filter{Kinds: []uint16{3}}
rl.clients[ws1] = nil
rl.clients[ws2] = nil
@@ -86,9 +86,9 @@ func TestListenerMoreConvolutedCase(t *testing.T) {
ws3 := &WebSocket{}
ws4 := &WebSocket{}
f1 := nostr.Filter{Kinds: []int{1}}
f2 := nostr.Filter{Kinds: []int{2}}
f3 := nostr.Filter{Kinds: []int{3}}
f1 := nostr.Filter{Kinds: []uint16{1}}
f2 := nostr.Filter{Kinds: []uint16{2}}
f3 := nostr.Filter{Kinds: []uint16{3}}
rl.clients[ws1] = nil
rl.clients[ws2] = nil
@@ -205,9 +205,9 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
ws3 := &WebSocket{}
ws4 := &WebSocket{}
f1 := nostr.Filter{Kinds: []int{1}}
f2 := nostr.Filter{Kinds: []int{2}}
f3 := nostr.Filter{Kinds: []int{3}}
f1 := nostr.Filter{Kinds: []uint16{1}}
f2 := nostr.Filter{Kinds: []uint16{2}}
f3 := nostr.Filter{Kinds: []uint16{3}}
rlx := NewRelay()
rly := NewRelay()
@@ -424,7 +424,7 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
func TestRandomListenerClientRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
@@ -463,7 +463,7 @@ func TestRandomListenerClientRemoving(t *testing.T) {
func TestRandomListenerIdRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
@@ -531,7 +531,7 @@ func TestRouterListenersPabloCrash(t *testing.T) {
rl.clients[ws2] = nil
rl.clients[ws3] = nil
f := nostr.Filter{Kinds: []int{1}}
f := nostr.Filter{Kinds: []uint16{1}}
cancel := func(cause error) {}
rl.addListener(ws1, ":1", rla, f, cancel)

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/nip77/negentropy"
"fiatjaf.com/nostr/nip77/negentropy/storage/vector"
@@ -17,33 +16,22 @@ type NegentropySession struct {
}
func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter) (*vector.Vector, error) {
ctx = eventstore.SetNegentropy(ctx)
// do the same overwrite/reject flow we do in normal REQs
for _, ovw := range rl.OverwriteFilter {
ovw(ctx, &filter)
}
if filter.LimitZero {
return nil, fmt.Errorf("invalid limit 0")
}
for _, reject := range rl.RejectFilter {
if reject, msg := reject(ctx, filter); reject {
ctx = SetNegentropy(ctx)
if nil != rl.OnRequest {
if reject, msg := rl.OnRequest(ctx, filter); reject {
return nil, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
}
}
// fetch events and add them to a negentropy Vector store
vec := vector.New()
for _, query := range rl.QueryEvents {
ch, err := query(ctx, filter)
if err != nil {
continue
} else if ch == nil {
continue
}
for event := range ch {
// since the goal here is to sync databases we won't do fancy stuff like overwrite events
if nil != rl.QueryStored {
for event := range rl.QueryStored(ctx, filter) {
vec.Insert(event.CreatedAt, event.ID)
}
}
@@ -51,3 +39,13 @@ func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter
return vec, nil
}
var negentropySessionKey = struct{}{}
func IsNegentropySession(ctx context.Context) bool {
return ctx.Value(negentropySessionKey) != nil
}
func SetNegentropy(ctx context.Context) context.Context {
return context.WithValue(ctx, negentropySessionKey, struct{}{})
}

View File

@@ -11,10 +11,10 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
info := *rl.Info
if len(rl.DeleteEvent) > 0 {
if nil != rl.DeleteEvent {
info.AddSupportedNIP(9)
}
if len(rl.CountEvents) > 0 {
if nil != rl.Count {
info.AddSupportedNIP(45)
}
if rl.Negentropy {
@@ -30,8 +30,8 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
info.Banner = strings.TrimSuffix(baseURL, "/") + "/" + strings.TrimPrefix(info.Banner, "/")
}
for _, ovw := range rl.OverwriteRelayInformation {
info = ovw(r.Context(), r, info)
if nil != rl.OverwriteRelayInformation {
info = rl.OverwriteRelayInformation(r.Context(), r, info)
}
json.NewEncoder(w).Encode(info)

View File

@@ -16,25 +16,25 @@ import (
//
// If ignoreKinds is given this restriction will not apply to these kinds (useful for allowing a bigger).
// If onlyKinds is given then all other kinds will be ignored.
func PreventTooManyIndexableTags(max int, ignoreKinds []int, onlyKinds []int) func(context.Context, *nostr.Event) (bool, string) {
func PreventTooManyIndexableTags(max int, ignoreKinds []uint16, onlyKinds []uint16) func(context.Context, nostr.Event) (bool, string) {
slices.Sort(ignoreKinds)
slices.Sort(onlyKinds)
ignore := func(kind int) bool { return false }
ignore := func(kind uint16) bool { return false }
if len(ignoreKinds) > 0 {
ignore = func(kind int) bool {
ignore = func(kind uint16) bool {
_, isIgnored := slices.BinarySearch(ignoreKinds, kind)
return isIgnored
}
}
if len(onlyKinds) > 0 {
ignore = func(kind int) bool {
ignore = func(kind uint16) bool {
_, isApplicable := slices.BinarySearch(onlyKinds, kind)
return !isApplicable
}
}
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
return func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if ignore(event.Kind) {
return false, ""
}
@@ -53,8 +53,8 @@ func PreventTooManyIndexableTags(max int, ignoreKinds []int, onlyKinds []int) fu
}
// PreventLargeTags rejects events that have indexable tag values greater than maxTagValueLen.
func PreventLargeTags(maxTagValueLen int) func(context.Context, *nostr.Event) (bool, string) {
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
func PreventLargeTags(maxTagValueLen int) func(context.Context, nostr.Event) (bool, string) {
return func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
for _, tag := range event.Tags {
if len(tag) > 1 && len(tag[0]) == 1 {
if len(tag[1]) > maxTagValueLen {
@@ -68,11 +68,11 @@ func PreventLargeTags(maxTagValueLen int) func(context.Context, *nostr.Event) (b
// RestrictToSpecifiedKinds returns a function that can be used as a RejectFilter that will reject
// any events with kinds different than the specified ones.
func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context.Context, *nostr.Event) (bool, string) {
func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context.Context, nostr.Event) (bool, string) {
// sort the kinds in increasing order
slices.Sort(kinds)
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
return func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if allowEphemeral && nostr.IsEphemeralKind(event.Kind) {
return false, ""
}
@@ -85,9 +85,9 @@ func RestrictToSpecifiedKinds(allowEphemeral bool, kinds ...uint16) func(context
}
}
func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) {
func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, nostr.Event) (bool, string) {
thresholdSeconds := nostr.Timestamp(threshold.Seconds())
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
return func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if nostr.Now()-event.CreatedAt > thresholdSeconds {
return true, "event too old"
}
@@ -95,9 +95,9 @@ func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, *
}
}
func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) {
func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, nostr.Event) (bool, string) {
thresholdSeconds := nostr.Timestamp(threshold.Seconds())
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
return func(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if event.CreatedAt-nostr.Now() > thresholdSeconds {
return true, "event too much in the future"
}
@@ -105,12 +105,12 @@ func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context,
}
}
func RejectEventsWithBase64Media(ctx context.Context, evt *nostr.Event) (bool, string) {
func RejectEventsWithBase64Media(ctx context.Context, evt nostr.Event) (bool, string) {
return strings.Contains(evt.Content, "data:image/") || strings.Contains(evt.Content, "data:video/"), "event with base64 media"
}
func OnlyAllowNIP70ProtectedEvents(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
if nip70.IsProtected(*event) {
func OnlyAllowNIP70ProtectedEvents(ctx context.Context, event nostr.Event) (reject bool, msg string) {
if nip70.IsProtected(event) {
return false, ""
}
return true, "blocked: we only accept events protected with the nip70 \"-\" tag"

View File

@@ -4,8 +4,8 @@ import (
"context"
"slices"
"fiatjaf.com/nostr/khatru"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/khatru"
)
// NoComplexFilters disallows filters with more than 2 tags.
@@ -21,7 +21,7 @@ func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, ms
// MustAuth requires all subscribers to be authenticated
func MustAuth(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
if khatru.GetAuthed(ctx) == "" {
if _, isAuthed := khatru.GetAuthed(ctx); !isAuthed {
return true, "auth-required: all requests must be authenticated"
}
return false, ""
@@ -63,7 +63,7 @@ func RemoveSearchQueries(ctx context.Context, filter *nostr.Filter) {
func RemoveAllButKinds(kinds ...uint16) func(context.Context, *nostr.Filter) {
return func(ctx context.Context, filter *nostr.Filter) {
if n := len(filter.Kinds); n > 0 {
newKinds := make([]int, 0, n)
newKinds := make([]uint16, 0, n)
for i := 0; i < n; i++ {
if k := filter.Kinds[i]; slices.Contains(kinds, uint16(k)) {
newKinds = append(newKinds, k)

View File

@@ -7,7 +7,7 @@ import (
"fiatjaf.com/nostr"
)
func ValidateKind(ctx context.Context, evt *nostr.Event) (bool, string) {
func ValidateKind(ctx context.Context, evt nostr.Event) (bool, string) {
switch evt.Kind {
case 0:
var m struct {

47
khatru/policies/multi.go Normal file
View File

@@ -0,0 +1,47 @@
package policies
import (
"context"
"fiatjaf.com/nostr"
)
func SeqEvent(
funcs ...func(ctx context.Context, evt nostr.Event) (bool, string),
) func(context.Context, nostr.Event) (reject bool, reason string) {
return func(ctx context.Context, evt nostr.Event) (reject bool, reason string) {
for _, fn := range funcs {
reject, reason := fn(ctx, evt)
if reject {
return reject, reason
}
}
return false, ""
}
}
func SeqStore(funcs ...func(ctx context.Context, evt nostr.Event) error) func(context.Context, nostr.Event) error {
return func(ctx context.Context, evt nostr.Event) error {
for _, fn := range funcs {
err := fn(ctx, evt)
if err != nil {
return err
}
}
return nil
}
}
func SeqRequest(
funcs ...func(ctx context.Context, evt nostr.Filter) (bool, string),
) func(context.Context, nostr.Filter) (reject bool, reason string) {
return func(ctx context.Context, evt nostr.Filter) (reject bool, reason string) {
for _, fn := range funcs {
reject, reason := fn(ctx, evt)
if reject {
return reject, reason
}
}
return false, ""
}
}

View File

@@ -1,38 +0,0 @@
package policies
import (
"context"
"slices"
"fiatjaf.com/nostr/khatru"
"fiatjaf.com/nostr"
)
// RejectKind04Snoopers prevents reading NIP-04 messages from people not involved in the conversation.
func RejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, string) {
// prevent kind-4 events from being returned to unauthed users,
// only when authentication is a thing
if !slices.Contains(filter.Kinds, 4) {
return false, ""
}
ws := khatru.GetConnection(ctx)
senders := filter.Authors
receivers, _ := filter.Tags["p"]
switch {
case ws.AuthedPublicKey == "":
// not authenticated
return true, "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?"
case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.AuthedPublicKey):
// allowed filter: ws.authed is sole sender (filter specifies one or all receivers)
return false, ""
case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.AuthedPublicKey):
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
return false, ""
default:
// restricted filter: do not return any events,
// even if other elements in filters array were not restricted).
// client should know better.
return true, "restricted: authenticated user does not have authorization for requested filters."
}
}

View File

@@ -5,14 +5,14 @@ import (
"net/http"
"time"
"fiatjaf.com/nostr/khatru"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/khatru"
)
func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) {
func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ nostr.Event) (reject bool, msg string) {
rl := startRateLimitSystem[string](tokensPerInterval, interval, maxTokens)
return func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) {
return func(ctx context.Context, _ nostr.Event) (reject bool, msg string) {
ip := khatru.GetIP(ctx)
if ip == "" {
return false, ""
@@ -21,11 +21,11 @@ func EventIPRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens
}
}
func EventPubKeyRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ *nostr.Event) (reject bool, msg string) {
func EventPubKeyRateLimiter(tokensPerInterval int, interval time.Duration, maxTokens int) func(ctx context.Context, _ nostr.Event) (reject bool, msg string) {
rl := startRateLimitSystem[string](tokensPerInterval, interval, maxTokens)
return func(ctx context.Context, evt *nostr.Event) (reject bool, msg string) {
return rl(evt.PubKey), "rate-limited: slow down, please"
return func(ctx context.Context, evt nostr.Event) (reject bool, msg string) {
return rl(evt.PubKey.Hex()), "rate-limited: slow down, please"
}
}

View File

@@ -7,17 +7,15 @@ import (
)
func ApplySaneDefaults(relay *khatru.Relay) {
relay.RejectEvent = append(relay.RejectEvent,
relay.OnEvent = SeqEvent(
RejectEventsWithBase64Media,
EventIPRateLimiter(2, time.Minute*3, 10),
)
relay.RejectFilter = append(relay.RejectFilter,
relay.OnRequest = SeqRequest(
NoComplexFilters,
FilterIPRateLimiter(20, time.Minute, 100),
)
relay.RejectConnection = append(relay.RejectConnection,
ConnectionRateLimiter(1, time.Minute*5, 100),
)
relay.RejectConnection = ConnectionRateLimiter(1, time.Minute*5, 100)
}

View File

@@ -12,6 +12,7 @@ import (
"time"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr/nip11"
"fiatjaf.com/nostr/nip45/hyperloglog"
"github.com/fasthttp/websocket"
@@ -57,23 +58,22 @@ type Relay struct {
ServiceURL string
// hooks that will be called at various times
RejectEvent func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
OverwriteDeletionOutcome func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
OnEvent func(ctx context.Context, event nostr.Event) (reject bool, msg string)
StoreEvent func(ctx context.Context, event nostr.Event) error
ReplaceEvent func(ctx context.Context, event nostr.Event) error
DeleteEvent func(ctx context.Context, id nostr.ID) error
OnEventSaved func(ctx context.Context, event nostr.Event)
OnEphemeralEvent func(ctx context.Context, event nostr.Event)
RejectFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
RejectCountFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
QueryEvents func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event]
CountEvents func(ctx context.Context, filter nostr.Filter) (uint32, error)
CountEventsHLL func(ctx context.Context, filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error)
OnRequest func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
OnCountFilter func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
QueryStored func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event]
Count func(ctx context.Context, filter nostr.Filter) (uint32, error)
CountHLL func(ctx context.Context, filter nostr.Filter, offset int) (uint32, *hyperloglog.HyperLogLog, error)
RejectConnection func(r *http.Request) bool
OnConnect func(ctx context.Context)
OnDisconnect func(ctx context.Context)
OverwriteRelayInformation func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
PreventBroadcast func(ws *WebSocket, event *nostr.Event) bool
PreventBroadcast func(ws *WebSocket, event nostr.Event) bool
// these are used when this relays acts as a router
routes []Route
@@ -117,6 +117,24 @@ type Relay struct {
expirationManager *expirationManager
}
func (rl *Relay) UseEventstore(store eventstore.Store) {
rl.QueryStored = func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event] {
return store.QueryEvents(filter)
}
rl.Count = func(ctx context.Context, filter nostr.Filter) (uint32, error) {
return store.CountEvents(filter)
}
rl.StoreEvent = func(ctx context.Context, event nostr.Event) error {
return store.SaveEvent(event)
}
rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) error {
return store.ReplaceEvent(event)
}
rl.DeleteEvent = func(ctx context.Context, id nostr.ID) error {
return store.DeleteEvent(id)
}
}
func (rl *Relay) getBaseURL(r *http.Request) string {
if rl.ServiceURL != "" {
return rl.ServiceURL

View File

@@ -7,41 +7,31 @@ import (
"testing"
"time"
"fiatjaf.com/nostr/eventstore/slicestore"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/slicestore"
)
func TestBasicRelayFunctionality(t *testing.T) {
// setup relay with in-memory store
relay := NewRelay()
store := slicestore.SliceStore{}
store := &slicestore.SliceStore{}
store.Init()
relay.StoreEvent = append(relay.StoreEvent, store.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, store.QueryEvents)
relay.DeleteEvent = append(relay.DeleteEvent, store.DeleteEvent)
relay.UseEventstore(store)
// start test server
server := httptest.NewServer(relay)
defer server.Close()
// create test keys
sk1 := nostr.GeneratePrivateKey()
pk1, err := nostr.GetPublicKey(sk1)
if err != nil {
t.Fatalf("Failed to get public key 1: %v", err)
}
sk2 := nostr.GeneratePrivateKey()
pk2, err := nostr.GetPublicKey(sk2)
if err != nil {
t.Fatalf("Failed to get public key 2: %v", err)
}
sk1 := nostr.Generate()
pk1 := nostr.GetPublicKey(sk1)
sk2 := nostr.Generate()
pk2 := nostr.GetPublicKey(sk2)
// helper to create signed events
createEvent := func(sk string, kind int, content string, tags nostr.Tags) nostr.Event {
pk, err := nostr.GetPublicKey(sk)
if err != nil {
t.Fatalf("Failed to get public key: %v", err)
}
createEvent := func(sk nostr.SecretKey, kind uint16, content string, tags nostr.Tags) nostr.Event {
pk := nostr.GetPublicKey(sk)
evt := nostr.Event{
PubKey: pk,
CreatedAt: nostr.Now(),
@@ -55,13 +45,13 @@ func TestBasicRelayFunctionality(t *testing.T) {
// connect two test clients
url := "ws" + server.URL[4:]
client1, err := nostr.RelayConnect(context.Background(), url)
client1, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client1: %v", err)
}
defer client1.Close()
client2, err := nostr.RelayConnect(context.Background(), url)
client2, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client2: %v", err)
}
@@ -69,7 +59,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 1: store and query events
t.Run("store and query events", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
evt1 := createEvent(sk1, 1, "hello world", nil)
@@ -79,10 +69,10 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
// Query the event back
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
Authors: []string{pk1},
Kinds: []int{1},
}})
sub, err := client2.Subscribe(ctx, nostr.Filter{
Authors: []nostr.PubKey{pk1},
Kinds: []uint16{1},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
@@ -101,14 +91,14 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 2: live event subscription
t.Run("live event subscription", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
// Setup subscription first
sub, err := client1.Subscribe(ctx, []nostr.Filter{{
Authors: []string{pk2},
Kinds: []int{1},
}})
sub, err := client1.Subscribe(ctx, nostr.Filter{
Authors: []nostr.PubKey{pk2},
Kinds: []uint16{1},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
@@ -134,7 +124,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 3: event deletion
t.Run("event deletion", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
// Create an event to be deleted
@@ -145,16 +135,16 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
// Create deletion event
delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID}})
delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID.Hex()}})
err = client1.Publish(ctx, delEvent)
if err != nil {
t.Fatalf("failed to publish deletion event: %v", err)
}
// Try to query the deleted event
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt3.ID},
}})
sub, err := client2.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt3.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
@@ -179,7 +169,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 4: teplaceable events
t.Run("replaceable events", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
// create initial kind:0 event
@@ -210,17 +200,17 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
// query to verify only the newest event exists
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
Authors: []string{pk1},
Kinds: []int{0},
}})
sub, err := client2.Subscribe(ctx, nostr.Filter{
Authors: []nostr.PubKey{pk1},
Kinds: []uint16{0},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
defer sub.Unsub()
// should only get one event back (the newest one)
var receivedEvents []*nostr.Event
var receivedEvents []nostr.Event
for {
select {
case env := <-sub.Events:
@@ -241,17 +231,15 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 5: event expiration
t.Run("event expiration", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
// create a new relay with shorter expiration check interval
relay := NewRelay()
relay.expirationManager.interval = 3 * time.Second // check every 3 seconds
store := slicestore.SliceStore{}
store := &slicestore.SliceStore{}
store.Init()
relay.StoreEvent = append(relay.StoreEvent, store.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, store.QueryEvents)
relay.DeleteEvent = append(relay.DeleteEvent, store.DeleteEvent)
relay.UseEventstore(store)
// start test server
server := httptest.NewServer(relay)
@@ -259,7 +247,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
// connect test client
url := "ws" + server.URL[4:]
client, err := nostr.RelayConnect(context.Background(), url)
client, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client: %v", err)
}
@@ -274,9 +262,9 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
// verify event exists initially
sub, err := client.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt.ID},
}})
sub, err := client.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
@@ -296,9 +284,9 @@ func TestBasicRelayFunctionality(t *testing.T) {
time.Sleep(4 * time.Second)
// verify event no longer exists
sub, err = client.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt.ID},
}})
sub, err = client.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
@@ -323,7 +311,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
// test 6: unauthorized deletion
t.Run("unauthorized deletion", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
// create an event from client1
@@ -334,16 +322,16 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
// Try to delete it with client2
delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID}})
delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID.Hex()}})
err = client2.Publish(ctx, delEvent)
if err == nil {
t.Fatalf("should have failed to publish deletion event: %v", err)
}
// Verify event still exists
sub, err := client1.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt4.ID},
}})
sub, err := client1.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt4.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}

View File

@@ -21,53 +21,42 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr
// because we may, for example, remove some things from the incoming filters
// that we know we don't support, and then if the end result is an empty
// filter we can just reject it)
if rl.RejectFilter != nil {
if reject, msg := rl.RejectFilter(ctx, filter); reject {
if nil != rl.OnRequest {
if reject, msg := rl.OnRequest(ctx, filter); reject {
return errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
}
}
// run the function to query events
if rl.QueryEvents != nil {
ch, err := rl.QueryEvents(ctx, filter)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
eose.Done()
} else if ch == nil {
eose.Done()
if nil != rl.QueryStored {
for event := range rl.QueryStored(ctx, filter) {
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: event})
}
go func(ch chan *nostr.Event) {
for event := range ch {
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
}
eose.Done()
}(ch)
eose.Done()
}
return nil
}
func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 {
func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) uint32 {
// check if we'll reject this filter
if rl.RejectCountFilter != nil {
if rejecting, msg := rl.RejectCountFilter(ctx, filter); rejecting {
if nil != rl.OnCountFilter {
if rejecting, msg := rl.OnCountFilter(ctx, filter); rejecting {
ws.WriteJSON(nostr.NoticeEnvelope(msg))
return 0
}
}
// run the functions to count (generally it will be just one)
var subtotal int64 = 0
if rl.CountEvents != nil {
res, err := rl.CountEvents(ctx, filter)
if nil != rl.Count {
res, err := rl.Count(ctx, filter)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
}
subtotal += res
return res
}
return subtotal
return 0
}
func (rl *Relay) handleCountRequestWithHLL(
@@ -75,32 +64,22 @@ func (rl *Relay) handleCountRequestWithHLL(
ws *WebSocket,
filter nostr.Filter,
offset int,
) (int64, *hyperloglog.HyperLogLog) {
) (uint32, *hyperloglog.HyperLogLog) {
// check if we'll reject this filter
if rl.RejectCountFilter != nil {
if rejecting, msg := rl.RejectCountFilter(ctx, filter); rejecting {
if nil != rl.OnCountFilter {
if rejecting, msg := rl.OnCountFilter(ctx, filter); rejecting {
ws.WriteJSON(nostr.NoticeEnvelope(msg))
return 0, nil
}
}
// run the functions to count (generally it will be just one)
var subtotal int64 = 0
var hll *hyperloglog.HyperLogLog
if rl.CountEventsHLL != nil {
res, fhll, err := rl.CountEventsHLL(ctx, filter, offset)
if nil != rl.CountHLL {
res, hll, err := rl.CountHLL(ctx, filter, offset)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
}
subtotal += res
if fhll != nil {
if hll == nil {
hll = fhll
} else {
hll.Merge(fhll)
}
}
return res, hll
}
return subtotal, hll
return 0, nil
}

View File

@@ -3,11 +3,12 @@ package main
import (
"context"
"fmt"
"slices"
"time"
"github.com/fiatjaf/eventstore"
"github.com/fiatjaf/eventstore/slicestore"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/slicestore"
"fiatjaf.com/nostr/eventstore/wrappers"
"fiatjaf.com/nostr/nip77"
)
@@ -16,8 +17,8 @@ func main() {
db := &slicestore.SliceStore{}
db.Init()
sk := nostr.GeneratePrivateKey()
local := eventstore.RelayWrapper{Store: db}
sk := nostr.Generate()
local := wrappers.StorePublisher{Store: db}
for {
for i := 0; i < 20; i++ {
@@ -29,7 +30,7 @@ func main() {
Tags: nostr.Tags{},
}
evt.Sign(sk)
db.SaveEvent(ctx, &evt)
db.SaveEvent(evt)
}
{
@@ -40,7 +41,7 @@ func main() {
Tags: nostr.Tags{},
}
evt.Sign(sk)
db.SaveEvent(ctx, &evt)
db.SaveEvent(evt)
}
}
@@ -50,11 +51,7 @@ func main() {
panic(err)
}
data, err := local.QuerySync(ctx, nostr.Filter{})
if err != nil {
panic(err)
}
data := slices.Collect(local.QueryEvents(nostr.Filter{}))
fmt.Println("total local events:", len(data))
time.Sleep(time.Second * 10)
}

View File

@@ -24,7 +24,7 @@ func TestConnectContext(t *testing.T) {
// relay client
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
r, err := RelayConnect(ctx, testRelayURL)
r, err := RelayConnect(ctx, testRelayURL, RelayOptions{})
assert.NoError(t, err)
defer r.Close()
@@ -34,7 +34,7 @@ func TestConnectContextCanceled(t *testing.T) {
// relay client
ctx, cancel := context.WithCancel(context.Background())
cancel() // make ctx expired
_, err := RelayConnect(ctx, testRelayURL)
_, err := RelayConnect(ctx, testRelayURL, RelayOptions{})
assert.ErrorIs(t, err, context.Canceled)
}
@@ -60,7 +60,7 @@ func TestPublish(t *testing.T) {
func makeKeyPair(t *testing.T) (priv, pub [32]byte) {
t.Helper()
privkey := GeneratePrivateKey()
privkey := Generate()
pubkey := GetPublicKey(privkey)
return privkey, pubkey
@@ -69,7 +69,7 @@ func makeKeyPair(t *testing.T) (priv, pub [32]byte) {
func mustRelayConnect(t *testing.T, url string) *Relay {
t.Helper()
rl, err := RelayConnect(context.Background(), url)
rl, err := RelayConnect(context.Background(), url, RelayOptions{})
require.NoError(t, err)
return rl

View File

@@ -6,7 +6,7 @@ import (
"time"
"fiatjaf.com/nostr"
"github.com/fiatjaf/eventstore/slicestore"
"fiatjaf.com/nostr/eventstore/slicestore"
"github.com/fiatjaf/khatru"
"github.com/stretchr/testify/require"
)

View File

@@ -13,8 +13,8 @@ import (
"fiatjaf.com/nostr/sdk/hints/memoryh"
"fiatjaf.com/nostr/sdk/kvstore"
kvstore_memory "fiatjaf.com/nostr/sdk/kvstore/memory"
"github.com/fiatjaf/eventstore"
"github.com/fiatjaf/eventstore/nullstore"
"fiatjaf.com/nostr/eventstore"
"fiatjaf.com/nostr/eventstore/nullstore"
)
// System represents the core functionality of the SDK, providing access to