Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

customizable MaxAttempts default value #383

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383).

## [0.6.1] - 2024-05-21

### Fixed
Expand Down
22 changes: 17 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ type Config struct {
// or higher.
Logger *slog.Logger

// MaxAttempts is the default number of times a job will be retried before
// being discarded. This value is applied to all jobs by default, and can be
// overridden on individual job types on the JobArgs or on a per-job basis at
// insertion time.
//
// If not specified, defaults to 25 (MaxAttemptsDefault).
MaxAttempts int

// PeriodicJobs are a set of periodic jobs to run at the specified intervals
// in the client.
PeriodicJobs []*PeriodicJob
Expand Down Expand Up @@ -238,6 +246,9 @@ func (c *Config) validate() error {
if c.JobTimeout < -1 {
return errors.New("JobTimeout cannot be negative, except for -1 (infinite)")
}
if c.MaxAttempts < 0 {
return errors.New("MaxAttempts cannot be less than zero")
}
if c.RescueStuckJobsAfter < 0 {
return errors.New("RescueStuckJobsAfter cannot be less than zero")
}
Expand Down Expand Up @@ -433,6 +444,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault),
PeriodicJobs: config.PeriodicJobs,
PollOnly: config.PollOnly,
Queues: config.Queues,
Expand Down Expand Up @@ -568,7 +580,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals

client.periodicJobs = newPeriodicJobBundle(periodicJobEnqueuer)
client.periodicJobs = newPeriodicJobBundle(client.config, periodicJobEnqueuer)
client.periodicJobs.AddMany(config.PeriodicJobs)
}

Expand Down Expand Up @@ -1237,7 +1249,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand All @@ -1252,7 +1264,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
jobInsertOpts = argsWithOpts.InsertOpts()
}

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, rivercommon.MaxAttemptsDefault)
maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

Expand Down Expand Up @@ -1351,7 +1363,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

params, uniqueOpts, err := insertParamsFromArgsAndOptions(args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1507,7 +1519,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromArgsAndOptions(param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down
56 changes: 46 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
FetchCooldown: 20 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Logger: riverinternaltest.Logger(t),
MaxAttempts: MaxAttemptsDefault,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}},
Workers: workers,
disableStaggerStart: true, // disables staggered start in maintenance services
Expand Down Expand Up @@ -2122,7 +2123,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -3565,7 +3566,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromArgsAndOptions(unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -3714,6 +3715,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
require.Equal(t, JobTimeoutDefault, client.config.JobTimeout)
require.NotZero(t, client.baseService.Logger)
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.False(t, client.config.disableStaggerStart)
}
Expand Down Expand Up @@ -3742,6 +3744,7 @@ func Test_NewClient_Overrides(t *testing.T) {
FetchPollInterval: 124 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Workers: workers,
Expand All @@ -3764,6 +3767,7 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval)
require.Equal(t, 125*time.Millisecond, client.config.JobTimeout)
require.Equal(t, logger, client.baseService.Logger)
require.Equal(t, 5, client.config.MaxAttempts)
require.Equal(t, retryPolicy, client.config.RetryPolicy)
require.True(t, client.config.disableStaggerStart)
}
Expand Down Expand Up @@ -3891,6 +3895,23 @@ func Test_NewClient_Validations(t *testing.T) {
config.JobTimeout = 7 * 24 * time.Hour
},
},
{
name: "MaxAttempts cannot be less than zero",
configFunc: func(config *Config) {
config.MaxAttempts = -1
},
wantErr: errors.New("MaxAttempts cannot be less than zero"),
},
{
name: "MaxAttempts of zero applies DefaultMaxAttempts",
configFunc: func(config *Config) {
config.MaxAttempts = 0
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
// A client config value of zero gets interpreted as the default max attempts:
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
},
},
{
name: "RescueStuckJobsAfter may be overridden",
configFunc: func(config *Config) {
Expand Down Expand Up @@ -4138,14 +4159,16 @@ func TestClient_JobTimeout(t *testing.T) {
func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, insertParams.MaxAttempts)
require.Equal(t, config.MaxAttempts, insertParams.MaxAttempts)
require.Equal(t, rivercommon.PriorityDefault, insertParams.Priority)
require.Equal(t, QueueDefault, insertParams.Queue)
require.Nil(t, insertParams.ScheduledAt)
Expand All @@ -4154,6 +4177,18 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.True(t, uniqueOpts.IsEmpty())
})

t.Run("ConfigOverrides", func(t *testing.T) {
t.Parallel()

overrideConfig := &Config{
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})

t.Run("InsertOptsOverrides", func(t *testing.T) {
t.Parallel()

Expand All @@ -4164,7 +4199,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -4176,7 +4211,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("WorkerInsertOptsOverrides", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -4195,7 +4230,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

_, internalUniqueOpts, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs)
require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod)
Expand All @@ -4206,7 +4241,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromArgsAndOptions(noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5})
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -4215,7 +4250,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromArgsAndOptions(args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -4226,7 +4261,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
// Ensure that unique opts are validated. No need to be exhaustive here
// since we already have tests elsewhere for that. Just make sure validation
// is running.
insertParams, _, err := insertParamsFromArgsAndOptions(
insertParams, _, err := insertParamsFromConfigArgsAndOptions(
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
)
Expand Down
2 changes: 1 addition & 1 deletion job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
if res.Err != nil && errors.As(res.Err, &snoozeErr) {
e.Logger.InfoContext(ctx, e.Name+": Job snoozed",
slog.Int64("job_id", e.JobRow.ID),
slog.String("job_kind", e.JobRow.Kind),
slog.String("job_kind", e.JobRow.Kind),
slog.Duration("duration", snoozeErr.duration),
)
nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration)
Expand Down
7 changes: 5 additions & 2 deletions periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ func (s *periodicIntervalSchedule) Next(t time.Time) time.Time {
// made accessible through Client, where new periodic jobs can be configured,
// and only ones removed.
type PeriodicJobBundle struct {
clientConfig *Config
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer
}

func newPeriodicJobBundle(periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle {
func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle {
return &PeriodicJobBundle{
clientConfig: config,
periodicJobEnqueuer: periodicJobEnqueuer,
}
}
Expand Down Expand Up @@ -178,10 +180,11 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe
if periodicJob.opts != nil {
opts = periodicJob.opts
}
args, options := periodicJob.constructorFunc()

return &maintenance.PeriodicJob{
ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
return insertParamsFromArgsAndOptions(periodicJob.constructorFunc())
return insertParamsFromConfigArgsAndOptions(b.clientConfig, args, options)
},
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
Expand Down
Loading
Loading