Skip to content

Commit

Permalink
respect non-zero ScheduledAt from JobArgsWithInsertOpts
Browse files Browse the repository at this point in the history
This allows for job arg definitions to utilize custom logic at the args
level for determining when the job should be scheduled.

Fixes #484.
  • Loading branch information
bgentry committed Jul 31, 2024
1 parent 280896e commit 5be1df0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Include `pending` state in `JobListParams` by default so pending jobs are included in `JobList` / `JobListTx` results.
- Respect `ScheduledAt` if set to a non-zero value by `JobArgsWithInsertOpts`. This allows for job arg definitions to utilize custom logic at the args level for determining when the job should be scheduled. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).

## [0.10.1] - 2024-07-23

Expand Down
11 changes: 7 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,13 +1221,16 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
Tags: tags,
}

if insertOpts.ScheduledAt.IsZero() {
if !insertOpts.ScheduledAt.IsZero() {

Check failure on line 1224 in client.go

View workflow job for this annotation

GitHub Actions / lint

ifElseChain: rewrite if-else to switch statement (gocritic)
insertParams.ScheduledAt = &insertOpts.ScheduledAt
insertParams.State = rivertype.JobStateScheduled
} else if !jobInsertOpts.ScheduledAt.IsZero() {
insertParams.ScheduledAt = &jobInsertOpts.ScheduledAt
insertParams.State = rivertype.JobStateScheduled
} else {
// Use a stubbed time if there was one, but otherwise prefer the value
// generated by the database. createdAt is nil unless time is stubbed.
insertParams.ScheduledAt = createdAt
} else {
insertParams.ScheduledAt = &insertOpts.ScheduledAt
insertParams.State = rivertype.JobStateScheduled
}

if insertOpts.Pending {
Expand Down
23 changes: 21 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4617,15 +4617,31 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("WorkerInsertOptsOverrides", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, nil)
nearFuture := time.Now().Add(5 * time.Minute)

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: nearFuture,
}, nil)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
require.Equal(t, "other", insertParams.Queue)
require.NotNil(t, insertParams.ScheduledAt)
require.Equal(t, nearFuture, *insertParams.ScheduledAt)
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
})

t.Run("WorkerInsertOptsScheduledAtNotRespectedIfZero", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: time.Time{},
}, nil)
require.NoError(t, err)
require.Nil(t, insertParams.ScheduledAt)
})

t.Run("TagFormatValidated", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4717,7 +4733,9 @@ func TestID(t *testing.T) {
})
}

type customInsertOptsJobArgs struct{}
type customInsertOptsJobArgs struct {
ScheduledAt time.Time `json:"scheduled_at"`
}

func (w *customInsertOptsJobArgs) Kind() string { return "customInsertOpts" }

Expand All @@ -4726,6 +4744,7 @@ func (w *customInsertOptsJobArgs) InsertOpts() InsertOpts {
MaxAttempts: 42,
Priority: 2,
Queue: "other",
ScheduledAt: w.ScheduledAt,
Tags: []string{"tag1", "tag2"},
}
}
Expand Down

0 comments on commit 5be1df0

Please sign in to comment.