Batch completer + additional completer test suite and benchmarks
Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, and uses a debounced channel that
fires periodically (currently, up to every 100 milliseconds) to submit
an entire batch of up to 2,000 jobs for completion at once.
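
As a rough illustration of that strategy (not the actual `BatchCompleter`
internals; the type and function names below are made up for the sketch),
the accumulate/flush loop looks roughly like this:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // completionBatcher is an illustrative stand-in, not River's real type.
    type completionBatcher struct {
        incoming chan int64    // job IDs waiting to be marked `completed`
        maxBatch int           // e.g. 2,000
        interval time.Duration // e.g. 100 * time.Millisecond
    }

    func (b *completionBatcher) run(ctx context.Context) {
        ticker := time.NewTicker(b.interval)
        defer ticker.Stop()

        batch := make([]int64, 0, b.maxBatch)
        flush := func() {
            if len(batch) == 0 {
                return
            }
            completeBatch(batch) // one database round trip for the whole batch
            batch = batch[:0]
        }

        for {
            select {
            case <-ctx.Done():
                flush()
                return
            case id := <-b.incoming:
                batch = append(batch, id)
                if len(batch) >= b.maxBatch {
                    flush() // batch is full; don't wait for the next tick
                }
            case <-ticker.C:
                flush() // debounce: otherwise flush at most once per interval
            }
        }
    }

    // completeBatch stands in for a bulk `UPDATE ... WHERE id = any(...)`.
    func completeBatch(ids []int64) { fmt.Printf("completing %d jobs\n", len(ids)) }

    func main() {
        b := &completionBatcher{incoming: make(chan int64), maxBatch: 2000, interval: 100 * time.Millisecond}
        ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
        defer cancel()

        go func() {
            for i := int64(0); i < 500; i++ {
                select {
                case b.incoming <- i:
                case <-ctx.Done():
                    return
                }
            }
        }()

        b.run(ctx)
    }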

For the purposes of not grossly expanding the `riverdriver` interface,
the completer only batches jobs being set to `completed`, which under
most normal workloads we expect to be the vast majority of cases. Jobs
going to other states are fed into a member `AsyncCompleter`, thereby
allowing the `BatchCompleter` to keep its implementation quite simple.
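
In sketch form, and with hypothetical names and a simplified interface
rather than the package's real one, that routing decision is just:

    package jobcompletersketch

    import "context"

    type jobState string

    const jobStateCompleted jobState = "completed"

    // completer is a simplified stand-in for the package's completer interface.
    type completer interface {
        JobSetState(ctx context.Context, jobID int64, state jobState) error
    }

    // batchCompleterSketch batches `completed` jobs and delegates the rest.
    type batchCompleterSketch struct {
        batchedIDs chan int64 // feeds the debounced flush loop sketched above
        async      completer  // an async completer handles every other state
    }

    func (c *batchCompleterSketch) JobSetState(ctx context.Context, jobID int64, state jobState) error {
        // Only the common case, jobs finishing as `completed`, is batched.
        if state == jobStateCompleted {
            select {
            case c.batchedIDs <- jobID:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        // Errored, cancelled, discarded, snoozed, etc. fall through to the
        // async completer, keeping the batch completer simple.
        return c.async.JobSetState(ctx, jobID, state)
    }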

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently in use by the
River client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edge cases that were
previously being ignored completely. For most cases we drop the heavy
mocking that was happening before, which had the effect of minimizing
the surface area under test and producing misleading timings that
weren't realistic.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
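
A minimal sketch of what such a comparison looks like, assuming a
simplified completer interface and hypothetical constructors (the real
framework in `internal/jobcompleter` is more involved):

    package completerbenchsketch

    import (
        "context"
        "testing"
    )

    // completer is a simplified stand-in for the package's completer interface.
    type completer interface {
        Complete(ctx context.Context, jobID int64) error
        Stop()
    }

    // benchmarkCompleter runs the same workload against any completer so
    // that implementations can be compared like-for-like.
    func benchmarkCompleter(b *testing.B, newCompleter func(b *testing.B) completer) {
        ctx := context.Background()

        c := newCompleter(b)
        defer c.Stop()

        b.ResetTimer()

        for i := 0; i < b.N; i++ {
            if err := c.Complete(ctx, int64(i)); err != nil {
                b.Fatal(err)
            }
        }
    }

    // Each implementation then gets a one-line top-level benchmark, e.g.:
    //
    //     func BenchmarkBatchCompleterSketch(b *testing.B) {
    //         benchmarkCompleter(b, newBatchCompleterSketch)
    //     }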
brandur committed Mar 12, 2024
1 parent ccea9c7 commit 8904e7e
Showing 21 changed files with 1,573 additions and 264 deletions.
49 changes: 44 additions & 5 deletions client.go
@@ -429,7 +429,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)
completer := jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())

client := &Client[TTx]{
completer: completer,
@@ -616,6 +616,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// TODO: Stop completer (and any other services) if Start leaves with an error.
if err := c.completer.Start(ctx); err != nil {
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
@@ -665,7 +670,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
c.completer.Stop()

// Will only be started if this client was leader, but can tolerate a stop
// without having been started.
@@ -764,7 +769,41 @@ func (c *Client[TTx]) Stopped() <-chan struct{} {
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
for _, kind := range kinds {
return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds})
}

// The default maximum size of the subscribe channel. Events that would overflow
// it will be dropped.
const subscribeChanSizeDefault = 1_000

// SubscribeConfig is more thorough subscription configuration used for
// Client.SubscribeConfig.
type SubscribeConfig struct {
// ChanSize is the size of the buffered channel that will be created for the
// subscription. Incoming events that overflow this number because a listener
// isn't reading from the channel in a timely manner will be dropped.
//
// Defaults to 1000.
ChanSize int

// Kinds are the kinds of events that the subscription will receive.
// Requiring that kinds are specified explicitly allows for forward
// compatibility in case new kinds of events are added in future versions.
// If new event kinds are added, callers will have to explicitly add them to
// their requested list and ensure they can be handled correctly.
Kinds []EventKind
}

// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}

for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
@@ -773,15 +812,15 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

subChan := make(chan *Event, subscribeChanSize)
subChan := make(chan *Event, config.ChanSize)

// Just gives us an easy way of removing the subscription again later.
subID := c.subscriptionsSeq
c.subscriptionsSeq++

c.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}

cancel := func() {
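
For context on the new API added to client.go above, here's a minimal
usage sketch (not part of the diff; it assumes an already configured and
started client built with the riverpgxv5 driver):

    package example

    import (
        "fmt"

        "github.com/jackc/pgx/v5"
        "github.com/riverqueue/river"
    )

    // consumeEvents is illustrative only; the client is assumed to exist.
    func consumeEvents(client *river.Client[pgx.Tx]) {
        subscribeChan, cancel := client.SubscribeConfig(&river.SubscribeConfig{
            ChanSize: 10_000, // larger buffer than the 1,000 default
            Kinds:    []river.EventKind{river.EventKindJobCompleted, river.EventKindJobFailed},
        })
        defer cancel()

        for event := range subscribeChan {
            switch event.Kind {
            case river.EventKindJobCompleted:
                fmt.Printf("job %d completed\n", event.Job.ID)
            case river.EventKindJobFailed:
                fmt.Printf("job %d failed\n", event.Job.ID)
            }
        }
    }
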
198 changes: 181 additions & 17 deletions client_test.go
@@ -2316,6 +2316,137 @@ func Test_Client_Subscribe(t *testing.T) {
require.Equal(t, JobStateRetryable, eventFailed.Job.State)
})

t.Run("PanicOnUnknownKind", func(t *testing.T) {
t.Parallel()

dbPool := riverinternaltest.TestDB(ctx, t)

config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

client := newTestClient(t, dbPool, config)

require.PanicsWithError(t, "unknown event kind: does_not_exist", func() {
_, _ = client.Subscribe(EventKind("does_not_exist"))
})
})

t.Run("SubscriptionCancellation", func(t *testing.T) {
t.Parallel()

dbPool := riverinternaltest.TestDB(ctx, t)

config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
})

client := newTestClient(t, dbPool, config)

subscribeChan, cancel := client.Subscribe(EventKindJobCompleted)
cancel()

// Drops through immediately because the channel is closed.
riverinternaltest.WaitOrTimeout(t, subscribeChan)

require.Empty(t, client.subscriptions)
})
}

// SubscribeConfig uses all the same code as Subscribe, so these are just a
// minimal set of new tests to make sure that the function also works when used
// independently.
func Test_Client_SubscribeConfig(t *testing.T) {
t.Parallel()

ctx := context.Background()

keyEventsByName := func(events []*Event) map[string]*Event {
return sliceutil.KeyBy(events, func(event *Event) (string, *Event) {
var args callbackArgs
require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args))
return args.Name, event
})
}

requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow {
job, err := client.Insert(ctx, callbackArgs{Name: jobName}, nil)
require.NoError(t, err)
return job
}

t.Run("Success", func(t *testing.T) {
t.Parallel()

dbPool := riverinternaltest.TestDB(ctx, t)

// Fail/succeed jobs based on their name so we can get a mix of both to
// verify.
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
if strings.HasPrefix(job.Args.Name, "failed") {
return errors.New("job error")
}
return nil
})

client := newTestClient(t, dbPool, config)

subscribeChan, cancel := client.SubscribeConfig(&SubscribeConfig{
Kinds: []EventKind{EventKindJobCompleted, EventKindJobFailed},
})
t.Cleanup(cancel)

jobCompleted1 := requireInsert(ctx, client, "completed1")
jobCompleted2 := requireInsert(ctx, client, "completed2")
jobFailed1 := requireInsert(ctx, client, "failed1")
jobFailed2 := requireInsert(ctx, client, "failed2")

expectedJobs := []*rivertype.JobRow{
jobCompleted1,
jobCompleted2,
jobFailed1,
jobFailed2,
}

startClient(ctx, t, client)

events := make([]*Event, len(expectedJobs))

for i := 0; i < len(expectedJobs); i++ {
events[i] = riverinternaltest.WaitOrTimeout(t, subscribeChan)
}

eventsByName := keyEventsByName(events)

{
eventCompleted1 := eventsByName["completed1"]
require.Equal(t, EventKindJobCompleted, eventCompleted1.Kind)
require.Equal(t, jobCompleted1.ID, eventCompleted1.Job.ID)
require.Equal(t, JobStateCompleted, eventCompleted1.Job.State)
}

{
eventCompleted2 := eventsByName["completed2"]
require.Equal(t, EventKindJobCompleted, eventCompleted2.Kind)
require.Equal(t, jobCompleted2.ID, eventCompleted2.Job.ID)
require.Equal(t, JobStateCompleted, eventCompleted2.Job.State)
}

{
eventFailed1 := eventsByName["failed1"]
require.Equal(t, EventKindJobFailed, eventFailed1.Kind)
require.Equal(t, jobFailed1.ID, eventFailed1.Job.ID)
require.Equal(t, JobStateRetryable, eventFailed1.Job.State)
}

{
eventFailed2 := eventsByName["failed2"]
require.Equal(t, EventKindJobFailed, eventFailed2.Kind)
require.Equal(t, jobFailed2.ID, eventFailed2.Job.ID)
require.Equal(t, JobStateRetryable, eventFailed2.Job.State)
}
})

t.Run("EventsDropWithNoListeners", func(t *testing.T) {
t.Parallel()

@@ -2327,37 +2458,70 @@ func Test_Client_Subscribe(t *testing.T) {

client := newTestClient(t, dbPool, config)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

// A first channel that we'll use to make sure all the expected jobs are
// finished.
subscribeChan, cancel := client.Subscribe(EventKindJobCompleted)
t.Cleanup(cancel)

// Artificially lowered subscribe channel size so we don't have to try
// and process thousands of jobs.
const (
subscribeChanSize = 100
numJobsToInsert = subscribeChanSize + 1
)

// Another channel with no listeners. Despite no listeners, it shouldn't
// block or gum up the client's progress in any way.
_, cancel = client.Subscribe(EventKindJobCompleted)
subscribeChan2, cancel := client.SubscribeConfig(&SubscribeConfig{
ChanSize: subscribeChanSize,
Kinds: []EventKind{EventKindJobCompleted},
})
t.Cleanup(cancel)

// Insert more jobs than the maximum channel size. We'll be pulling from
// one channel but not the other.
for i := 0; i < subscribeChanSize+1; i++ {
_ = requireInsert(ctx, client, fmt.Sprintf("job %d", i))
var (
insertParams = make([]*riverdriver.JobInsertFastParams, numJobsToInsert)
kind = (&JobArgs{}).Kind()
)
for i := 0; i < numJobsToInsert; i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{}`),
Kind: kind,
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
}
}

_, err := client.driver.GetExecutor().JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)

// Need to start waiting on events before running the client or the
// channel could overflow before we start listening.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, subscribeChanSize+1)
_ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, numJobsToInsert)
}()

startClient(ctx, t, client)

wg.Wait()

// Filled to maximum.
require.Len(t, subscribeChan2, subscribeChanSize)
})

t.Run("PanicOnUnknownKind", func(t *testing.T) {
t.Run("PanicOnChanSizeLessThanZero", func(t *testing.T) {
t.Parallel()

dbPool := riverinternaltest.TestDB(ctx, t)
@@ -2368,12 +2532,14 @@ func Test_Client_Subscribe(t *testing.T) {

client := newTestClient(t, dbPool, config)

require.PanicsWithError(t, "unknown event kind: does_not_exist", func() {
_, _ = client.Subscribe(EventKind("does_not_exist"))
require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() {
_, _ = client.SubscribeConfig(&SubscribeConfig{
ChanSize: -1,
})
})
})

t.Run("SubscriptionCancellation", func(t *testing.T) {
t.Run("PanicOnUnknownKind", func(t *testing.T) {
t.Parallel()

dbPool := riverinternaltest.TestDB(ctx, t)
@@ -2384,13 +2550,11 @@ func Test_Client_Subscribe(t *testing.T) {

client := newTestClient(t, dbPool, config)

subscribeChan, cancel := client.Subscribe(EventKindJobCompleted)
cancel()

// Drops through immediately because the channel is closed.
riverinternaltest.WaitOrTimeout(t, subscribeChan)

require.Empty(t, client.subscriptions)
require.PanicsWithError(t, "unknown event kind: does_not_exist", func() {
_, _ = client.SubscribeConfig(&SubscribeConfig{
Kinds: []EventKind{EventKind("does_not_exist")},
})
})
})
}

17 changes: 8 additions & 9 deletions cmd/river/go.mod
@@ -2,19 +2,20 @@ module github.com/riverqueue/river/cmd/river

go 1.21.4

// replace github.com/riverqueue/river => ../..
replace github.com/riverqueue/river => ../..

// replace github.com/riverqueue/river/riverdriver => ../../riverdriver
replace github.com/riverqueue/river/riverdriver => ../../riverdriver

// replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverdriver/riverdatabasesql
replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverdriver/riverdatabasesql

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5
replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5

require (
github.com/jackc/pgx/v5 v5.5.2
github.com/jackc/pgx/v5 v5.5.5
github.com/riverqueue/river v0.0.17
github.com/riverqueue/river/riverdriver v0.0.17
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17
github.com/riverqueue/river/riverdriver v0.0.25
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.25
github.com/riverqueue/river/rivertype v0.0.25
github.com/spf13/cobra v1.8.0
)

@@ -23,10 +24,8 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
