Batch completer + additional completer test suite and benchmarks
Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out and uses a debounced channel that
fires periodically (currently, up to every 100 milliseconds) to submit
entire batches of up to 2,000 jobs for completion at once.
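
For intuition, here's a minimal sketch of the debounce-and-flush loop.
It's illustrative only and not River's actual implementation:
`runBatchLoop`, `flushInterval`, and `maxBatchSize` are made-up names,
and the real completer additionally handles errors, retries, and
shutdown coordination.

    // Illustrative sketch of the batching idea only; names here are
    // assumptions, not River's API.
    package jobbatchsketch

    import (
        "context"
        "time"
    )

    const (
        flushInterval = 100 * time.Millisecond // debounce period
        maxBatchSize  = 2_000                  // cap on jobs submitted per flush
    )

    // runBatchLoop accumulates job IDs from incoming and flushes them either
    // when the batch fills up or when the ticker fires.
    func runBatchLoop(ctx context.Context, incoming <-chan int64, flush func(ids []int64)) {
        var batch []int64
        ticker := time.NewTicker(flushInterval)
        defer ticker.Stop()

        for {
            select {
            case id := <-incoming:
                batch = append(batch, id)
                if len(batch) >= maxBatchSize {
                    flush(batch) // batch is full; don't wait for the timer
                    batch = nil
                }
            case <-ticker.C:
                if len(batch) > 0 {
                    flush(batch) // periodic flush of whatever has accumulated
                    batch = nil
                }
            case <-ctx.Done():
                if len(batch) > 0 {
                    flush(batch) // drain anything left on shutdown
                }
                return
            }
        }
    }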

For the purposes of not grossly expanding the `riverdriver` interface,
the completer only batches jobs being set to `completed`, which under
most normal workloads we expect to be by far the most common case. Jobs
going to other states are fed into a member `AsyncCompleter`, thereby
allowing the `BatchCompleter` to keep its implementation quite simple.
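
Conceptually, the split between the batched fast path and the async
fallback might look like the following sketch. The type, field, and
method names here are assumptions for illustration, not the real
`BatchCompleter` API.

    package jobbatchsketch

    import "context"

    // batchCompleter is an illustrative stand-in; the real type wraps an
    // AsyncCompleter rather than a bare fallback function.
    type batchCompleter struct {
        incoming      chan int64                                               // IDs queued for the batched `completed` fast path
        asyncFallback func(ctx context.Context, id int64, state string) error // delegates everything else
    }

    func (c *batchCompleter) JobComplete(ctx context.Context, id int64, state string) error {
        if state == "completed" {
            // By far the most common case: queue for the next batched update.
            c.incoming <- id
            return nil
        }
        // Errored, cancelled, retryable, etc. fall through to the async
        // completer so the riverdriver interface doesn't have to grow.
        return c.asyncFallback(ctx, id, state)
    }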

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently in use by the
River client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edges that were
previously being ignored completely. In most cases we drop the heavy
mocking that was happening before, which had the effect of minimizing
the surface area under test and producing misleading, unrealistic
timings.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
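
As a rough idea of the shape such a benchmark might take (hypothetical:
the `completer` interface and helper below are stand-ins, not the
package's real types):

    package jobbatchsketch

    import (
        "context"
        "testing"
    )

    // completer is a stand-in for whatever interface the implementations share.
    type completer interface {
        JobComplete(ctx context.Context, id int64) error
    }

    // benchCompleter drives any completer through b.N completions so that
    // implementations can be compared against each other on equal footing.
    func benchCompleter(b *testing.B, c completer) {
        ctx := context.Background()
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            if err := c.JobComplete(ctx, int64(i)); err != nil {
                b.Fatal(err)
            }
        }
    }
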
brandur committed Mar 10, 2024
1 parent d9a7fc3 commit df7af43
Showing 18 changed files with 1,250 additions and 207 deletions.
14 changes: 12 additions & 2 deletions client.go
@@ -428,7 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)
completer := jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())

client := &Client[TTx]{
completer: completer,
@@ -600,6 +600,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// to shut down prior to closing the monitor.
go c.monitor.Run()

// TODO: Stop completer (and any other services) if Start leaves with an error.
if err := c.completer.Start(ctx); err != nil {
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
@@ -662,7 +667,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
c.completer.Stop()

c.notifier.Stop()
c.queueMaintainer.Stop()
@@ -760,6 +765,11 @@ func (c *Client[TTx]) Stopped() <-chan struct{} {
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
return c.subscribeWithSize(subscribeChanSizeDefault, kinds...)
}

// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) subscribeWithSize(subscribeChanSize int, kinds ...EventKind) (<-chan *Event, func()) {
for _, kind := range kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
42 changes: 36 additions & 6 deletions client_test.go
@@ -2327,34 +2327,64 @@ func Test_Client_Subscribe(t *testing.T) {

client := newTestClient(t, dbPool, config)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

// A first channel that we'll use to make sure all the expected jobs are
// finished.
subscribeChan, cancel := client.Subscribe(EventKindJobCompleted)
t.Cleanup(cancel)

// Artificially lowered subscribe channel size so we don't have to try
// and process thousands of jobs.
const (
subscribeChanSize = 100
numJobsToInsert = subscribeChanSize + 1
)

// Another channel with no listeners. Despite no listeners, it shouldn't
// block or gum up the client's progress in any way.
_, cancel = client.Subscribe(EventKindJobCompleted)
subscribeChan2, cancel := client.subscribeWithSize(subscribeChanSize, EventKindJobCompleted)
t.Cleanup(cancel)

// Insert more jobs than the maximum channel size. We'll be pulling from
// one channel but not the other.
for i := 0; i < subscribeChanSize+1; i++ {
_ = requireInsert(ctx, client, fmt.Sprintf("job %d", i))
var (
insertParams = make([]*riverdriver.JobInsertFastParams, numJobsToInsert)
kind = (&JobArgs{}).Kind()
)
for i := 0; i < numJobsToInsert; i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{}`),
Kind: kind,
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
}
}

_, err := client.driver.GetExecutor().JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)

// Need to start waiting on events before running the client or the
// channel could overflow before we start listening.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, subscribeChanSize+1)
_ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, numJobsToInsert)
}()

startClient(ctx, t, client)

wg.Wait()

// Filled to maximum.
require.Len(t, subscribeChan2, subscribeChanSize)
})

t.Run("PanicOnUnknownKind", func(t *testing.T) {
2 changes: 1 addition & 1 deletion event.go
@@ -69,7 +69,7 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics {

// The maximum size of the subscribe channel. Events that would overflow it will
// be dropped.
const subscribeChanSize = 100
const subscribeChanSizeDefault = 50_000

// eventSubscription is an active subscription for events being produced by a
// client, created with Client.Subscribe.