Skip to content

Commit

Permalink
pool.BatchedSubMany()
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Feb 24, 2024
1 parent 7826e95 commit 28b3479
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"fmt"
"log"
"slices"
"strings"
"sync"
"time"

"slices"

"github.com/puzpuzpuz/xsync/v3"
)

Expand All @@ -25,6 +24,11 @@ type SimplePool struct {
cancel context.CancelFunc
}

type DirectedFilter struct {
Filter
Relay string
}

type IncomingEvent struct {
*Event
Relay *Relay
Expand Down Expand Up @@ -310,3 +314,48 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
}
return nil
}

func (pool *SimplePool) batchedSubMany(
ctx context.Context,
dfs []DirectedFilter,
subFn func(context.Context, []string, Filters, bool) chan IncomingEvent,
) chan IncomingEvent {
type batch struct {
filter Filter
relays []string
}

batches := make([]batch, 0, len(dfs))
for _, df := range dfs {
idx := slices.IndexFunc(batches, func(b batch) bool { return FilterEqual(b.filter, df.Filter) })
if idx != -1 {
batches[idx].relays = append(batches[idx].relays, df.Relay)
} else {
relays := make([]string, 0, 10)
relays = append(relays, df.Relay)
batches = append(batches, batch{filter: df.Filter, relays: relays})
}
}

res := make(chan IncomingEvent)

for _, b := range batches {
go func(b batch) {
for ie := range subFn(ctx, b.relays, Filters{b.filter}, true) {
res <- ie
}
}(b)
}

return res
}

// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany)
}

// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose)
}

0 comments on commit 28b3479

Please sign in to comment.