Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JobCleaner timeout dynamically #576

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `rivertest.WorkContext`, a test function that can be used to initialize a context to test a `JobArgs.Work` implementation that will have a client set to context for use with `river.ClientFromContext`. [PR #526](https://github.com/riverqueue/river/pull/526).
- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).
- `river version` or `river --version` now prints River version information. [PR #537](https://github.com/riverqueue/river/pull/537).
- `Config.JobCleanerTimeout` was added to allow configuration of the job cleaner query timeout. In some deployments with millions of stale jobs, the cleaner may not be able to complete its query within the default 30 seconds.

### Changed

Expand Down
8 changes: 8 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ type Config struct {
// If in doubt, leave this property empty.
ID string

// JobCleanerTimeout is the timeout of the individual queries within the job
// cleaner.
//
// Defaults to 30 seconds, which should be more than enough time for most
// deployments.
JobCleanerTimeout time.Duration

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -558,6 +565,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobCleaner)
client.testSignals.jobCleaner = &jobCleaner.TestSignals
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4627,6 +4627,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, maintenance.JobCleanerTimeoutDefault, jobCleaner.Config.Timeout)
require.False(t, jobCleaner.StaggerStartupIsDisabled())

enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
Expand Down
10 changes: 9 additions & 1 deletion internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
CompletedJobRetentionPeriodDefault = 24 * time.Hour
DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
JobCleanerIntervalDefault = 30 * time.Second
JobCleanerTimeoutDefault = 30 * time.Second
)

// Test-only properties.
Expand All @@ -48,6 +49,9 @@ type JobCleanerConfig struct {

// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration

// Timeout of the individual queries in the job cleaner.
Timeout time.Duration
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
Expand All @@ -63,6 +67,9 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
if c.Interval <= 0 {
panic("JobCleanerConfig.Interval must be above zero")
}
if c.Timeout <= 0 {
panic("JobCleanerConfig.Timeout must be above zero")
}

return c
}
Expand All @@ -88,6 +95,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
Interval: valutil.ValOrDefault(config.Interval, JobCleanerIntervalDefault),
Timeout: valutil.ValOrDefault(config.Timeout, JobCleanerTimeoutDefault),
}).mustValidate(),

batchSize: BatchSizeDefault,
Expand Down Expand Up @@ -147,7 +155,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
for {
// Wrapped in a function so that defers run as expected.
numDeleted, err := func() (int, error) {
ctx, cancelFunc := context.WithTimeout(ctx, 30*time.Second)
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
defer cancelFunc()

numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
Expand Down
1 change: 1 addition & 0 deletions internal/maintenance/job_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestJobCleaner(t *testing.T) {
require.Equal(t, CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, JobCleanerIntervalDefault, cleaner.Config.Interval)
require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.Timeout)
})

t.Run("StartStopStress", func(t *testing.T) {
Expand Down
Loading