Skip to content

Commit

Permalink
Add JobCleaner timeout dynamically (riverqueue#576)
Browse files Browse the repository at this point in the history
The JobCleaner maintenance process had a hardcoded 30 second timeout. While this is
plenty long enough for most installations, in certain very large job tables this may not
be enough time to run a single round of the query.

This makes the timeout customizable via a Client config.

---------

Co-authored-by: Blake Gentry <blakesgentry@gmail.com>
  • Loading branch information
2 people authored and tigrato committed Dec 18, 2024
1 parent a59c7fb commit 7792ad5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `rivertest.WorkContext`, a test function that can be used to initialize a context to test a `JobArgs.Work` implementation that will have a client set to context for use with `river.ClientFromContext`. [PR #526](https://github.com/riverqueue/river/pull/526).
- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).
- `river version` or `river --version` now prints River version information. [PR #537](https://github.com/riverqueue/river/pull/537).
- `Config.JobCleanerTimeout` was added to allow configuration of the job cleaner query timeout. In some deployments with millions of stale jobs, the cleaner may not be able to complete its query within the default 30 seconds.

### Changed

Expand Down
8 changes: 8 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ type Config struct {
// If in doubt, leave this property empty.
ID string

// JobCleanerTimeout is the timeout of the individual queries within the job
// cleaner.
//
// Defaults to 30 seconds, which should be more than enough time for most
// deployments.
JobCleanerTimeout time.Duration

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -561,6 +568,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobCleaner)
client.testSignals.jobCleaner = &jobCleaner.TestSignals
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4627,6 +4627,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, maintenance.JobCleanerTimeoutDefault, jobCleaner.Config.Timeout)
require.False(t, jobCleaner.StaggerStartupIsDisabled())

enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
Expand Down
10 changes: 9 additions & 1 deletion internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
CompletedJobRetentionPeriodDefault = 24 * time.Hour
DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
JobCleanerIntervalDefault = 30 * time.Second
JobCleanerTimeoutDefault = 30 * time.Second
)

// Test-only properties.
Expand All @@ -48,6 +49,9 @@ type JobCleanerConfig struct {

// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration

// Timeout of the individual queries in the job cleaner.
Timeout time.Duration
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
Expand All @@ -63,6 +67,9 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
if c.Interval <= 0 {
panic("JobCleanerConfig.Interval must be above zero")
}
if c.Timeout <= 0 {
panic("JobCleanerConfig.Timeout must be above zero")
}

return c
}
Expand All @@ -88,6 +95,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
Interval: valutil.ValOrDefault(config.Interval, JobCleanerIntervalDefault),
Timeout: valutil.ValOrDefault(config.Timeout, JobCleanerTimeoutDefault),
}).mustValidate(),

batchSize: BatchSizeDefault,
Expand Down Expand Up @@ -147,7 +155,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
for {
// Wrapped in a function so that defers run as expected.
numDeleted, err := func() (int, error) {
ctx, cancelFunc := context.WithTimeout(ctx, 30*time.Second)
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
defer cancelFunc()

numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
Expand Down
1 change: 1 addition & 0 deletions internal/maintenance/job_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestJobCleaner(t *testing.T) {
require.Equal(t, CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, JobCleanerIntervalDefault, cleaner.Config.Interval)
require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.Timeout)
})

t.Run("StartStopStress", func(t *testing.T) {
Expand Down

0 comments on commit 7792ad5

Please sign in to comment.