diff --git a/CHANGELOG.md b/CHANGELOG.md index eb5ad957..9701e3eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed a memory leak caused by not always cancelling the context used to enable jobs to be cancelled remotely. [PR #243](https://github.com/riverqueue/river/pull/243). + ## [0.0.23] - 2024-02-29 ### Added diff --git a/job_executor.go b/job_executor.go index cbd3524d..d8be5213 100644 --- a/job_executor.go +++ b/job_executor.go @@ -138,6 +138,9 @@ func (e *jobExecutor) Cancel() { } func (e *jobExecutor) Execute(ctx context.Context) { + // Ensure that the context is cancelled no matter what, or it will leak: + defer e.CancelFunc(nil) + e.start = e.TimeNowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), diff --git a/job_executor_test.go b/job_executor_test.go index 5cf892ae..cdf19eac 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -174,7 +174,12 @@ func TestJobExecutor_Execute(t *testing.T) { jobRow: job, } + // allocate this context just so we can set the CancelFunc: + _, cancel := context.WithCancelCause(ctx) + t.Cleanup(func() { cancel(nil) }) + executor := baseservice.Init(archetype, &jobExecutor{ + CancelFunc: cancel, ClientRetryPolicy: &retryPolicyNoJitter{}, Completer: bundle.completer, ErrorHandler: bundle.errorHandler, @@ -640,6 +645,7 @@ func TestJobExecutor_Execute(t *testing.T) { workCtx, cancelFunc := context.WithCancelCause(ctx) executor.CancelFunc = cancelFunc + t.Cleanup(func() { cancelFunc(nil) }) executor.Execute(workCtx) executor.Completer.Wait() diff --git a/producer.go b/producer.go index a73b557e..a5e4db46 100644 --- a/producer.go +++ b/producer.go @@ -358,6 +358,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. workUnit = workInfo.workUnitFactory.MakeUnit(job) } + // jobCancel will always be called by the executor to prevent leaks. jobCtx, jobCancel := context.WithCancelCause(workCtx) executor := baseservice.Init(&p.Archetype, &jobExecutor{