diff --git a/CHANGELOG.md b/CHANGELOG.md index d2e767ee..a9692074 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383). + ## [0.6.1] - 2024-05-21 ### Fixed diff --git a/client.go b/client.go index e0749bb5..b0686d9c 100644 --- a/client.go +++ b/client.go @@ -138,6 +138,14 @@ type Config struct { // or higher. Logger *slog.Logger + // MaxAttempts is the default number of times a job will be retried before + // being discarded. This value is applied to all jobs by default, and can be + // overridden on individual job types on the JobArgs or on a per-job basis at + // insertion time. + // + // If not specified, defaults to 25 (MaxAttemptsDefault). + MaxAttempts int + // PeriodicJobs are a set of periodic jobs to run at the specified intervals // in the client. PeriodicJobs []*PeriodicJob @@ -238,6 +246,9 @@ func (c *Config) validate() error { if c.JobTimeout < -1 { return errors.New("JobTimeout cannot be negative, except for -1 (infinite)") } + if c.MaxAttempts < 0 { + return errors.New("MaxAttempts cannot be less than zero") + } if c.RescueStuckJobsAfter < 0 { return errors.New("RescueStuckJobsAfter cannot be less than zero") } @@ -433,6 +444,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }), JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault), Logger: logger, + MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault), PeriodicJobs: config.PeriodicJobs, PollOnly: config.PollOnly, Queues: config.Queues, @@ -568,7 +580,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client maintenanceServices = append(maintenanceServices, periodicJobEnqueuer) client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals - client.periodicJobs = newPeriodicJobBundle(periodicJobEnqueuer) + client.periodicJobs = newPeriodicJobBundle(client.config, periodicJobEnqueuer) client.periodicJobs.AddMany(config.PeriodicJobs) } @@ -1237,7 +1249,7 @@ func (c *Client[TTx]) ID() string { return c.config.ID } -func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { +func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { encodedArgs, err := json.Marshal(args) if err != nil { return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err) @@ -1252,7 +1264,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive jobInsertOpts = argsWithOpts.InsertOpts() } - maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, rivercommon.MaxAttemptsDefault) + maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts) priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault) queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault) @@ -1351,7 +1363,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg return nil, err } - params, uniqueOpts, err := insertParamsFromArgsAndOptions(args, opts) + params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts) if err != nil { return nil, err } @@ -1507,7 +1519,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive } var err error - insertParams[i], _, err = insertParamsFromArgsAndOptions(param.Args, param.InsertOpts) + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts) if err != nil { return nil, err } diff --git a/client_test.go b/client_test.go index ce4e47c7..2525cc11 100644 --- a/client_test.go +++ b/client_test.go @@ -140,6 +140,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config { FetchCooldown: 20 * time.Millisecond, FetchPollInterval: 50 * time.Millisecond, Logger: riverinternaltest.Logger(t), + MaxAttempts: MaxAttemptsDefault, Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}}, Workers: workers, disableStaggerStart: true, // disables staggered start in maintenance services @@ -2122,7 +2123,7 @@ func Test_Client_ErrorHandler(t *testing.T) { // Bypass the normal Insert function because that will error on an // unknown job. - insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) require.NoError(t, err) _, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(t, err) @@ -3565,7 +3566,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) { subscribeChan, cancel := client.Subscribe(EventKindJobFailed) t.Cleanup(cancel) - insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) require.NoError(err) insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(err) @@ -3714,6 +3715,7 @@ func Test_NewClient_Defaults(t *testing.T) { require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval) require.Equal(t, JobTimeoutDefault, client.config.JobTimeout) require.NotZero(t, client.baseService.Logger) + require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts) require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy) require.False(t, client.config.disableStaggerStart) } @@ -3742,6 +3744,7 @@ func Test_NewClient_Overrides(t *testing.T) { FetchPollInterval: 124 * time.Millisecond, JobTimeout: 125 * time.Millisecond, Logger: logger, + MaxAttempts: 5, Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, RetryPolicy: retryPolicy, Workers: workers, @@ -3764,6 +3767,7 @@ func Test_NewClient_Overrides(t *testing.T) { require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval) require.Equal(t, 125*time.Millisecond, client.config.JobTimeout) require.Equal(t, logger, client.baseService.Logger) + require.Equal(t, 5, client.config.MaxAttempts) require.Equal(t, retryPolicy, client.config.RetryPolicy) require.True(t, client.config.disableStaggerStart) } @@ -3891,6 +3895,23 @@ func Test_NewClient_Validations(t *testing.T) { config.JobTimeout = 7 * 24 * time.Hour }, }, + { + name: "MaxAttempts cannot be less than zero", + configFunc: func(config *Config) { + config.MaxAttempts = -1 + }, + wantErr: errors.New("MaxAttempts cannot be less than zero"), + }, + { + name: "MaxAttempts of zero applies DefaultMaxAttempts", + configFunc: func(config *Config) { + config.MaxAttempts = 0 + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + // A client config value of zero gets interpreted as the default max attempts: + require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts) + }, + }, { name: "RescueStuckJobsAfter may be overridden", configFunc: func(config *Config) { @@ -4138,14 +4159,16 @@ func TestClient_JobTimeout(t *testing.T) { func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() + config := newTestConfig(t, nil) + t.Run("Defaults", func(t *testing.T) { t.Parallel() - insertParams, uniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, nil) + insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs)) require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind) - require.Equal(t, rivercommon.MaxAttemptsDefault, insertParams.MaxAttempts) + require.Equal(t, config.MaxAttempts, insertParams.MaxAttempts) require.Equal(t, rivercommon.PriorityDefault, insertParams.Priority) require.Equal(t, QueueDefault, insertParams.Queue) require.Nil(t, insertParams.ScheduledAt) @@ -4154,6 +4177,18 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { require.True(t, uniqueOpts.IsEmpty()) }) + t.Run("ConfigOverrides", func(t *testing.T) { + t.Parallel() + + overrideConfig := &Config{ + MaxAttempts: 34, + } + + insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil) + require.NoError(t, err) + require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts) + }) + t.Run("InsertOptsOverrides", func(t *testing.T) { t.Parallel() @@ -4164,7 +4199,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ScheduledAt: time.Now().Add(time.Hour), Tags: []string{"tag1", "tag2"}, } - insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, opts) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts) require.NoError(t, err) require.Equal(t, 42, insertParams.MaxAttempts) require.Equal(t, 2, insertParams.Priority) @@ -4176,7 +4211,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("WorkerInsertOptsOverrides", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil) require.NoError(t, err) // All these come from overrides in customInsertOptsJobArgs's definition: require.Equal(t, 42, insertParams.MaxAttempts) @@ -4195,7 +4230,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, } - _, internalUniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) + _, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs) require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod) @@ -4206,7 +4241,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("PriorityIsLimitedTo4", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{Priority: 5}) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5}) require.ErrorContains(t, err, "priority must be between 1 and 4") require.Nil(t, insertParams) }) @@ -4215,7 +4250,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() args := timeoutTestArgs{TimeoutValue: time.Hour} - insertParams, _, err := insertParamsFromArgsAndOptions(args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil) require.NoError(t, err) require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs)) }) @@ -4226,7 +4261,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { // Ensure that unique opts are validated. No need to be exhaustive here // since we already have tests elsewhere for that. Just make sure validation // is running. - insertParams, _, err := insertParamsFromArgsAndOptions( + insertParams, _, err := insertParamsFromConfigArgsAndOptions( + config, noOpArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}}, ) diff --git a/job_executor.go b/job_executor.go index 740d8add..33747bf6 100644 --- a/job_executor.go +++ b/job_executor.go @@ -247,7 +247,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) if res.Err != nil && errors.As(res.Err, &snoozeErr) { e.Logger.InfoContext(ctx, e.Name+": Job snoozed", slog.Int64("job_id", e.JobRow.ID), - slog.String("job_kind", e.JobRow.Kind), + slog.String("job_kind", e.JobRow.Kind), slog.Duration("duration", snoozeErr.duration), ) nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration) diff --git a/periodic_job.go b/periodic_job.go index 86181a6b..c2cea8df 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -93,11 +93,13 @@ func (s *periodicIntervalSchedule) Next(t time.Time) time.Time { // made accessible through Client, where new periodic jobs can be configured, // and only ones removed. type PeriodicJobBundle struct { + clientConfig *Config periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer } -func newPeriodicJobBundle(periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle { +func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle { return &PeriodicJobBundle{ + clientConfig: config, periodicJobEnqueuer: periodicJobEnqueuer, } } @@ -178,10 +180,11 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe if periodicJob.opts != nil { opts = periodicJob.opts } + args, options := periodicJob.constructorFunc() return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return insertParamsFromArgsAndOptions(periodicJob.constructorFunc()) + return insertParamsFromConfigArgsAndOptions(b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, ScheduleFunc: periodicJob.scheduleFunc.Next, diff --git a/producer_test.go b/producer_test.go index fd54be7a..2e82653c 100644 --- a/producer_test.go +++ b/producer_test.go @@ -49,6 +49,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { archetype := riverinternaltest.BaseServiceArchetype(t) + config := newTestConfig(t, nil) dbDriver := riverpgxv5.New(dbPool) exec := dbDriver.GetExecutor() listener := dbDriver.GetListener() @@ -101,7 +102,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { params := make([]*riverdriver.JobInsertFastParams, maxJobCount) for i := range params { - insertParams, _, err := insertParamsFromArgsAndOptions(WithJobNumArgs{JobNum: i}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, WithJobNumArgs{JobNum: i}, nil) require.NoError(err) params[i] = insertParams @@ -230,6 +231,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin type testBundle struct { completer jobcompleter.JobCompleter + config *Config exec riverdriver.Executor jobUpdates chan jobcompleter.CompleterJobUpdated workers *Workers @@ -240,6 +242,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer := makeProducer(ctx, t) producer.testSignals.Init() + config := newTestConfig(t, nil) jobUpdates := make(chan jobcompleter.CompleterJobUpdated, 10) producer.completer.Subscribe(func(update jobcompleter.CompleterJobUpdated) { @@ -248,19 +251,20 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin return producer, &testBundle{ completer: producer.completer, + config: config, exec: producer.exec, jobUpdates: jobUpdates, workers: producer.workers, } } - mustInsert := func(ctx context.Context, t *testing.T, exec riverdriver.Executor, args JobArgs) { + mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) { t.Helper() - insertParams, _, err := insertParamsFromArgsAndOptions(args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.config, args, nil) require.NoError(t, err) - _, err = exec.JobInsertFast(ctx, insertParams) + _, err = bundle.exec.JobInsertFast(ctx, insertParams) require.NoError(t, err) } @@ -285,7 +289,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer, bundle := setup(t) AddWorker(bundle.workers, &noOpWorker{}) - mustInsert(ctx, t, bundle.exec, &noOpArgs{}) + mustInsert(ctx, t, bundle, &noOpArgs{}) startProducer(t, ctx, ctx, producer) @@ -320,8 +324,8 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer, bundle := setup(t) AddWorker(bundle.workers, &noOpWorker{}) - mustInsert(ctx, t, bundle.exec, &noOpArgs{}) - mustInsert(ctx, t, bundle.exec, &callbackArgs{}) // not registered + mustInsert(ctx, t, bundle, &noOpArgs{}) + mustInsert(ctx, t, bundle, &callbackArgs{}) // not registered startProducer(t, ctx, ctx, producer) @@ -370,7 +374,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin workCtx, workCancel := context.WithCancel(ctx) defer workCancel() - mustInsert(ctx, t, bundle.exec, &JobArgs{}) + mustInsert(ctx, t, bundle, &JobArgs{}) startProducer(t, ctx, workCtx, producer) @@ -406,7 +410,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin })) for i := 0; i < numJobs; i++ { - mustInsert(ctx, t, bundle.exec, &JobArgs{}) + mustInsert(ctx, t, bundle, &JobArgs{}) } startProducer(t, ctx, ctx, producer) @@ -451,7 +455,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin PausedAt: ptrutil.Ptr(time.Now()), }) - mustInsert(ctx, t, bundle.exec, &noOpArgs{}) + mustInsert(ctx, t, bundle, &noOpArgs{}) startProducer(t, ctx, ctx, producer) @@ -470,7 +474,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer.config.QueuePollInterval = 50 * time.Millisecond AddWorker(bundle.workers, &noOpWorker{}) - mustInsert(ctx, t, bundle.exec, &noOpArgs{}) + mustInsert(ctx, t, bundle, &noOpArgs{}) startProducer(t, ctx, ctx, producer) @@ -487,7 +491,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer.testSignals.Paused.WaitOrTimeout() // Job should not be executed while paused: - mustInsert(ctx, t, bundle.exec, &noOpArgs{}) + mustInsert(ctx, t, bundle, &noOpArgs{}) select { case update := <-bundle.jobUpdates: