diff --git a/pool.go b/pool.go index 49fdb54..6e1b955 100644 --- a/pool.go +++ b/pool.go @@ -4,12 +4,11 @@ import ( "context" "fmt" "log" + "slices" "strings" "sync" "time" - "slices" - "github.com/puzpuzpuz/xsync/v3" ) @@ -25,6 +24,11 @@ type SimplePool struct { cancel context.CancelFunc } +type DirectedFilter struct { + Filter + Relay string +} + type IncomingEvent struct { *Event Relay *Relay @@ -310,3 +314,48 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F } return nil } + +func (pool *SimplePool) batchedSubMany( + ctx context.Context, + dfs []DirectedFilter, + subFn func(context.Context, []string, Filters, bool) chan IncomingEvent, +) chan IncomingEvent { + type batch struct { + filter Filter + relays []string + } + + batches := make([]batch, 0, len(dfs)) + for _, df := range dfs { + idx := slices.IndexFunc(batches, func(b batch) bool { return FilterEqual(b.filter, df.Filter) }) + if idx != -1 { + batches[idx].relays = append(batches[idx].relays, df.Relay) + } else { + relays := make([]string, 0, 10) + relays = append(relays, df.Relay) + batches = append(batches, batch{filter: df.Filter, relays: relays}) + } + } + + res := make(chan IncomingEvent) + + for _, b := range batches { + go func(b batch) { + for ie := range subFn(ctx, b.relays, Filters{b.filter}, true) { + res <- ie + } + }(b) + } + + return res +} + +// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. +func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent { + return pool.batchedSubMany(ctx, dfs, pool.subMany) +} + +// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. +func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent { + return pool.batchedSubMany(ctx, dfs, pool.subManyEose) +}