From a1caa4e0bbb6cdd010976321fdb479393ba3b5bc Mon Sep 17 00:00:00 2001 From: Corentin Clabaut Date: Fri, 7 Jun 2024 19:53:31 +0200 Subject: [PATCH 1/5] Clear pending tasks in the worker when the context is canceled to avoid deadlocks in StopAndWait when tasks are queued for the worker. --- pond.go | 12 ++++++------ pond_blackbox_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ worker.go | 7 +++++-- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/pond.go b/pond.go index e5e6192..90fa70d 100644 --- a/pond.go +++ b/pond.go @@ -353,6 +353,11 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { // Mark pool as stopped atomic.StoreInt32(&p.stopped, 1) + // close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made) + p.tasksCloseOnce.Do(func() { + close(p.tasks) + }) + if waitForQueuedTasksToComplete { // Wait for all queued tasks to complete p.tasksWaitGroup.Wait() @@ -366,11 +371,6 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { // Wait for all workers & purger goroutine to exit p.workersWaitGroup.Wait() - - // close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made) - p.tasksCloseOnce.Do(func() { - close(p.tasks) - }) } // purge represents the work done by the purger goroutine @@ -420,7 +420,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { } // Launch worker goroutine - go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask) + go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask, &p.tasksWaitGroup) return true } diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index ca10b11..e020b0b 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -542,6 +542,47 @@ func TestSubmitWithContext(t *testing.T) { assertEqual(t, int32(0), atomic.LoadInt32(&doneCount)) } +func TestSubmitWithContextCancelWithIdleTasks(t *testing.T) { + + ctx, cancel := 
context.WithCancel(context.Background()) + + pool := pond.New(1, 5, pond.Context(ctx)) + + var doneCount, taskCount int32 + + // Submit a long-running, cancellable task + pool.Submit(func() { + atomic.AddInt32(&taskCount, 1) + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Minute): + atomic.AddInt32(&doneCount, 1) + return + } + }) + + // Submit a long-running, cancellable task + pool.Submit(func() { + atomic.AddInt32(&taskCount, 1) + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Minute): + atomic.AddInt32(&doneCount, 1) + return + } + }) + + // Cancel the context + cancel() + + pool.StopAndWait() + + assertEqual(t, int32(1), atomic.LoadInt32(&taskCount)) + assertEqual(t, int32(0), atomic.LoadInt32(&doneCount)) +} + func TestConcurrentStopAndWait(t *testing.T) { pool := pond.New(1, 5) diff --git a/worker.go b/worker.go index 1677c27..02a7288 100644 --- a/worker.go +++ b/worker.go @@ -6,7 +6,7 @@ import ( ) // worker represents a worker goroutine -func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) { +func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool), taskWaitGroup *sync.WaitGroup) { // If provided, execute the first task immediately, before listening to the tasks channel if firstTask != nil { @@ -20,7 +20,10 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func() for { select { case <-context.Done(): - // Pool context was cancelled, exit + // Pool context was cancelled, empty tasks channel and exit + for _ = range tasks { + taskWaitGroup.Done() + } return case task, ok := <-tasks: if task == nil || !ok { From f59ff90fd9ea752107c8b3ac950c07d0b904efb2 Mon Sep 17 00:00:00 2001 From: Corentin Clabaut Date: Mon, 10 Jun 2024 16:35:37 +0200 Subject: [PATCH 2/5] Prioritize context.Done statement in worker --- worker.go | 21 
+++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/worker.go b/worker.go index 02a7288..00a8780 100644 --- a/worker.go +++ b/worker.go @@ -26,13 +26,22 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func() } return case task, ok := <-tasks: - if task == nil || !ok { - // We have received a signal to exit - return - } + // Prioritize context.Done statement (https://stackoverflow.com/questions/46200343/force-priority-of-go-select-statement) + select { + case <-context.Done(): + if task != nil && ok { + // We have received a task, ignore it + taskWaitGroup.Done() + } + default: + if task == nil || !ok { + // We have received a signal to exit + return + } - // We have received a task, execute it - taskExecutor(task, false) + // We have received a task, execute it + taskExecutor(task, false) + } } } } From bdb616caa5d4197fe801e0063f4234d900fa3a0e Mon Sep 17 00:00:00 2001 From: Corentin Clabaut Date: Tue, 11 Jun 2024 08:41:29 +0200 Subject: [PATCH 3/5] Fix race condition --- pond.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pond.go b/pond.go index 90fa70d..6bd1a6f 100644 --- a/pond.go +++ b/pond.go @@ -353,11 +353,6 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { // Mark pool as stopped atomic.StoreInt32(&p.stopped, 1) - // close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made) - p.tasksCloseOnce.Do(func() { - close(p.tasks) - }) - if waitForQueuedTasksToComplete { // Wait for all queued tasks to complete p.tasksWaitGroup.Wait() @@ -369,6 +364,11 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { // Terminate all workers & purger goroutine p.contextCancel() + // close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made) + p.tasksCloseOnce.Do(func() { + close(p.tasks) + }) + // Wait for all workers & purger goroutine to exit p.workersWaitGroup.Wait() } From 
57e5a0ad6247e3b0bf28921baec5156dec9413ef Mon Sep 17 00:00:00 2001 From: Corentin Clabaut Date: Mon, 17 Jun 2024 08:57:40 +0200 Subject: [PATCH 4/5] Fix tests --- pond_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pond_test.go b/pond_test.go index 76a4876..e498673 100644 --- a/pond_test.go +++ b/pond_test.go @@ -39,6 +39,9 @@ func TestPurgeAfterPoolStopped(t *testing.T) { pool.SubmitAndWait(func() { atomic.AddInt32(&doneCount, 1) }) + + time.Sleep(10 * time.Millisecond) + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) assertEqual(t, 1, pool.RunningWorkers()) @@ -59,6 +62,8 @@ func TestPurgeDuringSubmit(t *testing.T) { atomic.AddInt32(&doneCount, 1) }) + time.Sleep(10 * time.Millisecond) + assertEqual(t, 1, pool.IdleWorkers()) // Stop an idle worker right before submitting another task From f681bc1144d06810f63b21ccfdd1701a50b13d6e Mon Sep 17 00:00:00 2001 From: Corentin Clabaut Date: Mon, 17 Jun 2024 09:19:27 +0200 Subject: [PATCH 5/5] Improve readability --- worker.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/worker.go b/worker.go index 00a8780..c312bde 100644 --- a/worker.go +++ b/worker.go @@ -21,9 +21,7 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func() select { case <-context.Done(): // Pool context was cancelled, empty tasks channel and exit - for _ = range tasks { - taskWaitGroup.Done() - } + drainTasks(tasks, taskWaitGroup) return case task, ok := <-tasks: // Prioritize context.Done statement (https://stackoverflow.com/questions/46200343/force-priority-of-go-select-statement) @@ -45,3 +43,10 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func() } } } + +// drainTasks discards queued tasks and decrements the corresponding wait group +func drainTasks(tasks <-chan func(), tasksWaitGroup *sync.WaitGroup) { + for _ = range tasks { + tasksWaitGroup.Done() + } +}