From 1c17c7c8fb902f0cb92888a420c9a12d7da8db84 Mon Sep 17 00:00:00 2001 From: vladopajic Date: Fri, 31 Jan 2025 16:02:50 +0200 Subject: [PATCH 1/3] combine: only call Stop() once on underlying actors (#94) --- actor/combine.go | 11 +++------- actor/combine_test.go | 48 +++++++++++++++++++++++++++++++++++++++++-- actor/helpers_test.go | 17 +++++++++++++++ 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/actor/combine.go b/actor/combine.go index 303c67f..62ebc19 100644 --- a/actor/combine.go +++ b/actor/combine.go @@ -87,7 +87,7 @@ func (a *combinedActor) onActorStopped() { } // First actor to stop should stop other actors - if a.stopTogether && a.stopping.CompareAndSwap(false, true) { + if a.stopTogether && !a.stopping.Load() { // Run stop in goroutine because wrapped actor // should not wait for other actors to stop. // @@ -102,16 +102,12 @@ func (a *combinedActor) onActorStopped() { func (a *combinedActor) Stop() { a.runningLock.Lock() - if !a.running { + if !a.running || a.stopping.Swap(true) { a.runningLock.Unlock() return } - if ctx := a.ctx; ctx != nil { - ctx.end() - - a.ctx = nil - } + a.ctx.end() a.runningLock.Unlock() @@ -130,7 +126,6 @@ func (a *combinedActor) Start() { ctx := newContext() a.ctx = ctx - a.stopping.Store(false) a.running = true diff --git a/actor/combine_test.go b/actor/combine_test.go index 39d1d2e..5b9579a 100644 --- a/actor/combine_test.go +++ b/actor/combine_test.go @@ -1,6 +1,7 @@ package actor_test import ( + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -66,12 +67,12 @@ func Test_Combine_OptOnStopOptOnStart(t *testing.T) { testCombineOptOnStopOptOnStart(t, 5) } -func testCombineOptOnStopOptOnStart(t *testing.T, count int) { +func testCombineOptOnStopOptOnStart(t *testing.T, actorsCount int) { t.Helper() onStatC, onStartOpt := createCombinedOnStartOption(t, 1) onStopC, onStopOpt := createCombinedOnStopOption(t, 1) - actors := createActors(count) + actors := createActors(actorsCount) a := Combine(actors...). WithOptions(onStopOpt, onStartOpt). @@ -89,6 +90,49 @@ func testCombineOptOnStopOptOnStart(t *testing.T, count int) { assert.Empty(t, onStatC) } +func Test_Combine_StoppingOnce(t *testing.T) { + t.Parallel() + + testCombineStoppingOnce(t, 1) + testCombineStoppingOnce(t, 2) + testCombineStoppingOnce(t, 10) + testCombineStoppingOnce(t, 20) +} + +func testCombineStoppingOnce(t *testing.T, actorsCount int32) { + t.Helper() + + c := atomic.Int32{} + stopConcurrentlyFinishedC := make(chan any) + actors := make([]Actor, actorsCount) + + for i := range actorsCount { + actors[i] = delegateActor{stop: func() { + <-stopConcurrentlyFinishedC + c.Add(1) + }} + } + + a := Combine(actors...).Build() + a.Start() + + // Call Stop() multiple times in separate goroutine to force concurrency + const stopCallsCount = 100 + stopFinsihedC := make(chan any, stopCallsCount) + + for range stopCallsCount { + go func() { + a.Stop() + stopFinsihedC <- `🛑` + }() + } + + close(stopConcurrentlyFinishedC) + drainC(stopFinsihedC, stopCallsCount) + + assert.Equal(t, actorsCount, c.Load()) +} + // Test_Combine_OptStopTogether asserts that all actors will end as soon // as first actors ends. func Test_Combine_OptStopTogether(t *testing.T) { diff --git a/actor/helpers_test.go b/actor/helpers_test.go index 3122f2b..4e4ca3d 100644 --- a/actor/helpers_test.go +++ b/actor/helpers_test.go @@ -6,6 +6,23 @@ import ( "testing" ) +type delegateActor struct { + start func() + stop func() +} + +func (a delegateActor) Start() { + if fn := a.start; fn != nil { + fn() + } +} + +func (a delegateActor) Stop() { + if fn := a.stop; fn != nil { + fn() + } +} + func drainC(c <-chan any, count int) { for range count { <-c From 6355bd2323b3e28070625b726038814fd0bd695b Mon Sep 17 00:00:00 2001 From: vladopajic Date: Fri, 31 Jan 2025 16:20:15 +0200 Subject: [PATCH 2/3] combine: run tests in parallel (#95) --- actor/combine_test.go | 64 +++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/actor/combine_test.go b/actor/combine_test.go index 5b9579a..af1967b 100644 --- a/actor/combine_test.go +++ b/actor/combine_test.go @@ -1,6 +1,7 @@ package actor_test import ( + "fmt" "sync/atomic" "testing" @@ -9,21 +10,33 @@ import ( . "github.com/vladopajic/go-actor/actor" ) +func combineParallel( + t *testing.T, + actorsCount int, + testFn func(*testing.T, int), +) { + t.Helper() + + t.Run(fmt.Sprintf("actors count %v", actorsCount), func(t *testing.T) { + t.Parallel() + testFn(t, actorsCount) + }) +} + func Test_Combine_TestSuite(t *testing.T) { t.Parallel() - TestSuite(t, func() Actor { - actors := createActors(0) - return Combine(actors...).Build() - }) + combineParallel(t, 0, testCombineTestSuite) + combineParallel(t, 1, testCombineTestSuite) + combineParallel(t, 2, testCombineTestSuite) + combineParallel(t, 10, testCombineTestSuite) +} - TestSuite(t, func() Actor { - actors := createActors(1) - return Combine(actors...).Build() - }) +func testCombineTestSuite(t *testing.T, actorsCount int) { + t.Helper() TestSuite(t, func() Actor { - actors := createActors(10) + actors := createActors(actorsCount) return Combine(actors...).Build() }) } @@ -32,9 +45,10 @@ func Test_Combine_TestSuite(t *testing.T) { func Test_Combine(t *testing.T) { t.Parallel() - testCombine(t, 0) - testCombine(t, 1) - testCombine(t, 5) + combineParallel(t, 0, testCombine) + combineParallel(t, 1, testCombine) + combineParallel(t, 2, testCombine) + combineParallel(t, 10, testCombine) } func testCombine(t *testing.T, actorsCount int) { @@ -62,9 +76,10 @@ func testCombine(t *testing.T, actorsCount int) { func Test_Combine_OptOnStopOptOnStart(t *testing.T) { t.Parallel() - testCombineOptOnStopOptOnStart(t, 0) - testCombineOptOnStopOptOnStart(t, 1) - testCombineOptOnStopOptOnStart(t, 5) + combineParallel(t, 0, testCombineOptOnStopOptOnStart) + combineParallel(t, 1, testCombineOptOnStopOptOnStart) + combineParallel(t, 2, testCombineOptOnStopOptOnStart) + combineParallel(t, 10, testCombineOptOnStopOptOnStart) } func testCombineOptOnStopOptOnStart(t *testing.T, actorsCount int) { @@ -93,13 +108,14 @@ func testCombineOptOnStopOptOnStart(t *testing.T, actorsCount int) { func Test_Combine_StoppingOnce(t *testing.T) { t.Parallel() - testCombineStoppingOnce(t, 1) - testCombineStoppingOnce(t, 2) - testCombineStoppingOnce(t, 10) - testCombineStoppingOnce(t, 20) + combineParallel(t, 0, testCombineStoppingOnce) + combineParallel(t, 1, testCombineStoppingOnce) + combineParallel(t, 2, testCombineStoppingOnce) + combineParallel(t, 10, testCombineStoppingOnce) + combineParallel(t, 20, testCombineStoppingOnce) } -func testCombineStoppingOnce(t *testing.T, actorsCount int32) { +func testCombineStoppingOnce(t *testing.T, actorsCount int) { t.Helper() c := atomic.Int32{} @@ -130,7 +146,7 @@ func testCombineStoppingOnce(t *testing.T, actorsCount int32) { close(stopConcurrentlyFinishedC) drainC(stopFinsihedC, stopCallsCount) - assert.Equal(t, actorsCount, c.Load()) + assert.Equal(t, actorsCount, int(c.Load())) } // Test_Combine_OptStopTogether asserts that all actors will end as soon @@ -141,9 +157,9 @@ func Test_Combine_OptStopTogether(t *testing.T) { // no need to test OptStopTogether for actors count < 2 // because single actor is always "stopped together" - testCombineOptStopTogether(t, 1) - testCombineOptStopTogether(t, 2) - testCombineOptStopTogether(t, 10) + combineParallel(t, 1, testCombineOptStopTogether) + combineParallel(t, 2, testCombineOptStopTogether) + combineParallel(t, 10, testCombineOptStopTogether) } func testCombineOptStopTogether(t *testing.T, actorsCount int) { From af7094ee3a6a00b8ad12b2613f8ce6499cc771a0 Mon Sep 17 00:00:00 2001 From: vlado Date: Fri, 31 Jan 2025 16:43:04 +0200 Subject: [PATCH 3/3] combine: add Test_Combine_OthersNotStopped --- actor/combine_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/actor/combine_test.go b/actor/combine_test.go index af1967b..aac2be5 100644 --- a/actor/combine_test.go +++ b/actor/combine_test.go @@ -4,6 +4,7 @@ import ( "fmt" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" @@ -183,6 +184,52 @@ func testCombineOptStopTogether(t *testing.T, actorsCount int) { } } +// Test_Combine_OthersNotStopped asserts that if any of underlaying +// actors end it wont affect other actors. This is default behavior when +// `OptStopTogether` is not provided. +func Test_Combine_OthersNotStopped(t *testing.T) { + t.Parallel() + + combineParallel(t, 1, testCombineOthersNotStopped) + combineParallel(t, 2, testCombineOthersNotStopped) + combineParallel(t, 10, testCombineOthersNotStopped) +} + +func testCombineOthersNotStopped(t *testing.T, actorsCount int) { + t.Helper() + + for i := range actorsCount { + onStartC := make(chan any, actorsCount) + onStopC := make(chan any, actorsCount) + onStart := OptOnStart(func(Context) { onStartC <- `🌞` }) + onStop := OptOnStop(func() { onStopC <- `🌚` }) + actors := createActors(actorsCount, onStart, onStop) + + a := Combine(actors...).WithOptions().Build() + + a.Start() + drainC(onStartC, actorsCount) + + // stop actors indiviually, and expect that after some time + // there wont be any other actors stopping. + stopCount := i + 1 + for j := range stopCount { + actors[j].Stop() + } + + drainC(onStopC, stopCount) + + select { + case <-onStartC: + t.Fatal("should not have any more stopped actors") + case <-time.After(time.Millisecond * 20): + } + + a.Stop() + drainC(onStopC, actorsCount-stopCount) + } +} + func Test_Combine_OptOnStop_AfterActorStops(t *testing.T) { t.Parallel()