Skip to content

Commit

Permalink
customizable MaxAttempts default value (#383)
Browse files Browse the repository at this point in the history
This adds a `MaxAttempts` setting to `Config` to enable the default max
attempts to be customized (instead of the global default
`MaxAttemptsDefault` of 25).

Fixes #381.
  • Loading branch information
bgentry authored Jun 5, 2024
1 parent 8edd49c commit edb42d2
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383).

## [0.6.1] - 2024-05-21

### Fixed
Expand Down
22 changes: 17 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ type Config struct {
// or higher.
Logger *slog.Logger

// MaxAttempts is the default number of times a job will be retried before
// being discarded. This value is applied to all jobs by default, and can be
// overridden on individual job types on the JobArgs or on a per-job basis at
// insertion time.
//
// If not specified, defaults to 25 (MaxAttemptsDefault).
MaxAttempts int

// PeriodicJobs are a set of periodic jobs to run at the specified intervals
// in the client.
PeriodicJobs []*PeriodicJob
Expand Down Expand Up @@ -238,6 +246,9 @@ func (c *Config) validate() error {
if c.JobTimeout < -1 {
return errors.New("JobTimeout cannot be negative, except for -1 (infinite)")
}
if c.MaxAttempts < 0 {
return errors.New("MaxAttempts cannot be less than zero")
}
if c.RescueStuckJobsAfter < 0 {
return errors.New("RescueStuckJobsAfter cannot be less than zero")
}
Expand Down Expand Up @@ -433,6 +444,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault),
PeriodicJobs: config.PeriodicJobs,
PollOnly: config.PollOnly,
Queues: config.Queues,
Expand Down Expand Up @@ -568,7 +580,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals

client.periodicJobs = newPeriodicJobBundle(periodicJobEnqueuer)
client.periodicJobs = newPeriodicJobBundle(client.config, periodicJobEnqueuer)
client.periodicJobs.AddMany(config.PeriodicJobs)
}

Expand Down Expand Up @@ -1237,7 +1249,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand All @@ -1252,7 +1264,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
jobInsertOpts = argsWithOpts.InsertOpts()
}

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, rivercommon.MaxAttemptsDefault)
maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

Expand Down Expand Up @@ -1351,7 +1363,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

params, uniqueOpts, err := insertParamsFromArgsAndOptions(args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1507,7 +1519,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromArgsAndOptions(param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down
56 changes: 46 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
FetchCooldown: 20 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Logger: riverinternaltest.Logger(t),
MaxAttempts: MaxAttemptsDefault,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}},
Workers: workers,
disableStaggerStart: true, // disables staggered start in maintenance services
Expand Down Expand Up @@ -2122,7 +2123,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -3565,7 +3566,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -3714,6 +3715,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
require.Equal(t, JobTimeoutDefault, client.config.JobTimeout)
require.NotZero(t, client.baseService.Logger)
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.False(t, client.config.disableStaggerStart)
}
Expand Down Expand Up @@ -3742,6 +3744,7 @@ func Test_NewClient_Overrides(t *testing.T) {
FetchPollInterval: 124 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Workers: workers,
Expand All @@ -3764,6 +3767,7 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval)
require.Equal(t, 125*time.Millisecond, client.config.JobTimeout)
require.Equal(t, logger, client.baseService.Logger)
require.Equal(t, 5, client.config.MaxAttempts)
require.Equal(t, retryPolicy, client.config.RetryPolicy)
require.True(t, client.config.disableStaggerStart)
}
Expand Down Expand Up @@ -3891,6 +3895,23 @@ func Test_NewClient_Validations(t *testing.T) {
config.JobTimeout = 7 * 24 * time.Hour
},
},
{
name: "MaxAttempts cannot be less than zero",
configFunc: func(config *Config) {
config.MaxAttempts = -1
},
wantErr: errors.New("MaxAttempts cannot be less than zero"),
},
{
name: "MaxAttempts of zero applies DefaultMaxAttempts",
configFunc: func(config *Config) {
config.MaxAttempts = 0
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
// A client config value of zero gets interpreted as the default max attempts:
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
},
},
{
name: "RescueStuckJobsAfter may be overridden",
configFunc: func(config *Config) {
Expand Down Expand Up @@ -4138,14 +4159,16 @@ func TestClient_JobTimeout(t *testing.T) {
func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, insertParams.MaxAttempts)
require.Equal(t, config.MaxAttempts, insertParams.MaxAttempts)
require.Equal(t, rivercommon.PriorityDefault, insertParams.Priority)
require.Equal(t, QueueDefault, insertParams.Queue)
require.Nil(t, insertParams.ScheduledAt)
Expand All @@ -4154,6 +4177,18 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.True(t, uniqueOpts.IsEmpty())
})

t.Run("ConfigOverrides", func(t *testing.T) {
t.Parallel()

overrideConfig := &Config{
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})

t.Run("InsertOptsOverrides", func(t *testing.T) {
t.Parallel()

Expand All @@ -4164,7 +4199,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -4176,7 +4211,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("WorkerInsertOptsOverrides", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -4195,7 +4230,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

_, internalUniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs)
require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod)
Expand All @@ -4206,7 +4241,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5})
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -4215,7 +4250,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromArgsAndOptions(args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -4226,7 +4261,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
// Ensure that unique opts are validated. No need to be exhaustive here
// since we already have tests elsewhere for that. Just make sure validation
// is running.
insertParams, _, err := insertParamsFromArgsAndOptions(
insertParams, _, err := insertParamsFromConfigArgsAndOptions(
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
)
Expand Down
2 changes: 1 addition & 1 deletion job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
if res.Err != nil && errors.As(res.Err, &snoozeErr) {
e.Logger.InfoContext(ctx, e.Name+": Job snoozed",
slog.Int64("job_id", e.JobRow.ID),
slog.String("job_kind", e.JobRow.Kind),
slog.String("job_kind", e.JobRow.Kind),
slog.Duration("duration", snoozeErr.duration),
)
nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration)
Expand Down
7 changes: 5 additions & 2 deletions periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ func (s *periodicIntervalSchedule) Next(t time.Time) time.Time {
// made accessible through Client, where new periodic jobs can be configured,
// and only ones removed.
type PeriodicJobBundle struct {
clientConfig *Config
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer
}

func newPeriodicJobBundle(periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle {
func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle {
return &PeriodicJobBundle{
clientConfig: config,
periodicJobEnqueuer: periodicJobEnqueuer,
}
}
Expand Down Expand Up @@ -178,10 +180,11 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe
if periodicJob.opts != nil {
opts = periodicJob.opts
}
args, options := periodicJob.constructorFunc()

return &maintenance.PeriodicJob{
ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
return insertParamsFromArgsAndOptions(periodicJob.constructorFunc())
return insertParamsFromConfigArgsAndOptions(b.clientConfig, args, options)
},
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
Expand Down
Loading

0 comments on commit edb42d2

Please sign in to comment.