Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tweak JobRetry to reset ScheduledAt in most cases #211

Merged
merged 1 commit into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Tweaked behavior of `JobRetry` so that it does actually update the `ScheduledAt` time of the job in all cases where the job is actually being rescheduled. As before, jobs which are already available with a past `ScheduledAt` will not be touched by this query so that they retain their place in line. [PR #211](https://github.com/riverqueue/river/pull/211).

## [0.0.20] - 2024-02-14

### Added
Expand Down
34 changes: 31 additions & 3 deletions internal/dbadapter/db_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,16 +1064,44 @@ func Test_StandardAdapter_JobRetryImmediately(t *testing.T) {
})
}

t.Run("DoesNotAlterScheduledAtIfInThePast", func(t *testing.T) {
// We don't want to update ScheduledAt if the job was already scheduled
t.Run("AltersScheduledAtForAlreadyCompletedJob", func(t *testing.T) {
// A job which has already completed will have a ScheduledAt that could be
// long in the past. Now that we're re-scheduling it, we should update that
// to the current time to slot it in alongside other recently-scheduled jobs
// and not skip the line; also, its wait duration can't be calculated
// accurately if we don't reset the scheduled_at.
t.Parallel()

adapter, bundle := setupTx(t)

params := makeFakeJobInsertParams(0, nil)
params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour)
res, err := adapter.JobInsert(ctx, params)
require.NoError(t, err)
_, err = adapter.queries.JobUpdate(ctx, bundle.ex, dbsqlc.JobUpdateParams{
FinalizedAtDoUpdate: true,
FinalizedAt: &bundle.baselineTime,
ID: res.Job.ID,
StateDoUpdate: true,
State: dbsqlc.JobStateCompleted,
})
require.NoError(t, err)

jAfter, err := adapter.JobRetryImmediately(ctx, res.Job.ID)
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateAvailable, jAfter.State)
require.WithinDuration(t, time.Now().UTC(), jAfter.ScheduledAt, 5*time.Second)
})

t.Run("DoesNotAlterScheduledAtIfInThePastAndJobAlreadyAvailable", func(t *testing.T) {
// We don't want to update ScheduledAt if the job was already available
// because doing so can make it lose its place in line.
t.Parallel()

adapter, bundle := setupTx(t)

params := makeFakeJobInsertParams(0, nil)
params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour)
params.State = dbsqlc.JobStateScheduled
res, err := adapter.JobInsert(ctx, params)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ updated_job AS (
UPDATE river_job
SET
state = 'available'::river_job_state,
scheduled_at = CASE WHEN scheduled_at < now() THEN scheduled_at ELSE now() END,
scheduled_at = now(),
max_attempts = CASE WHEN attempt = max_attempts THEN max_attempts + 1 ELSE max_attempts END,
finalized_at = NULL
FROM job_to_update
Expand Down
2 changes: 1 addition & 1 deletion internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading