From 3ebfc7812b9bfa5203c3f35f2e42c52f31729444 Mon Sep 17 00:00:00 2001
From: fiatjaf
Date: Thu, 20 Mar 2025 20:54:51 -0300
Subject: [PATCH] sdk: simplified dataloader even more. should be faster.

---
 sdk/addressable_loader.go    |   9 +-
 sdk/dataloader/dataloader.go | 218 ++++++++++-------------------------
 sdk/replaceable_loader.go    |   9 +-
 3 files changed, 75 insertions(+), 161 deletions(-)

diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go
index 01fe435..d4c678f 100644
--- a/sdk/addressable_loader.go
+++ b/sdk/addressable_loader.go
@@ -33,8 +33,10 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[stri
 		func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[[]*nostr.Event] {
 			return sys.batchLoadAddressableEvents(ctxs, kind, pubkeys)
 		},
-		dataloader.WithBatchCapacity[string, []*nostr.Event](30),
-		dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350),
+		dataloader.Options{
+			Wait:         time.Millisecond * 110,
+			MaxThreshold: 30,
+		},
 	)
 }
 
@@ -56,7 +58,8 @@ func (sys *System) batchLoadAddressableEvents(
 	waiting := len(pubkeys)
 
 	for i, pubkey := range pubkeys {
-		ctx := ctxs[i]
+		ctx, cancel := context.WithCancel(ctxs[i])
+		defer cancel()
 
 		// build batched queries for the external relays
 		go func(i int, pubkey string) {
diff --git a/sdk/dataloader/dataloader.go b/sdk/dataloader/dataloader.go
index 10f19b3..7abc356 100644
--- a/sdk/dataloader/dataloader.go
+++ b/sdk/dataloader/dataloader.go
@@ -21,38 +21,16 @@ type Result[V any] struct {
 	Error error
 }
 
-// ResultMany is used by the LoadMany method.
-// It contains a list of resolved data and a list of errors.
-// The lengths of the data list and error list will match, and elements at each index correspond to each other.
-type ResultMany[V any] struct {
-	Data  []V
-	Error []error
-}
-
-// PanicErrorWrapper wraps the error interface.
-// This is used to check if the error is a panic error.
-// We should not cache panic errors.
-type PanicErrorWrapper struct {
-	panicError error
-}
-
-func (p *PanicErrorWrapper) Error() string {
-	return p.panicError.Error()
-}
-
 // Loader implements the dataloader.Interface.
 type Loader[K comparable, V any] struct {
 	// the batch function to be used by this loader
 	batchFn BatchFunc[K, V]
 
 	// the maximum batch size. Set to 0 if you want it to be unbounded.
-	batchCap int
+	batchCap uint
 
 	// count of queued up items
-	count int
-
-	// the maximum input queue size. Set to 0 if you want it to be unbounded.
-	inputCap int
+	count uint
 
 	// the amount of time to wait before triggering a batch
 	wait time.Duration
@@ -64,10 +42,7 @@ type Loader[K comparable, V any] struct {
 	curBatcher *batcher[K, V]
 
 	// used to close the sleeper of the current batcher
-	endSleeper chan bool
-
-	// used by tests to prevent logs
-	silent bool
+	thresholdReached chan bool
 }
 
 // type used to on input channel
@@ -77,49 +52,18 @@ type batchRequest[K comparable, V any] struct {
 	channel chan Result[V]
 }
 
-// Option allows for configuration of Loader fields.
-type Option[K comparable, V any] func(*Loader[K, V])
-
-// WithBatchCapacity sets the batch capacity. Default is 0 (unbounded).
-func WithBatchCapacity[K comparable, V any](c int) Option[K, V] {
-	return func(l *Loader[K, V]) {
-		l.batchCap = c
-	}
-}
-
-// WithInputCapacity sets the input capacity. Default is 1000.
-func WithInputCapacity[K comparable, V any](c int) Option[K, V] {
-	return func(l *Loader[K, V]) {
-		l.inputCap = c
-	}
-}
-
-// WithWait sets the amount of time to wait before triggering a batch.
-// Default duration is 16 milliseconds.
-func WithWait[K comparable, V any](d time.Duration) Option[K, V] {
-	return func(l *Loader[K, V]) {
-		l.wait = d
-	}
-}
-
-// withSilentLogger turns of log messages. It's used by the tests
-func withSilentLogger[K comparable, V any]() Option[K, V] {
-	return func(l *Loader[K, V]) {
-		l.silent = true
-	}
+type Options struct {
+	Wait         time.Duration
+	MaxThreshold uint
 }
 
 // NewBatchedLoader constructs a new Loader with given options.
-func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] {
+func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts Options) *Loader[K, V] {
 	loader := &Loader[K, V]{
 		batchFn:  batchFn,
-		inputCap: 1000,
-		wait:     16 * time.Millisecond,
-	}
-
-	// Apply options
-	for _, apply := range opts {
-		apply(loader)
+		batchCap: opts.MaxThreshold,
+		count:    0,
+		wait:     opts.Wait,
 	}
 
 	return loader
@@ -133,36 +77,64 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
 
 	// this is sent to batch fn. It contains the key and the channel to return
 	// the result on
-	req := &batchRequest[K, V]{ctx, key, c}
+	req := batchRequest[K, V]{ctx, key, c}
 
 	l.batchLock.Lock()
 
 	// start the batch window if it hasn't already started.
 	if l.curBatcher == nil {
-		l.curBatcher = l.newBatcher(l.silent)
-		// start the current batcher batch function
-		go l.curBatcher.batch()
+		l.curBatcher = l.newBatcher()
+
 		// start a sleeper for the current batcher
-		l.endSleeper = make(chan bool)
-		go l.sleeper(l.curBatcher, l.endSleeper)
+		l.thresholdReached = make(chan bool)
+
+		// we will run the batch function either after some time or after a threshold has been reached
+		b := l.curBatcher
+		go func() {
+			select {
+			case <-l.thresholdReached:
+			case <-time.After(l.wait):
+			}
+
+			// We can end here also if the batcher has already been closed and a
+			// new one has been created. So reset the loader state only if the batcher
+			// is the current one
+			if l.curBatcher == b {
+				l.reset()
+			}
+
+			var (
+				ctxs = make([]context.Context, 0, len(b.requests))
+				keys = make([]K, 0, len(b.requests))
+				res  map[K]Result[V]
+			)
+
+			for _, item := range b.requests {
+				ctxs = append(ctxs, item.ctx)
+				keys = append(keys, item.key)
+			}
+
+			res = l.batchFn(ctxs, keys)
+
+			for _, req := range b.requests {
+				if r, ok := res[req.key]; ok {
+					req.channel <- r
+				}
+				close(req.channel)
+			}
+		}()
 	}
-	l.curBatcher.input <- req
+	l.curBatcher.requests = append(l.curBatcher.requests, req)
 
-	// if we need to keep track of the count (max batch), then do so.
-	if l.batchCap > 0 {
-		l.count++
-		// if we hit our limit, force the batch to start
-		if l.count == l.batchCap {
-			// end the batcher synchronously here because another call to Load
-			// may concurrently happen and needs to go to a new batcher.
-			l.curBatcher.end()
-			// end the sleeper for the current batcher.
-			// this is to stop the goroutine without waiting for the
-			// sleeper timeout.
-			close(l.endSleeper)
-			l.reset()
-		}
+	l.count++
+	if l.count == l.batchCap {
+		close(l.thresholdReached)
+
+		// end the batcher synchronously here because another call to Load
+		// may concurrently happen and needs to go to a new batcher.
+		l.reset()
 	}
+
 	l.batchLock.Unlock()
 
 	if v, ok := <-c; ok {
@@ -178,78 +150,14 @@ func (l *Loader[K, V]) reset() {
 }
 
 type batcher[K comparable, V any] struct {
-	input    chan *batchRequest[K, V]
+	requests []batchRequest[K, V]
 	batchFn  BatchFunc[K, V]
-	finished bool
-	silent   bool
 }
 
 // newBatcher returns a batcher for the current requests
-// all the batcher methods must be protected by a global batchLock
-func (l *Loader[K, V]) newBatcher(silent bool) *batcher[K, V] {
+func (l *Loader[K, V]) newBatcher() *batcher[K, V] {
 	return &batcher[K, V]{
-		input:   make(chan *batchRequest[K, V], l.inputCap),
-		batchFn: l.batchFn,
-		silent:  silent,
+		requests: make([]batchRequest[K, V], 0, l.batchCap),
+		batchFn:  l.batchFn,
 	}
 }
-
-// stop receiving input and process batch function
-func (b *batcher[K, V]) end() {
-	if !b.finished {
-		close(b.input)
-		b.finished = true
-	}
-}
-
-// execute the batch of all items in queue
-func (b *batcher[K, V]) batch() {
-	var (
-		ctxs = make([]context.Context, 0, 30)
-		keys = make([]K, 0, 30)
-		reqs = make([]*batchRequest[K, V], 0, 30)
-		res  map[K]Result[V]
-	)
-
-	for item := range b.input {
-		ctxs = append(ctxs, item.ctx)
-		keys = append(keys, item.key)
-		reqs = append(reqs, item)
-	}
-
-	func() {
-		res = b.batchFn(ctxs, keys)
-	}()
-
-	for _, req := range reqs {
-		if r, ok := res[req.key]; ok {
-			req.channel <- r
-		}
-		close(req.channel)
-	}
-}
-
-// wait the appropriate amount of time for the provided batcher
-func (l *Loader[K, V]) sleeper(b *batcher[K, V], close chan bool) {
-	select {
-	// used by batch to close early. usually triggered by max batch size
-	case <-close:
-		return
-	// this will move this goroutine to the back of the callstack?
-	case <-time.After(l.wait):
-	}
-
-	// reset
-	// this is protected by the batchLock to avoid closing the batcher input
-	// channel while Load is inserting a request
-	l.batchLock.Lock()
-	b.end()
-
-	// We can end here also if the batcher has already been closed and a
-	// new one has been created. So reset the loader state only if the batcher
-	// is the current one
-	if l.curBatcher == b {
-		l.reset()
-	}
-	l.batchLock.Unlock()
-}
diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go
index 8f62697..f1e7a7b 100644
--- a/sdk/replaceable_loader.go
+++ b/sdk/replaceable_loader.go
@@ -52,8 +52,10 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[stri
 		func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[*nostr.Event] {
 			return sys.batchLoadReplaceableEvents(ctxs, kind, pubkeys)
 		},
-		dataloader.WithBatchCapacity[string, *nostr.Event](30),
-		dataloader.WithWait[string, *nostr.Event](time.Millisecond*350),
+		dataloader.Options{
+			Wait:         time.Millisecond * 110,
+			MaxThreshold: 30,
+		},
 	)
 }
 
@@ -75,7 +77,8 @@ func (sys *System) batchLoadReplaceableEvents(
 	waiting := len(pubkeys)
 
 	for i, pubkey := range pubkeys {
-		ctx := ctxs[i]
+		ctx, cancel := context.WithCancel(ctxs[i])
+		defer cancel()
 
 		// build batched queries for the external relays
 		go func(i int, pubkey string) {
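For reference, a minimal sketch (not part of the patch) of how a loader would be built against the new Options-based constructor, mirroring the createReplaceableDataloader call above. The import path and the toy string-to-string batch function are assumptions made purely for illustration; NewBatchedLoader, Options{Wait, MaxThreshold}, Load and Result come from the code in this diff, while the Result.Data field is inferred from the surrounding sdk code rather than shown here. With the simplified loader, a batch flushes when either Wait elapses or MaxThreshold keys have been queued, whichever happens first.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/nbd-wtf/go-nostr/sdk/dataloader" // assumed import path
)

func main() {
	// toy batch function: receives one context per queued key plus the keys,
	// and returns one Result per key (here it just uppercases the key)
	batchFn := func(ctxs []context.Context, keys []string) map[string]dataloader.Result[string] {
		res := make(map[string]dataloader.Result[string], len(keys))
		for _, k := range keys {
			res[k] = dataloader.Result[string]{Data: strings.ToUpper(k)}
		}
		return res
	}

	loader := dataloader.NewBatchedLoader[string, string](
		batchFn,
		dataloader.Options{
			Wait:         time.Millisecond * 110, // flush after this much waiting...
			MaxThreshold: 30,                     // ...or once 30 keys are queued, whichever comes first
		},
	)

	// Load queues the key and blocks until its batch has been executed
	v, err := loader.Load(context.Background(), "hello")
	fmt.Println(v, err) // HELLO <nil>
}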