From df7af43433ab59bd7578a90f35c5b38b56d4fe82 Mon Sep 17 00:00:00 2001
From: Brandur
Date: Sat, 9 Mar 2024 08:08:58 -0800
Subject: [PATCH] Batch completer + additional completer test suite and benchmarks

Here, add a new completer using a completion strategy designed to be much
faster than what we're doing right now. Rather than blindly throwing
completion work into goroutine slots, it accumulates "batches" of completions
to be carried out, using a debounced channel that fires periodically
(currently, up to every 100 milliseconds) to submit an entire batch for
completion at once, up to 2,000 jobs at a time.

To avoid grossly expanding the `riverdriver` interface, the completer only
batches jobs being set to `completed`, which under most normal workloads we
expect to be the vast common case. Jobs going to other states are fed into a
member `AsyncCompleter`, thereby allowing the `BatchCompleter` to keep its
implementation quite simple.

According to in-package benchmarking, the new completer is in the range of
3-5x faster than `AsyncCompleter` (the one currently in use by the River
client), and 10-15x faster than `InlineCompleter`:

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8          10851    112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8      11386    120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8          9763    116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8     10884    115718 ns/op
    BenchmarkBatchCompleter/Completion-8                        54916     27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                    11518    100997 ns/op
    BenchmarkInlineCompleter/Completion-8                        4656    369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                    1561    794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter    21.123s

Along with the new completer, we also add a vastly more thorough test suite
to help tease out race conditions and exercise edge cases that were
previously being ignored completely. For most cases we drop the heavy mocking
that was happening before, which had the effect of minimizing the surface
area under test and producing misleading, unrealistic timings.

Similarly, we bring in a new benchmark framework to allow us to easily vet
and compare completer implementations relative to each other. The expectation
is that this will act as a more synthetic proxy, with the new benchmarking
tool in #254 providing a more realistic end-to-end measurement.
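To make the accumulate-and-flush strategy concrete, here's a minimal,
self-contained Go sketch. It is illustrative only, not the code in this
patch: the names (batchAccumulator, completion, flush) are invented for the
example, and a plain ticker stands in for the debounced channel; the real
completer also retries failed batches, caps each database operation at 2,000
jobs, and hands non-`completed` states to an AsyncCompleter.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // completion is a stand-in for the per-job params the real completer stores.
    type completion struct {
        jobID       int64
        finalizedAt time.Time
    }

    // batchAccumulator collects completions and flushes them periodically as a
    // single batch rather than issuing one database call per job.
    type batchAccumulator struct {
        mu      sync.Mutex
        pending map[int64]completion
    }

    // Add records a job as completed; it'll be persisted on the next flush.
    func (a *batchAccumulator) Add(c completion) {
        a.mu.Lock()
        defer a.mu.Unlock()
        a.pending[c.jobID] = c
    }

    // Run flushes accumulated completions every interval until stop is closed,
    // then performs one final flush so nothing is lost on shutdown.
    func (a *batchAccumulator) Run(interval time.Duration, stop <-chan struct{}, flush func([]completion)) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for {
            select {
            case <-stop:
                a.flushPending(flush)
                return
            case <-ticker.C:
                a.flushPending(flush)
            }
        }
    }

    func (a *batchAccumulator) flushPending(flush func([]completion)) {
        a.mu.Lock()
        batch := make([]completion, 0, len(a.pending))
        for _, c := range a.pending {
            batch = append(batch, c)
        }
        a.pending = make(map[int64]completion)
        a.mu.Unlock()

        if len(batch) > 0 {
            flush(batch) // e.g. one UPDATE ... WHERE id = any($1) for the whole batch
        }
    }

    func main() {
        acc := &batchAccumulator{pending: make(map[int64]completion)}
        stop := make(chan struct{})
        done := make(chan struct{})

        go func() {
            defer close(done)
            acc.Run(100*time.Millisecond, stop, func(batch []completion) {
                fmt.Printf("flushing %d completions\n", len(batch))
            })
        }()

        for i := int64(1); i <= 5; i++ {
            acc.Add(completion{jobID: i, finalizedAt: time.Now()})
        }

        time.Sleep(150 * time.Millisecond)
        close(stop)
        <-done
    }

In the patch itself the periodic trigger is a `chanutil.DebouncedChan` firing
at most every 100 milliseconds, and stopping the service performs a final
batch so accumulated completions aren't lost on shutdown.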
--- client.go | 14 +- client_test.go | 42 +- event.go | 2 +- internal/jobcompleter/job_completer.go | 369 ++++++++--- internal/jobcompleter/job_completer_test.go | 601 +++++++++++++++++- internal/jobcompleter/main_test.go | 11 + .../riverdrivertest/riverdrivertest.go | 64 ++ .../testfactory/test_factory.go | 18 +- job_executor.go | 10 +- job_executor_test.go | 50 +- producer.go | 3 + producer_test.go | 2 +- riverdriver/river_driver_interface.go | 14 +- .../internal/dbsqlc/river_job.sql.go | 97 ++- .../riverdatabasesql/river_database_sql.go | 4 + .../riverpgxv5/internal/dbsqlc/river_job.sql | 51 +- .../internal/dbsqlc/river_job.sql.go | 94 ++- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 11 + 18 files changed, 1250 insertions(+), 207 deletions(-) create mode 100644 internal/jobcompleter/main_test.go diff --git a/client.go b/client.go index fa507b76..ecba0d78 100644 --- a/client.go +++ b/client.go @@ -428,7 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client TimeNowUTC: func() time.Time { return time.Now().UTC() }, } - completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100) + completer := jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor()) client := &Client[TTx]{ completer: completer, @@ -600,6 +600,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // to shut down prior to closing the monitor. go c.monitor.Run() + // TODO: Stop completer (and any other services) if Start leaves with an error. + if err := c.completer.Start(ctx); err != nil { + return err + } + // Receives job complete notifications from the completer and distributes // them to any subscriptions. c.completer.Subscribe(c.distributeJobCompleterCallback) @@ -662,7 +667,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { // // TODO: there's a risk here that the completer is stuck on a job that won't // complete. We probably need a timeout or way to move on in those cases. - c.completer.Wait() + c.completer.Stop() c.notifier.Stop() c.queueMaintainer.Stop() @@ -760,6 +765,11 @@ func (c *Client[TTx]) Stopped() <-chan struct{} { // versions. If new event kinds are added, callers will have to explicitly add // them to their requested list and ensure they can be handled correctly. func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) { + return c.subscribeWithSize(subscribeChanSizeDefault, kinds...) +} + +// Special internal variant that lets us inject an overridden size. +func (c *Client[TTx]) subscribeWithSize(subscribeChanSize int, kinds ...EventKind) (<-chan *Event, func()) { for _, kind := range kinds { if _, ok := allKinds[kind]; !ok { panic(fmt.Errorf("unknown event kind: %s", kind)) diff --git a/client_test.go b/client_test.go index e6bf6fb4..30b4d649 100644 --- a/client_test.go +++ b/client_test.go @@ -2327,34 +2327,64 @@ func Test_Client_Subscribe(t *testing.T) { client := newTestClient(t, dbPool, config) + type JobArgs struct { + JobArgsReflectKind[JobArgs] + } + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + // A first channel that we'll use to make sure all the expected jobs are // finished. subscribeChan, cancel := client.Subscribe(EventKindJobCompleted) t.Cleanup(cancel) + // Artificially lowered subscribe channel size so we don't have to try + // and process thousands of jobs. + const ( + subscribeChanSize = 100 + numJobsToInsert = subscribeChanSize + 1 + ) + // Another channel with no listeners. 
Despite no listeners, it shouldn't // block or gum up the client's progress in any way. - _, cancel = client.Subscribe(EventKindJobCompleted) + subscribeChan2, cancel := client.subscribeWithSize(subscribeChanSize, EventKindJobCompleted) t.Cleanup(cancel) - // Insert more jobs than the maximum channel size. We'll be pulling from - // one channel but not the other. - for i := 0; i < subscribeChanSize+1; i++ { - _ = requireInsert(ctx, client, fmt.Sprintf("job %d", i)) + var ( + insertParams = make([]*riverdriver.JobInsertFastParams, numJobsToInsert) + kind = (&JobArgs{}).Kind() + ) + for i := 0; i < numJobsToInsert; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: kind, + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + } } + _, err := client.driver.GetExecutor().JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + // Need to start waiting on events before running the client or the // channel could overflow before we start listening. var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - _ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, subscribeChanSize+1) + _ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, numJobsToInsert) }() startClient(ctx, t, client) wg.Wait() + + // Filled to maximum. + require.Len(t, subscribeChan2, subscribeChanSize) }) t.Run("PanicOnUnknownKind", func(t *testing.T) { diff --git a/event.go b/event.go index ef84b86b..da03717c 100644 --- a/event.go +++ b/event.go @@ -69,7 +69,7 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics { // The maximum size of the subscribe channel. Events that would overflow it will // be dropped. -const subscribeChanSize = 100 +const subscribeChanSizeDefault = 50_000 // eventSubscription is an active subscription for events being produced by a // client, created with Client.Subscribe. diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 17d227df..6df4e8dc 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -10,23 +10,29 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/maintenance/startstop" + "github.com/riverqueue/river/internal/util/chanutil" + "github.com/riverqueue/river/internal/util/maputil" "github.com/riverqueue/river/internal/util/timeutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) +// JobCompleter is an interface to a service that "completes" jobs by marking +// them with an appropriate state and any other necessary metadata in the +// database. It's a generic interface to let us experiment with the speed of a +// number of implementations, although River will likely always prefer our most +// optimized one. type JobCompleter interface { + startstop.Service + // JobSetState sets a new state for the given job, as long as it's // still running (i.e. its state has not changed to something else already). - JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error + JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error // Subscribe injects a callback which will be invoked whenever a job is // updated. 
Subscribe(subscribeFunc func(update CompleterJobUpdated)) - - // Wait waits for all ongoing completions to finish, enabling graceful - // shutdown. - Wait() } type CompleterJobUpdated struct { @@ -38,15 +44,15 @@ type CompleterJobUpdated struct { // but is a minimal interface with the functions needed for completers to work // to more easily facilitate mocking. type PartialExecutor interface { + JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) } -type InlineJobCompleter struct { +type InlineCompleter struct { baseservice.BaseService + withSubscribe - exec PartialExecutor - subscribeFunc func(update CompleterJobUpdated) - subscribeFuncMu sync.Mutex + exec PartialExecutor // A waitgroup is not actually needed for the inline completer because as // long as the caller is waiting on each function call, completion is @@ -56,117 +62,275 @@ type InlineJobCompleter struct { wg sync.WaitGroup } -func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *InlineJobCompleter { - return baseservice.Init(archetype, &InlineJobCompleter{ +func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *InlineCompleter { + return baseservice.Init(archetype, &InlineCompleter{ exec: exec, }) } -func (c *InlineJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { - return c.doOperation(stats, func(ctx context.Context) (*rivertype.JobRow, error) { - return c.exec.JobSetStateIfRunning(ctx, params) - }) -} - -func (c *InlineJobCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() - - c.subscribeFunc = subscribeFunc -} - -func (c *InlineJobCompleter) Wait() { - c.wg.Wait() -} - -func (c *InlineJobCompleter) doOperation(stats *jobstats.JobStatistics, f func(ctx context.Context) (*rivertype.JobRow, error)) error { +func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { c.wg.Add(1) defer c.wg.Done() start := c.TimeNowUTC() - job, err := withRetries(&c.BaseService, f) + job, err := withRetries(ctx, &c.BaseService, func(ctx context.Context) (*rivertype.JobRow, error) { + return c.exec.JobSetStateIfRunning(ctx, params) + }) if err != nil { return err } stats.CompleteDuration = c.TimeNowUTC().Sub(start) + c.sendJobToSubscription(job, stats) - func() { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() + return nil +} - if c.subscribeFunc != nil { - c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) - } - }() +func (c *InlineCompleter) Start(ctx context.Context) error { return nil } - return nil +func (c *InlineCompleter) Stop() { + c.wg.Wait() } -type AsyncJobCompleter struct { +// A default concurrency of 100 seems to perform better a much smaller number +// like 10, but it's quite dependent on environment (10 and 100 bench almost +// identically on MBA when it's on battery power). This number should represent +// our best known default for most use cases, but don't consider its choice to +// be particularly well informed at this point. 
+const asyncCompleterDefaultConcurrency = 100 + +type AsyncCompleter struct { baseservice.BaseService + withSubscribe - concurrency uint32 - exec PartialExecutor - eg *errgroup.Group - subscribeFunc func(update CompleterJobUpdated) - subscribeFuncMu sync.Mutex + concurrency int + errGroup *errgroup.Group + exec PartialExecutor } -func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor, concurrency uint32) *AsyncJobCompleter { - eg := &errgroup.Group{} - // TODO: int concurrency may feel more natural than uint32 - eg.SetLimit(int(concurrency)) +func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *AsyncCompleter { + return newAsyncCompleterWithConcurrency(archetype, exec, asyncCompleterDefaultConcurrency) +} - return baseservice.Init(archetype, &AsyncJobCompleter{ +func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec PartialExecutor, concurrency int) *AsyncCompleter { + errGroup := &errgroup.Group{} + errGroup.SetLimit(concurrency) + + return baseservice.Init(archetype, &AsyncCompleter{ exec: exec, concurrency: concurrency, - eg: eg, + errGroup: errGroup, }) } -func (c *AsyncJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { - return c.doOperation(stats, func(ctx context.Context) (*rivertype.JobRow, error) { - return c.exec.JobSetStateIfRunning(ctx, params) - }) -} - -func (c *AsyncJobCompleter) doOperation(stats *jobstats.JobStatistics, f func(ctx context.Context) (*rivertype.JobRow, error)) error { - c.eg.Go(func() error { - start := c.TimeNowUTC() +func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { + // Start clock outside of goroutine so that the time spent blocking waiting + // for an errgroup slot is accurately measured. + start := c.TimeNowUTC() - job, err := withRetries(&c.BaseService, f) + c.errGroup.Go(func() error { + job, err := withRetries(ctx, &c.BaseService, func(ctx context.Context) (*rivertype.JobRow, error) { + return c.exec.JobSetStateIfRunning(ctx, params) + }) if err != nil { return err } stats.CompleteDuration = c.TimeNowUTC().Sub(start) + c.sendJobToSubscription(job, stats) + + return nil + }) + return nil +} + +func (c *AsyncCompleter) Start(ctx context.Context) error { return nil } + +func (c *AsyncCompleter) Stop() { + if err := c.errGroup.Wait(); err != nil { + c.Logger.Error("Error waiting on async completer: %s", err) + } +} + +type batchCompleterSetState struct { + Params *riverdriver.JobSetStateIfRunningParams + Stats *jobstats.JobStatistics + WaitingAt time.Time // when the job was submitted for completion +} + +// BatchCompleter uses a debounced channel to accumulate incoming completions +// and every so often complete many of them as a single efficient batch. To +// minimize the amount of driver surface area we need, the batching is only +// performed for jobs being changed to a `completed` state, which we expect to +// be the vast common case under normal operation. The completer embeds an +// AsyncCompleter to perform other non-`completed` state completions. 
+type BatchCompleter struct { + baseservice.BaseService + startstop.BaseStartStop + withSubscribe + + asyncCompleter *AsyncCompleter // used for non-complete completions + debounceChan *chanutil.DebouncedChan + exec PartialExecutor + ready chan struct{} + setStateParams map[int64]*batchCompleterSetState + setStateParamsMu sync.Mutex +} + +func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *BatchCompleter { + return baseservice.Init(archetype, &BatchCompleter{ + asyncCompleter: NewAsyncCompleter(archetype, exec), + exec: exec, + setStateParams: make(map[int64]*batchCompleterSetState), + }) +} + +func (c *BatchCompleter) Start(ctx context.Context) error { + stopCtx, shouldStart, stopped := c.StartInit(ctx) + if !shouldStart { + return nil + } + + c.ready = make(chan struct{}) - func() { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() + go func() { + // This defer should come first so that it's last out, thereby avoiding + // races. + defer close(stopped) - if c.subscribeFunc != nil { - c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) + c.debounceChan = chanutil.NewDebouncedChan(stopCtx, 100*time.Millisecond) + + close(c.ready) + + for { + select { + case <-stopCtx.Done(): + // Try to insert last batch before leaving. Note we use the + // original context so operations aren't immediately cancelled. + if err := c.handleBatch(ctx); err != nil { + c.Logger.Error(c.Name+": Error completing batch", "err", err) + } + + return + case <-c.debounceChan.C(): + if err := c.handleBatch(ctx); err != nil { + c.Logger.Error(c.Name+": Error completing batch", "err", err) + } } - }() + } + }() + + return nil +} + +func (c *BatchCompleter) handleBatch(ctx context.Context) error { + var setStateBatch map[int64]*batchCompleterSetState + func() { + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + setStateBatch = c.setStateParams + + // Don't bother resetting the map if there's nothing to process, + // allowing the completer to idle efficiently. + if len(setStateBatch) > 0 { + c.setStateParams = make(map[int64]*batchCompleterSetState) + } + }() + if len(setStateBatch) < 1 { return nil - }) + } + + // Use a single `finalized_at` value for the whole batch. Not the greatest + // thing maybe, but makes things much easier. + var sampleFinalizedAt time.Time + for _, setState := range setStateBatch { + sampleFinalizedAt = *setState.Params.FinalizedAt + break + } + + // Insert a sub-batch with retries. Also helps reduce visual noise and + // increase readability of loop below. + insertSubBatch := func(ids []int64) ([]*rivertype.JobRow, error) { + return withRetries(ctx, &c.BaseService, func(ctx context.Context) ([]*rivertype.JobRow, error) { + return c.exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: ids, + FinalizedAt: sampleFinalizedAt, + }) + }) + } + + // Tease apart enormous batches into sub-batches. + // + // All the code below is concerned with doing that, with a fast loop that + // doesn't allocate any additional memory in case the entire batch is + // smaller than the sub-batch maximum size (which will be the common case). 
+ const oneOperationMax = 2_000 + + var ( + ids = maputil.Keys(setStateBatch) + jobRows []*rivertype.JobRow + ) + if len(setStateBatch) > oneOperationMax { + jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch)) + for i := 0; i < len(setStateBatch); i += oneOperationMax { + jobRowsSubBatch, err := insertSubBatch(ids[i:min(i+oneOperationMax, len(ids))]) + if err != nil { + return err + } + jobRows = append(jobRows, jobRowsSubBatch...) + } + } else { + var err error + jobRows, err = insertSubBatch(ids) + if err != nil { + return err + } + } + + for _, jobRow := range jobRows { + setState := setStateBatch[jobRow.ID] + setState.Stats.CompleteDuration = c.TimeNowUTC().Sub(setState.WaitingAt) + c.sendJobToSubscription(jobRow, setState.Stats) + } + return nil } -func (c *AsyncJobCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() +func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { + // Send completions other than setting to `complete` to an async completer. + // We consider this okay because these are expected to be much more rare, so + // only optimizing `complete` will yield huge speed gains. + if params.State != rivertype.JobStateCompleted { + return c.asyncCompleter.JobSetStateIfRunning(ctx, stats, params) + } - c.subscribeFunc = subscribeFunc + // Wait until the completer is started and ready to start processing + // batches. Alternatively, we could remove this and allow batches to start + // accumulating even if the service isn't started, but that could introduce + // some danger of the service never being started and therefore accumulating + // forever without completing jobs. + <-c.ready + + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + c.setStateParams[params.ID] = &batchCompleterSetState{params, stats, c.TimeNowUTC()} + c.debounceChan.Call() + + return nil } -func (c *AsyncJobCompleter) Wait() { - // TODO: handle error? - _ = c.eg.Wait() +func (c *BatchCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { + c.withSubscribe.Subscribe(subscribeFunc) + c.asyncCompleter.Subscribe(subscribeFunc) +} + +func (c *BatchCompleter) Stop() { + c.BaseStartStop.Stop() + c.asyncCompleter.Stop() } // As configued, total time from initial attempt is ~7 seconds (1 + 2 + 4) (not @@ -174,7 +338,7 @@ func (c *AsyncJobCompleter) Wait() { // may want to rethink these numbers and strategy. const numRetries = 3 -func withRetries(c *baseservice.BaseService, f func(ctx context.Context) (*rivertype.JobRow, error)) (*rivertype.JobRow, error) { //nolint:varnamelen +func withRetries[T any](ctx context.Context, baseServic *baseservice.BaseService, retryFunc func(ctx context.Context) (T, error)) (T, error) { retrySecondsWithoutJitter := func(attempt int) float64 { // Uses a different algorithm (2 ** N) compared to retry policies (4 ** // N) so we can get more retries sooner: 1, 2, 4, 8 @@ -185,41 +349,66 @@ func withRetries(c *baseservice.BaseService, f func(ctx context.Context) (*river retrySeconds := retrySecondsWithoutJitter(attempt) // Jitter number of seconds +/- 10%. 
- retrySeconds += retrySeconds * (c.Rand.Float64()*0.2 - 0.1) + retrySeconds += retrySeconds * (baseServic.Rand.Float64()*0.2 - 0.1) return retrySeconds } - tryOnce := func() (*rivertype.JobRow, error) { - ctx := context.Background() + tryOnce := func() (T, error) { + uncancelledCtx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + // I've found that we want at least ten seconds for a large batch, + // although it usually doesn't need that long. + uncancelledCtx, cancel := context.WithTimeout(uncancelledCtx, 10*time.Second) defer cancel() - return f(ctx) + return retryFunc(uncancelledCtx) } var lastErr error // TODO: Added a basic retry algorithm based on the top-level retry policies // for now, but we may want to reevaluate this somewhat. + // + // TODO: Should use base service exponential backoff once available. for attempt := 1; attempt < numRetries+1; attempt++ { - job, err := tryOnce() + retVal, err := tryOnce() if err != nil { lastErr = err sleepDuration := timeutil.SecondsAsDuration(retrySeconds(attempt)) - // TODO: this logger doesn't use the user-provided context because it's - // not currently available here. It should. - c.Logger.Error(c.Name+": Completer error (will retry)", "attempt", attempt, "err", err, "sleep_duration", sleepDuration) - c.CancellableSleep(context.Background(), sleepDuration) + baseServic.Logger.ErrorContext(ctx, baseServic.Name+": Completer error (will retry)", "attempt", attempt, "err", err, "sleep_duration", sleepDuration) + baseServic.CancellableSleep(ctx, sleepDuration) continue } - return job, nil + return retVal, nil } - // TODO: this logger doesn't use the user-provided context because it's - // not currently available here. It should. - c.Logger.Error(c.Name + ": Too many errors; giving up") - return nil, lastErr + baseServic.Logger.ErrorContext(ctx, baseServic.Name+": Too many errors; giving up") + + var defaultVal T + return defaultVal, lastErr +} + +// Utility struct embedded in completers to give them an easy way to provide a +// Subscribe function and to handle locking around its use. 
+type withSubscribe struct { + subscribeFunc func(update CompleterJobUpdated) + subscribeFuncMu sync.RWMutex +} + +func (c *withSubscribe) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { + c.subscribeFuncMu.Lock() + defer c.subscribeFuncMu.Unlock() + + c.subscribeFunc = subscribeFunc +} + +func (c *withSubscribe) sendJobToSubscription(job *rivertype.JobRow, stats *jobstats.JobStatistics) { + c.subscribeFuncMu.RLock() + defer c.subscribeFuncMu.RUnlock() + + if c.subscribeFunc != nil { + c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) + } } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 5845a2ea..01df48f9 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -3,38 +3,66 @@ package jobcompleter import ( "context" "errors" + "fmt" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/internal/riverinternaltest/testfactory" + "github.com/riverqueue/river/internal/util/ptrutil" + "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivertype" ) -type executorMock struct { - JobSetStateIfRunningCalled bool - JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) - mu sync.Mutex +type partialExecutorMock struct { + JobSetCompleteIfRunningManyCalled bool + JobSetCompleteIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) + JobSetStateIfRunningCalled bool + JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) + mu sync.Mutex } -func (m *executorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { - m.mu.Lock() - m.JobSetStateIfRunningCalled = true - m.mu.Unlock() +// NewPartialExecutorMock returns a new mock with all mock functions set to call +// down into the given real executor. 
+func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock { + return &partialExecutorMock{ + JobSetCompleteIfRunningManyFunc: exec.JobSetCompleteIfRunningMany, + JobSetStateIfRunningFunc: exec.JobSetStateIfRunning, + } +} +func (m *partialExecutorMock) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + m.setCalled(func() { m.JobSetCompleteIfRunningManyCalled = true }) + return m.JobSetCompleteIfRunningManyFunc(ctx, params) +} + +func (m *partialExecutorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + m.setCalled(func() { m.JobSetStateIfRunningCalled = true }) return m.JobSetStateIfRunningFunc(ctx, params) } +func (m *partialExecutorMock) setCalled(setCalledFunc func()) { + m.mu.Lock() + defer m.mu.Unlock() + setCalledFunc() +} + func TestInlineJobCompleter_Complete(t *testing.T) { t.Parallel() + ctx := context.Background() + var attempt int expectedErr := errors.New("an error from the completer") - adapter := &executorMock{ + execMock := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { require.Equal(t, int64(1), params.ID) attempt++ @@ -42,15 +70,15 @@ func TestInlineJobCompleter_Complete(t *testing.T) { }, } - completer := NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter) - t.Cleanup(completer.Wait) + completer := NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), execMock) + t.Cleanup(completer.Stop) - err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now())) + err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now())) if !errors.Is(err, expectedErr) { t.Errorf("expected %v, got %v", expectedErr, err) } - require.True(t, adapter.JobSetStateIfRunningCalled) + require.True(t, execMock.JobSetStateIfRunningCalled) require.Equal(t, numRetries, attempt) } @@ -73,6 +101,8 @@ func TestInlineJobCompleter_Wait(t *testing.T) { func TestAsyncJobCompleter_Complete(t *testing.T) { t.Parallel() + ctx := context.Background() + type jobInput struct { // TODO: Try to get rid of containing the context in struct. It'd be // better to pass it forward instead. 
@@ -89,26 +119,26 @@ func TestAsyncJobCompleter_Complete(t *testing.T) { resultCh <- expectedErr }() - adapter := &executorMock{ + adapter := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { inputCh <- jobInput{ctx: ctx, jobID: params.ID} err := <-resultCh return nil, err }, } - completer := NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter, 2) - t.Cleanup(completer.Wait) + completer := newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter, 2) + t.Cleanup(completer.Stop) // launch 4 completions, only 2 can be inline due to the concurrency limit: for i := int64(0); i < 2; i++ { - if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { + if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } bgCompletionsStarted := make(chan struct{}) go func() { for i := int64(2); i < 4; i++ { - if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { + if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } @@ -159,7 +189,7 @@ func TestAsyncJobCompleter_Subscribe(t *testing.T) { t.Parallel() testCompleterSubscribe(t, func(exec PartialExecutor) JobCompleter { - return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) }) } @@ -167,14 +197,16 @@ func TestAsyncJobCompleter_Wait(t *testing.T) { t.Parallel() testCompleterWait(t, func(exec PartialExecutor) JobCompleter { - return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) }) } func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobCompleter) { t.Helper() - exec := &executorMock{ + ctx := context.Background() + + exec := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { return &rivertype.JobRow{ State: rivertype.JobStateCompleted, @@ -190,10 +222,10 @@ func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobC }) for i := 0; i < 4; i++ { - require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) } - completer.Wait() + completer.Stop() updates := riverinternaltest.WaitOrTimeoutN(t, jobUpdates, 4) for i := 0; i < 4; i++ { @@ -204,9 +236,11 @@ func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobC func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobCompleter) { t.Helper() + ctx := context.Background() + resultCh := make(chan error) completeStartedCh := make(chan struct{}) - exec := &executorMock{ + exec := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx 
context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { completeStartedCh <- struct{}{} err := <-resultCh @@ -220,7 +254,7 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple for i := 0; i < 4; i++ { i := i go func() { - require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) }() <-completeStartedCh // wait for func to actually start } @@ -230,7 +264,7 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple waitDone := make(chan struct{}) go func() { - completer.Wait() + completer.Stop() close(waitDone) }() @@ -259,3 +293,518 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple t.Errorf("expected Wait to return after all jobs are complete") } } + +func TestAsyncCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *AsyncCompleter { + return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t), exec) + }, + func(completer *AsyncCompleter) { completer.DisableSleep = true }, + func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func TestBatchCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *BatchCompleter { + return NewBatchCompleter(riverinternaltest.BaseServiceArchetype(t), exec) + }, + func(completer *BatchCompleter) { completer.DisableSleep = true }, + func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func TestInlineCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *InlineCompleter { + return NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t), exec) + }, + func(completer *InlineCompleter) { completer.DisableSleep = true }, + func(completer *InlineCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func testCompleter[TCompleter JobCompleter]( + t *testing.T, + newCompleter func(exec riverdriver.Executor) TCompleter, + + // These functions are here to help us inject test behavior that's not part + // of the JobCompleter interface. We could alternatively define a second + // interface like jobCompleterWithTestFacilities to expose the additional + // functionality, although that's not particularly beautiful either. 
+ disableSleep func(completer TCompleter), + setExec func(completer TCompleter, exec PartialExecutor), +) { + t.Helper() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + } + + setup := func(t *testing.T) (TCompleter, *testBundle) { + t.Helper() + + var ( + driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) + exec = driver.GetExecutor() + completer = newCompleter(exec) + ) + + require.NoError(t, completer.Start(ctx)) + + return completer, &testBundle{ + exec: exec, + } + } + + requireJob := func(t *testing.T, exec riverdriver.Executor, jobID int64) *rivertype.JobRow { + t.Helper() + + job, err := exec.JobGetByID(ctx, jobID) + require.NoError(t, err) + return job + } + + requireState := func(t *testing.T, exec riverdriver.Executor, jobID int64, state rivertype.JobState) { + t.Helper() + + job := requireJob(t, exec, jobID) + require.Equal(t, state, job.State) + } + + t.Run("CompletesJobs", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var ( + job1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, time.Now()))) + + completer.Stop() + + requireState(t, bundle.exec, job1.ID, rivertype.JobStateCompleted) + requireState(t, bundle.exec, job2.ID, rivertype.JobStateCompleted) + requireState(t, bundle.exec, job3.ID, rivertype.JobStateCompleted) + }) + + // Some completers like BatchCompleter have special logic for when they're + // handling enormous numbers of jobs, so make sure we're covered for cases + // like that. + t.Run("CompletesManyJobs", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + const ( + kind = "many_jobs_kind" + numJobs = 4_400 + ) + + var ( + insertParams = make([]*riverdriver.JobInsertFastParams, numJobs) + stats = make([]jobstats.JobStatistics, numJobs) + ) + for i := 0; i < numJobs; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: kind, + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateRunning, + } + } + + _, err := bundle.exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{kind}) + require.NoError(t, err) + + for i := range jobs { + require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now()))) + } + + completer.Stop() + + updatedJobs, err := bundle.exec.JobGetByKindMany(ctx, []string{kind}) + require.NoError(t, err) + for i := range updatedJobs { + require.Equal(t, rivertype.JobStateCompleted, updatedJobs[i].State) + } + }) + + // Performs continuous job insertion from a background goroutine. 
Returns a + // function that should be invoked to stop insertion, which will block until + // insertion stops, then return the total number of jobs that were inserted. + doContinuousInsertion := func(t *testing.T, completer JobCompleter, exec riverdriver.Executor) func() int { + t.Helper() + + var ( + insertionStopped = make(chan struct{}) + numInserted atomic.Int64 + stopInsertion = make(chan struct{}) + ) + go func() { + defer close(insertionStopped) + + defer func() { + t.Logf("Inserted %d jobs", numInserted.Load()) + }() + + for { + select { + case <-stopInsertion: + return + default: + } + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + numInserted.Add(1) + } + }() + + return func() int { + close(stopInsertion) + <-insertionStopped + return int(numInserted.Load()) + } + } + + t.Run("ContinuousCompletion", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + stopInsertion := doContinuousInsertion(t, completer, bundle.exec) + + // Give some time for some jobs to be inserted. + time.Sleep(100 * time.Millisecond) + + // Signal to stop insertion and wait for the goroutine to return. + numInserted := stopInsertion() + + require.Greater(t, numInserted, 0) + + completer.Stop() + }) + + t.Run("AllJobStates", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var ( + job1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job4 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job5 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job6 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job7 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10))) + + completer.Stop() + + requireState(t, bundle.exec, job1.ID, rivertype.JobStateCancelled) + requireState(t, bundle.exec, job2.ID, 
rivertype.JobStateCompleted) + requireState(t, bundle.exec, job3.ID, rivertype.JobStateDiscarded) + requireState(t, bundle.exec, job4.ID, rivertype.JobStateAvailable) + requireState(t, bundle.exec, job5.ID, rivertype.JobStateRetryable) + requireState(t, bundle.exec, job6.ID, rivertype.JobStateScheduled) + requireState(t, bundle.exec, job7.ID, rivertype.JobStateAvailable) + }) + + t.Run("Subscription", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var jobUpdate CompleterJobUpdated + completer.Subscribe(func(update CompleterJobUpdated) { + jobUpdate = update + }) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + require.NotZero(t, jobUpdate) + require.Equal(t, rivertype.JobStateCompleted, jobUpdate.Job.State) + }) + + t.Run("MultipleCycles", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + { + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + } + + { + require.NoError(t, completer.Start(ctx)) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + } + }) + + t.Run("CompletionFailure", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + // The completers will do an exponential backoff sleep while retrying. + // Make sure to disable it for this test case so the tests stay fast. + disableSleep(completer) + + var numCalls int + maybeError := func() error { + numCalls++ + switch numCalls { + case 1: + fallthrough + case 2: + return fmt.Errorf("error from executor %d", numCalls) + } + return nil + } + + execMock := NewPartialExecutorMock(bundle.exec) + execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + if err := maybeError(); err != nil { + return nil, err + } + return bundle.exec.JobSetCompleteIfRunningMany(ctx, params) + } + execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + if err := maybeError(); err != nil { + return nil, err + } + return bundle.exec.JobSetStateIfRunning(ctx, params) + } + setExec(completer, execMock) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + // Make sure our mocks were really called. The specific function called + // will depend on the completer under test, so okay as long as one or + // the other was. + require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + + // Job still managed to complete despite the errors. 
+ requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + }) + + t.Run("SubscribeStress", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + stopInsertion := doContinuousInsertion(t, completer, bundle.exec) + + const numGoroutines = 5 + + var ( + rand = randutil.NewCryptoSeededConcurrentSafeRand() + stopSubscribing = make(chan struct{}) + wg sync.WaitGroup + ) + + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for { + select { + case <-stopSubscribing: + return + case <-time.After(time.Duration(randutil.IntBetween(rand, int(2*time.Millisecond), int(20*time.Millisecond)))): + completer.Subscribe(func(update CompleterJobUpdated) {}) + } + } + }() + } + + // Give some time for some jobs to be inserted and the subscriber + // goroutines to churn. + time.Sleep(100 * time.Millisecond) + + close(stopSubscribing) + wg.Wait() + + // Signal to stop insertion and wait for the goroutine to return. + numInserted := stopInsertion() + + require.Greater(t, numInserted, 0) + + completer.Stop() + }) +} + +func BenchmarkAsyncCompleter_Concurrency10(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(b), exec, 10) + }) +} + +func BenchmarkAsyncCompleter_Concurrency100(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(b), exec, 100) + }) +} + +func BenchmarkBatchCompleter(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return NewBatchCompleter(riverinternaltest.BaseServiceArchetype(b), exec) + }) +} + +func BenchmarkInlineCompleter(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return NewInlineCompleter(riverinternaltest.BaseServiceArchetype(b), exec) + }) +} + +func benchmarkCompleter( + b *testing.B, + newCompleter func(exec riverdriver.Executor) JobCompleter, +) { + b.Helper() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + jobs []*rivertype.JobRow + stats []jobstats.JobStatistics + } + + setup := func(b *testing.B) (JobCompleter, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, b)) + exec = driver.GetExecutor() + completer = newCompleter(exec) + ) + + require.NoError(b, completer.Start(ctx)) + + insertParams := make([]*riverdriver.JobInsertFastParams, b.N) + for i := 0; i < b.N; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: "benchmark_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateRunning, + } + } + + _, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(b, err) + + jobs, err := exec.JobGetByKindMany(ctx, []string{"benchmark_kind"}) + require.NoError(b, err) + + return completer, &testBundle{ + exec: exec, + jobs: jobs, + stats: make([]jobstats.JobStatistics, b.N), + } + } + + b.Run("Completion", func(b *testing.B) { + completer, bundle := setup(b) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now())) + require.NoError(b, err) + } + + completer.Stop() + }) + + b.Run("RotatingStates", func(b *testing.B) { + completer, bundle := 
setup(b) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + switch i % 7 { + case 0: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 1: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now())) + require.NoError(b, err) + + case 2: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 3: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 4: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 5: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10)) + require.NoError(b, err) + + case 6: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10)) + require.NoError(b, err) + + default: + panic("unexpected modulo result (did you update cases without changing the modulo divider or vice versa?") + } + } + + completer.Stop() + }) +} diff --git a/internal/jobcompleter/main_test.go b/internal/jobcompleter/main_test.go new file mode 100644 index 00000000..86be3eb2 --- /dev/null +++ b/internal/jobcompleter/main_test.go @@ -0,0 +1,11 @@ +package jobcompleter + +import ( + "testing" + + "github.com/riverqueue/river/internal/riverinternaltest" +) + +func TestMain(m *testing.M) { + riverinternaltest.WrapTestMain(m) +} diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 50c85021..24a55ef4 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1060,6 +1060,70 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) }) + t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) { + t.Parallel() + + t.Run("CompletesRunningJobs", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + now := time.Now().UTC() + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + jobsAfter, err := exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: []int64{job1.ID, job2.ID}, + FinalizedAt: now, + }) + require.NoError(t, err) + for _, jobAfter := range jobsAfter { + require.Equal(t, rivertype.JobStateCompleted, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + } + + job1Updated, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCompleted, job1Updated.State) + job2Updated, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCompleted, job2Updated.State) + }) + + t.Run("DoesNotCompleteJobsInNonRunningStates", func(t *testing.T) { + t.Parallel() + + exec, _ 
:= setupExecutor(ctx, t, driver, beginTx) + + now := time.Now().UTC() + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + jobsAfter, err := exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: []int64{job1.ID, job2.ID, job3.ID}, + FinalizedAt: now, + }) + require.NoError(t, err) + for _, jobAfter := range jobsAfter { + require.NotEqual(t, rivertype.JobStateCompleted, jobAfter.State) + require.Nil(t, jobAfter.FinalizedAt) + } + + job1Updated, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job1Updated.State) + job2Updated, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, job2Updated.State) + job3Updated, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateScheduled, job3Updated.State) + }) + }) + t.Run("JobSetStateIfRunning_JobSetStateCompleted", func(t *testing.T) { t.Parallel() diff --git a/internal/riverinternaltest/testfactory/test_factory.go b/internal/riverinternaltest/testfactory/test_factory.go index ffd4e9aa..6b4a4891 100644 --- a/internal/riverinternaltest/testfactory/test_factory.go +++ b/internal/riverinternaltest/testfactory/test_factory.go @@ -33,8 +33,8 @@ type JobOpts struct { Tags []string } -func Job(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { - t.Helper() +func Job(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { + tb.Helper() encodedArgs := opts.EncodedArgs if opts.EncodedArgs == nil { @@ -67,7 +67,7 @@ func Job(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *Job State: ptrutil.ValOrDefault(opts.State, rivertype.JobStateAvailable), Tags: tags, }) - require.NoError(t, err) + require.NoError(tb, err) return job } @@ -78,8 +78,8 @@ type LeaderOpts struct { Name *string } -func Leader(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *LeaderOpts) *riverdriver.Leader { - t.Helper() +func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *LeaderOpts) *riverdriver.Leader { + tb.Helper() leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ ElectedAt: opts.ElectedAt, @@ -88,7 +88,7 @@ func Leader(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts * Name: ptrutil.ValOrDefault(opts.Name, "default"), TTL: 10 * time.Second, }) - require.NoError(t, err) + require.NoError(tb, err) return leader } @@ -96,13 +96,13 @@ type MigrationOpts struct { Version *int } -func Migration(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *MigrationOpts) *riverdriver.Migration { - t.Helper() +func Migration(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *MigrationOpts) *riverdriver.Migration { + tb.Helper() migration, err := exec.MigrationInsertMany(ctx, []int{ ptrutil.ValOrDefaultFunc(opts.Version, nextSeq), }) - require.NoError(t, err) + require.NoError(tb, err) return migration[0] } diff --git a/job_executor.go b/job_executor.go index 35aebb5c..87ea382b 100644 --- a/job_executor.go +++ b/job_executor.go @@ -262,7 +262,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, 
res *jobExecutorResult) } else { params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) } - if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job", slog.Int64("job_id", e.JobRow.ID), ) @@ -275,7 +275,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) return } - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", e.JobRow.ID), @@ -326,14 +326,14 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { now := time.Now() if cancelJob { - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...) } return } if e.JobRow.Attempt >= e.JobRow.MaxAttempts { - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to discard job and report error", logAttrs...) } return @@ -367,7 +367,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { } else { params = riverdriver.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) } - if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) 
diff --git a/job_executor_test.go b/job_executor_test.go
index 3f4a4cf3..5c98cb2e 100644
--- a/job_executor_test.go
+++ b/job_executor_test.go
@@ -116,7 +116,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 	ctx := context.Background()
 
 	type testBundle struct {
-		completer         *jobcompleter.InlineJobCompleter
+		completer         *jobcompleter.InlineCompleter
 		exec              riverdriver.Executor
 		errorHandler      *testErrorHandler
 		getUpdatesAndStop func() []jobcompleter.CompleterJobUpdated
@@ -139,7 +139,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		})
 
 		getJobUpdates := func() []jobcompleter.CompleterJobUpdated {
-			completer.Wait()
+			completer.Stop()
 			return updates
 		}
 		t.Cleanup(func() { _ = getJobUpdates() })
@@ -206,7 +206,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -232,7 +232,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -256,7 +256,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -276,7 +276,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		{
 			executor.Execute(ctx)
-			executor.Completer.Wait()
+			executor.Completer.Stop()
 
 			job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 			require.NoError(t, err)
@@ -295,7 +295,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		{
 			executor.Execute(ctx)
-			executor.Completer.Wait()
+			executor.Completer.Stop()
 
 			job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 			require.NoError(t, err)
@@ -315,7 +315,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -335,7 +335,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -358,7 +358,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -378,7 +378,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -398,7 +398,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -418,7 +418,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -436,7 +436,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -457,7 +457,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -478,7 +478,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -499,7 +499,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -515,7 +515,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -536,7 +536,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -554,7 +554,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow)
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -574,7 +574,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -594,7 +594,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -614,7 +614,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		}
 
 		executor.Execute(ctx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
@@ -634,7 +634,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.CancelFunc = cancelFunc
 
 		executor.Execute(workCtx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		require.ErrorIs(t, context.Cause(workCtx), errExecutorDefaultCancel)
 	})
@@ -664,7 +664,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		t.Cleanup(func() { cancelFunc(nil) })
 
 		executor.Execute(workCtx)
-		executor.Completer.Wait()
+		executor.Completer.Stop()
 
 		jobRow, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID)
 		require.NoError(t, err)
diff --git a/producer.go b/producer.go
index afe774de..b8a5b8a3 100644
--- a/producer.go
+++ b/producer.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log/slog"
 	"sync/atomic"
 	"time"
@@ -337,6 +338,8 @@ func (p *producer) dispatchWork(count int, jobsFetchedCh chan<- producerFetchRes
 			jobsFetchedCh <- producerFetchResult{err: err}
 			return
 		}
 
+		p.Logger.Info(fmt.Sprintf("--- fetched %d jobs", len(jobs)))
+
 		jobsFetchedCh <- producerFetchResult{jobs: jobs}
 	}
diff --git a/producer_test.go b/producer_test.go
index b11d9f0c..8b20ae94 100644
--- a/producer_test.go
+++ b/producer_test.go
@@ -49,7 +49,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
 	listener := dbDriver.GetListener()
 
 	completer := jobcompleter.NewInlineCompleter(archetype, exec)
-	t.Cleanup(completer.Wait)
+	t.Cleanup(completer.Stop)
 
 	type WithJobNumArgs struct {
 		JobArgsReflectKind[WithJobNumArgs]
diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go
index d107fe9d..49611152 100644
--- a/riverdriver/river_driver_interface.go
+++ b/riverdriver/river_driver_interface.go
@@ -90,6 +90,7 @@ type Executor interface {
 	JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error)
 	JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error)
 	JobSchedule(ctx context.Context, params *JobScheduleParams) (int, error)
+	JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
 	JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error)
 	JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error)
 	LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error)
@@ -232,9 +233,16 @@ type JobScheduleParams struct {
 	Now time.Time
 }
 
-// JobSetStateIfRunningParams are parameters to update the state of a currently running
-// job. Use one of the constructors below to ensure a correct combination of
-// parameters.
+// JobSetCompleteIfRunningManyParams are parameters to set many running jobs to
+// `completed` all at once for improved throughput and efficiency.
+type JobSetCompleteIfRunningManyParams struct {
+	ID          []int64
+	FinalizedAt time.Time
+}
+
+// JobSetStateIfRunningParams are parameters to update the state of a currently
+// running job. Use one of the constructors below to ensure a correct
+// combination of parameters.
 type JobSetStateIfRunningParams struct {
 	ID      int64
 	ErrData []byte
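Aside: the contract of the new Executor.JobSetCompleteIfRunningMany, as exercised by the riverdrivertest additions near the top of this patch, is that only jobs still in `running` are finalized; other IDs in the batch come back unchanged so the completer can see what actually happened. A minimal sketch of a caller — exec is assumed to come from a concrete driver such as riverpgxv5, and the handling branch is a placeholder:

	package example

	import (
		"context"
		"time"

		"github.com/riverqueue/river/riverdriver"
		"github.com/riverqueue/river/rivertype"
	)

	func completeBatch(ctx context.Context, exec riverdriver.Executor, jobIDs []int64) error {
		jobs, err := exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{
			ID:          jobIDs,
			FinalizedAt: time.Now().UTC(),
		})
		if err != nil {
			return err
		}

		for _, job := range jobs {
			// Jobs that were no longer running are returned with their
			// original state and no FinalizedAt rather than being forced to
			// completed.
			if job.State != rivertype.JobStateCompleted {
				_ = job // e.g. log or do retry bookkeeping; placeholder only
			}
		}
		return nil
	}
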
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
index 5b439369..5414f504 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
@@ -738,11 +738,82 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara
 	return count, err
 }
 
+const jobSetCompleteIfRunningMany = `-- name: JobSetCompleteIfRunningMany :many
+WITH job_to_update AS (
+    SELECT id
+    FROM river_job
+    WHERE id = any($1::bigint[])
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        finalized_at = $2,
+        state = 'completed'
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+        AND river_job.state = 'running'::river_job_state
+    RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags
+)
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
+FROM river_job
+WHERE id = any($1::bigint[])
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
+FROM updated_job
+`
+
+type JobSetCompleteIfRunningManyParams struct {
+	ID          []int64
+	FinalizedAt *time.Time
+}
+
+func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg *JobSetCompleteIfRunningManyParams) ([]*RiverJob, error) {
+	rows, err := db.QueryContext(ctx, jobSetCompleteIfRunningMany, pq.Array(arg.ID), arg.FinalizedAt)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			pq.Array(&i.AttemptedBy),
+			&i.CreatedAt,
+			pq.Array(&i.Errors),
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			pq.Array(&i.Tags),
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobSetStateIfRunning = `-- name: JobSetStateIfRunning :one
 WITH job_to_update AS (
     SELECT
-      id,
-      $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
+        id,
+        $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
 FROM river_job
 WHERE id = $2::bigint
 FOR UPDATE
@@ -750,17 +821,17 @@ WITH job_to_update AS (
 ),
 updated_job AS (
     UPDATE river_job
     SET
-      state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
-              ELSE $1::river_job_state END,
-      finalized_at = CASE WHEN should_cancel THEN now()
-                     WHEN $3::boolean THEN $4
-                     ELSE finalized_at END,
-      errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb)
-               ELSE errors END,
-      max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8
-                     ELSE max_attempts END,
-      scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz
-                     ELSE scheduled_at END
+        state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
+                     ELSE $1::river_job_state END,
+        finalized_at = CASE WHEN should_cancel THEN now()
+                            WHEN $3::boolean THEN $4
+                            ELSE finalized_at END,
+        errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb)
+                      ELSE errors END,
+        max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8
+                            ELSE max_attempts END,
+        scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz
+                            ELSE scheduled_at END
 FROM job_to_update
 WHERE river_job.id = job_to_update.id
     AND river_job.state = 'running'::river_job_state
diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go
index 43187879..7afe74e1 100644
--- a/riverdriver/riverdatabasesql/river_database_sql.go
+++ b/riverdriver/riverdatabasesql/river_database_sql.go
@@ -133,6 +133,10 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched
 	return 0, riverdriver.ErrNotImplemented
 }
 
+func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
+	return nil, riverdriver.ErrNotImplemented
+}
+
 func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
 	return nil, riverdriver.ErrNotImplemented
 }
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
index 19024be8..38fcc9c1 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
@@ -292,11 +292,36 @@ FROM (
     FROM river_job_scheduled
 ) AS notifications_sent;
 
+-- name: JobSetCompleteIfRunningMany :many
+WITH job_to_update AS (
+    SELECT id
+    FROM river_job
+    WHERE id = any(@id::bigint[])
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        finalized_at = @finalized_at,
+        state = 'completed'
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+        AND river_job.state = 'running'::river_job_state
+    RETURNING river_job.*
+)
+SELECT *
+FROM river_job
+WHERE id = any(@id::bigint[])
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT *
+FROM updated_job;
+
 -- name: JobSetStateIfRunning :one
 WITH job_to_update AS (
     SELECT
-      id,
-      @state::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
+        id,
+        @state::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
 FROM river_job
 WHERE id = @id::bigint
 FOR UPDATE
@@ -304,17 +329,17 @@ WITH job_to_update AS (
 ),
 updated_job AS (
     UPDATE river_job
     SET
-      state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
-             ELSE @state::river_job_state END,
-      finalized_at = CASE WHEN should_cancel THEN now()
-                     WHEN @finalized_at_do_update::boolean THEN @finalized_at
-                     ELSE finalized_at END,
-      errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb)
-               ELSE errors END,
-      max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts
-                     ELSE max_attempts END,
-      scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz
-                     ELSE scheduled_at END
+        state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
+                     ELSE @state::river_job_state END,
+        finalized_at = CASE WHEN should_cancel THEN now()
+                            WHEN @finalized_at_do_update::boolean THEN @finalized_at
+                            ELSE finalized_at END,
+        errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb)
+                      ELSE errors END,
+        max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts
+                            ELSE max_attempts END,
+        scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz
+                            ELSE scheduled_at END
 FROM job_to_update
 WHERE river_job.id = job_to_update.id
     AND river_job.state = 'running'::river_job_state
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
index b38e1efe..f18e4ff7 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
@@ -723,11 +723,79 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara
 	return count, err
 }
 
+const jobSetCompleteIfRunningMany = `-- name: JobSetCompleteIfRunningMany :many
+WITH job_to_update AS (
+    SELECT id
+    FROM river_job
+    WHERE id = any($1::bigint[])
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        finalized_at = $2,
+        state = 'completed'
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+        AND river_job.state = 'running'::river_job_state
+    RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags
+)
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
+FROM river_job
+WHERE id = any($1::bigint[])
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
+FROM updated_job
+`
+
+type JobSetCompleteIfRunningManyParams struct {
+	ID          []int64
+	FinalizedAt *time.Time
+}
+
+func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg *JobSetCompleteIfRunningManyParams) ([]*RiverJob, error) {
+	rows, err := db.Query(ctx, jobSetCompleteIfRunningMany, arg.ID, arg.FinalizedAt)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			&i.AttemptedBy,
+			&i.CreatedAt,
+			&i.Errors,
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			&i.Tags,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobSetStateIfRunning = `-- name: JobSetStateIfRunning :one
 WITH job_to_update AS (
     SELECT
-      id,
-      $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
+        id,
+        $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel
 FROM river_job
 WHERE id = $2::bigint
 FOR UPDATE
@@ -735,17 +803,17 @@ WITH job_to_update AS (
 ),
 updated_job AS (
     UPDATE river_job
     SET
-      state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
-             ELSE $1::river_job_state END,
-      finalized_at = CASE WHEN should_cancel THEN now()
-                     WHEN $3::boolean THEN $4
-                     ELSE finalized_at END,
-      errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb)
-               ELSE errors END,
-      max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8
-                     ELSE max_attempts END,
-      scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz
-                     ELSE scheduled_at END
+        state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state
+                     ELSE $1::river_job_state END,
+        finalized_at = CASE WHEN should_cancel THEN now()
+                            WHEN $3::boolean THEN $4
+                            ELSE finalized_at END,
+        errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb)
+                      ELSE errors END,
+        max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8
+                            ELSE max_attempts END,
+        scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz
+                            ELSE scheduled_at END
 FROM job_to_update
 WHERE river_job.id = job_to_update.id
     AND river_job.state = 'running'::river_job_state
diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
index a0de8222..c3d4d3b6 100644
--- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go
+++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
@@ -294,6 +294,17 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched
 	return int(numScheduled), interpretError(err)
 }
 
+func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
+	jobs, err := e.queries.JobSetCompleteIfRunningMany(ctx, e.dbtx, &dbsqlc.JobSetCompleteIfRunningManyParams{
+		ID:          params.ID,
+		FinalizedAt: &params.FinalizedAt,
+	})
+	if err != nil {
+		return nil, interpretError(err)
+	}
+	return mapSlice(jobs, jobRowFromInternal), nil
+}
+
 func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
 	var maxAttempts int16
 	if params.MaxAttempts != nil {