Merge pull request #20 from stanstacks/cantunsub

problem: can't unsub
This commit is contained in:
fiatjaf
2022-12-23 11:33:11 -03:00
committed by GitHub
3 changed files with 26 additions and 13 deletions

View File

@@ -24,7 +24,7 @@ for notice := range pool.Notices {
### Listening for events ### Listening for events
```go ```go
subId, events := pool.Sub(nostr.Filters{ subId, events, unsubscribe := pool.Sub(nostr.Filters{
{ {
Authors: []string{"0ded86bf80c76847320b16f22b7451c08169434837a51ad5fe3b178af6c35f5d"}, Authors: []string{"0ded86bf80c76847320b16f22b7451c08169434837a51ad5fe3b178af6c35f5d"},
Kinds: []int{nostr.KindTextNote}, // or {1} Kinds: []int{nostr.KindTextNote}, // or {1}
@@ -38,7 +38,7 @@ go func() {
}() }()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
sub.Unsub() unsubscribe()
``` ```
### Publishing an event ### Publishing an event
@@ -84,4 +84,4 @@ fmt.Println("pk:", nostr.GetPublicKey(sk))
``` ```
go run example/example.go go run example/example.go
``` ```

View File

@@ -8,7 +8,7 @@ import (
) )
// some nostr relay in the wild // some nostr relay in the wild
var relayURL = "wss://nostr-relay.wlvs.space" var relayURL = "wss://nostr.688.org"
func main() { func main() {
// create key pair // create key pair
@@ -26,13 +26,15 @@ func main() {
pool.SecretKey = &secretKey pool.SecretKey = &secretKey
// add a nostr relay to our pool // add a nostr relay to our pool
err = pool.Add(relayURL, nostr.SimplePolicy{Read: true, Write: true}) errchan := pool.Add(relayURL, nostr.SimplePolicy{Read: true, Write: true})
if err != nil { go func() {
fmt.Printf("error calling Add(): %s\n", err.Error()) for err := range errchan {
} fmt.Println(err.Error())
}
}()
// subscribe to relays in our pool, with filtering // subscribe to relays in our pool, with filtering
sub := pool.Sub(nostr.Filters{ _, events, unsubscribe := pool.Sub(nostr.Filters{
{ {
Authors: []string{publicKey}, Authors: []string{publicKey},
Kinds: []int{nostr.KindTextNote}, Kinds: []int{nostr.KindTextNote},
@@ -41,7 +43,7 @@ func main() {
// listen for events from our subscriptions // listen for events from our subscriptions
go func() { go func() {
for event := range sub.UniqueEvents { for event := range nostr.Unique(events) {
fmt.Printf("Received Event: %+v\n\n", event) fmt.Printf("Received Event: %+v\n\n", event)
} }
}() }()
@@ -69,7 +71,7 @@ func main() {
// after 20 seconds, unsubscribe from our pool and terminate program // after 20 seconds, unsubscribe from our pool and terminate program
time.Sleep(20 * time.Second) time.Sleep(20 * time.Second)
fmt.Println("unsubscribing from nostr subscription") fmt.Println("unsubscribing from nostr subscription")
sub.Unsub() unsubscribe()
} }
// handle events from out publish events // handle events from out publish events

View File

@@ -124,7 +124,10 @@ func (r *RelayPool) Remove(url string) {
} }
} }
func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { //Sub subscribes to events matching the passed filters and returns the subscription ID,
//a channel which you should pass into Unique to get unique events, and a function which
//you should call to clean up and close your subscription so that the relay doesn't block you.
func (r *RelayPool) Sub(filters Filters) (subID string, events chan EventMessage, unsubscribe func()) {
random := make([]byte, 7) random := make([]byte, 7)
rand.Read(random) rand.Read(random)
id := hex.EncodeToString(random) id := hex.EncodeToString(random)
@@ -132,6 +135,7 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
r.subscriptions.Store(id, filters) r.subscriptions.Store(id, filters)
eventStream := make(chan EventMessage) eventStream := make(chan EventMessage)
r.eventStreams.Store(id, eventStream) r.eventStreams.Store(id, eventStream)
unsub := make(chan struct{})
r.Relays.Range(func(_ string, relay *Relay) bool { r.Relays.Range(func(_ string, relay *Relay) bool {
sub := relay.prepareSubscription(id) sub := relay.prepareSubscription(id)
@@ -143,10 +147,17 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
} }
}(sub) }(sub)
go func() {
select {
case <-unsub:
sub.Unsub()
}
}()
return true return true
}) })
return id, eventStream return id, eventStream, func() { close(unsub) }
} }
func Unique(all chan EventMessage) chan Event { func Unique(all chan EventMessage) chan Event {