khatru: expiration manager takes query and delete methods, which are given automatically by .UseEventstore()

This commit is contained in:
fiatjaf
2025-10-01 12:08:08 -03:00
parent 8957144c77
commit 46f28203ed
2 changed files with 46 additions and 46 deletions

View File

@@ -3,6 +3,8 @@ package khatru
import ( import (
"container/heap" "container/heap"
"context" "context"
"iter"
"slices"
"sync" "sync"
"time" "time"
@@ -34,25 +36,18 @@ func (h *expiringEventHeap) Pop() interface{} {
} }
type expirationManager struct { type expirationManager struct {
events expiringEventHeap events expiringEventHeap
mu sync.Mutex mu sync.Mutex
relay *Relay
queryStored func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event]
deleteEvent func(ctx context.Context, id nostr.ID) error
interval time.Duration interval time.Duration
initialScanDone bool initialScanDone bool
kill chan struct{} // used for manually killing this kill chan struct{} // used for manually killing this
killonce *sync.Once killonce *sync.Once
} }
func newExpirationManager(relay *Relay) *expirationManager {
return &expirationManager{
events: make(expiringEventHeap, 0),
relay: relay,
interval: time.Hour,
kill: make(chan struct{}),
killonce: &sync.Once{},
}
}
func (em *expirationManager) stop() { func (em *expirationManager) stop() {
em.killonce.Do(func() { em.killonce.Do(func() {
close(em.kill) close(em.kill)
@@ -86,14 +81,12 @@ func (em *expirationManager) initialScan(ctx context.Context) {
// query all events // query all events
ctx = context.WithValue(ctx, internalCallKey, struct{}{}) ctx = context.WithValue(ctx, internalCallKey, struct{}{})
if nil != em.relay.QueryStored { for evt := range em.queryStored(ctx, nostr.Filter{}) {
for evt := range em.relay.QueryStored(ctx, nostr.Filter{}) { if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 { heap.Push(&em.events, expiringEvent{
heap.Push(&em.events, expiringEvent{ id: evt.ID,
id: evt.ID, expiresAt: expiresAt,
expiresAt: expiresAt, })
})
}
} }
} }
@@ -116,9 +109,7 @@ func (em *expirationManager) checkExpiredEvents(ctx context.Context) {
heap.Pop(&em.events) heap.Pop(&em.events)
ctx := context.WithValue(ctx, internalCallKey, struct{}{}) ctx := context.WithValue(ctx, internalCallKey, struct{}{})
if nil != em.relay.DeleteEvent { em.deleteEvent(ctx, next.id)
em.relay.DeleteEvent(ctx, next.id)
}
} }
} }
@@ -145,3 +136,33 @@ func (em *expirationManager) removeEvent(id nostr.ID) {
} }
} }
} }
func (rl *Relay) StartExpirationManager(
queryStored func(ctx context.Context, filter nostr.Filter) iter.Seq[nostr.Event],
deleteEvent func(ctx context.Context, id nostr.ID) error,
) {
rl.expirationManager = &expirationManager{
events: make(expiringEventHeap, 0),
queryStored: queryStored,
deleteEvent: deleteEvent,
interval: time.Hour,
kill: make(chan struct{}),
killonce: &sync.Once{},
}
go rl.expirationManager.start(rl.ctx)
rl.Info.AddSupportedNIP(40)
}
func (rl *Relay) DisableExpirationManager() {
rl.expirationManager.stop()
rl.expirationManager = nil
idx := slices.Index(rl.Info.SupportedNIPs, 40)
if idx != -1 {
rl.Info.SupportedNIPs[idx] = rl.Info.SupportedNIPs[len(rl.Info.SupportedNIPs)-1]
rl.Info.SupportedNIPs = rl.Info.SupportedNIPs[0 : len(rl.Info.SupportedNIPs)-1]
}
}

View File

@@ -6,7 +6,6 @@ import (
"log" "log"
"net/http" "net/http"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -53,8 +52,6 @@ func NewRelay() *Relay {
MaxAuthenticatedClients: 32, MaxAuthenticatedClients: 32,
} }
rl.expirationManager = newExpirationManager(rl)
return rl return rl
} }
@@ -152,7 +149,7 @@ func (rl *Relay) UseEventstore(store eventstore.Store, maxQueryLimit int) {
} }
// only when using the eventstore we automatically set up the expiration manager // only when using the eventstore we automatically set up the expiration manager
rl.StartExpirationManager() rl.StartExpirationManager(rl.QueryStored, rl.DeleteEvent)
} }
func (rl *Relay) getBaseURL(r *http.Request) string { func (rl *Relay) getBaseURL(r *http.Request) string {
@@ -180,21 +177,3 @@ func (rl *Relay) getBaseURL(r *http.Request) string {
} }
return proto + "://" + host return proto + "://" + host
} }
func (rl *Relay) StartExpirationManager() {
if rl.expirationManager != nil {
go rl.expirationManager.start(rl.ctx)
rl.Info.AddSupportedNIP(40)
}
}
func (rl *Relay) DisableExpirationManager() {
rl.expirationManager.stop()
rl.expirationManager = nil
idx := slices.Index(rl.Info.SupportedNIPs, 40)
if idx != -1 {
rl.Info.SupportedNIPs[idx] = rl.Info.SupportedNIPs[len(rl.Info.SupportedNIPs)-1]
rl.Info.SupportedNIPs = rl.Info.SupportedNIPs[0 : len(rl.Info.SupportedNIPs)-1]
}
}