Skip to content

Commit

Permalink
fix memory leak of job cancellation contexts
Browse files Browse the repository at this point in the history
When remote job cancellation was added, a new cancellable context was
allocated within the producer before the executor is spawned. The cancel
func here was only called if the job was actually cancelled remotely or
via a parent context cancellation, meaning we would slowly leak memory
for every job worked that wasn't cancelled.

Thank you @brandur for pinpointing the issue.

Fixes #239.

Co-Authored-By: Brandur Leach <brandur@brandur.org>
  • Loading branch information
bgentry and brandur committed Mar 1, 2024
1 parent 0ace41d commit aa7e128
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fixed a memory leak caused by not always cancelling the context used to enable jobs to be cancelled remotely. [PR #243](https://github.com/riverqueue/river/pull/243).

## [0.0.23] - 2024-02-29

### Added
Expand Down
3 changes: 3 additions & 0 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (e *jobExecutor) Cancel() {
}

func (e *jobExecutor) Execute(ctx context.Context) {
// Ensure that the context is cancelled no matter what, or it will leak:
defer e.CancelFunc(nil)

e.start = e.TimeNowUTC()
e.stats = &jobstats.JobStatistics{
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
Expand Down
6 changes: 6 additions & 0 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ func TestJobExecutor_Execute(t *testing.T) {
jobRow: job,
}

// allocate this context just so we can set the CancelFunc:
_, cancel := context.WithCancelCause(ctx)
t.Cleanup(func() { cancel(nil) })

executor := baseservice.Init(archetype, &jobExecutor{
CancelFunc: cancel,
ClientRetryPolicy: &retryPolicyNoJitter{},
Completer: bundle.completer,
ErrorHandler: bundle.errorHandler,
Expand Down Expand Up @@ -640,6 +645,7 @@ func TestJobExecutor_Execute(t *testing.T) {

workCtx, cancelFunc := context.WithCancelCause(ctx)
executor.CancelFunc = cancelFunc
t.Cleanup(func() { cancelFunc(nil) })

executor.Execute(workCtx)
executor.Completer.Wait()
Expand Down
1 change: 1 addition & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
workUnit = workInfo.workUnitFactory.MakeUnit(job)
}

// jobCancel will always be called by the executor to prevent leaks.
jobCtx, jobCancel := context.WithCancelCause(workCtx)

executor := baseservice.Init(&p.Archetype, &jobExecutor{
Expand Down

0 comments on commit aa7e128

Please sign in to comment.