diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 3eb1b53..520a51b 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -14,7 +14,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v6 with: - version: v1.60.3 + version: v1.63.4 - id: govulncheck uses: golang/govulncheck-action@v1 - name: go mod tidy check diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 92946b2..4f2a3f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -59,8 +59,7 @@ jobs: body: | go-test-coverage report: ``` - ${{ fromJSON(steps.coverage.outputs.report) }} - ``` + ${{ fromJSON(steps.coverage.outputs.report) }}``` edit-mode: replace - name: "finally check coverage" diff --git a/.github/workflows/test_experimental.yml b/.github/workflows/test_experimental.yml new file mode 100644 index 0000000..1572e86 --- /dev/null +++ b/.github/workflows/test_experimental.yml @@ -0,0 +1,24 @@ +name: test experimental +on: [push] +permissions: + contents: write + pull-requests: write +jobs: + experimental: + name: experimental + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v4 + - name: setup go + uses: actions/setup-go@v5 + with: + go-version: 1.24 + + - name: replace go version + run: sed -i 's/go 1.22/go 1.24/g' go.mod + + - name: test + run: | + go env -w GOEXPERIMENT=synctest + go test -tags "experimental" -run Experimental$ -timeout=10s -race -count=10 -shuffle=on -failfast -v ./... \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml index daed2da..037da03 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,3 @@ -run: - deadline: 5m - linters: enable: - asasalint @@ -22,6 +19,7 @@ linters: - errname - errorlint - exhaustive + - exptostd - forbidigo - forcetypeassert - funlen @@ -45,6 +43,7 @@ linters: - gosimple - govet - grouper + - iface - importas - inamedparam - interfacebloat @@ -58,6 +57,7 @@ linters: - nakedret - nestif - nilerr + - nilnesserr - nilnil - nlreturn - noctx @@ -71,6 +71,7 @@ linters: - promlinter - protogetter - reassign + - recvcheck - revive - sloglint - sqlclosecheck @@ -87,6 +88,7 @@ linters: - unconvert - unparam - unused + - usetesting - usestdlibvars - wastedassign - whitespace @@ -155,6 +157,10 @@ linters-settings: enable-all: true disable: - fieldalignment + iface: + enable: + - identical + - unused lll: line-length: 90 tab-width: 1 diff --git a/Makefile b/Makefile index c6ee41c..b6fc1ee 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ GO ?= go GOBIN ?= $$($(GO) env GOPATH)/bin GOLANGCI_LINT ?= $(GOBIN)/golangci-lint -GOLANGCI_LINT_VERSION ?= v1.60.3 +GOLANGCI_LINT_VERSION ?= v1.63.4 TEST_COVERAGE ?= $(GOBIN)/go-test-coverage .PHONY: install-golangcilint @@ -16,12 +16,13 @@ lint: install-golangcilint # Runs benchmark on entire repo .PHONY: benchmark benchmark: - go test -benchmem -count 5 -run=^# -bench=. github.com/vladopajic/go-actor/actor + go version + go test -bench=. github.com/vladopajic/go-actor/actor -run=^# -count 5 -benchmem # Runs tests on entire repo .PHONY: test test: - go test -timeout=3s -race -count=10 -shuffle=on -failfast ./... + go test -timeout=10s -race -count=10 -shuffle=on -failfast ./... # Code tidy .PHONY: tidy diff --git a/README.md b/README.md index 7316b07..e2d0d95 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,6 @@ func (w *consumerWorker) DoWork(ctx actor.Context) actor.WorkerStatus { } ``` - ## Add-ons While `go-actor` is designed to be a minimal library with lean interfaces, developers can extend its functionality with domain-specific add-ons. Some notable add-ons include: @@ -129,6 +128,10 @@ To enhance code quality in projects that heavily rely on the actor model with `g You can find detailed design decisions [here](./docs/design_decisions.md). +## Benchmarks + +See library benchmarks [here](./docs/benchmarks.md). + ## Versioning diff --git a/actor/actor_benchmark_test.go b/actor/actor_benchmark_test.go new file mode 100644 index 0000000..e63bee7 --- /dev/null +++ b/actor/actor_benchmark_test.go @@ -0,0 +1,53 @@ +package actor_test + +import ( + "testing" + + . "github.com/vladopajic/go-actor/actor" +) + +func BenchmarkActorProducerToConsumer(b *testing.B) { + produceNext := int(0) + doneC := make(chan struct{}) + + // intentionally using option as chan with large capacity as it has + // the best performance. + mbx := NewMailbox[any](OptAsChan(), OptCapacity(largeCap)) + + producer := New(NewWorker(func(c Context) WorkerStatus { + select { + case <-c.Done(): + return WorkerEnd + + default: + produceNext++ + mbx.Send(c, produceNext) //nolint:errcheck // error should never happen + + if b.N == produceNext { + return WorkerEnd + } + + return WorkerContinue + } + })) + + consumer := New(NewWorker(func(c Context) WorkerStatus { + select { + case <-c.Done(): + return WorkerEnd + + case rcv := <-mbx.ReceiveC(): + if b.N == rcv { + close(doneC) + } + + return WorkerContinue + } + })) + + a := Combine(mbx, producer, consumer).Build() + a.Start() + defer a.Stop() + + <-doneC +} diff --git a/actor/actor_test.go b/actor/actor_test.go index 7a51dc5..2abb7f4 100644 --- a/actor/actor_test.go +++ b/actor/actor_test.go @@ -32,7 +32,7 @@ func Test_NewWorker(t *testing.T) { } } -// Test asserts basic Actor functions +// Test asserts that Actor will execute underlying worker. func Test_Actor_New(t *testing.T) { t.Parallel() @@ -42,7 +42,7 @@ func Test_Actor_New(t *testing.T) { a.Start() // Asset that worker is going to be executed by actor - assertDoWork(t, w.doWorkC, 0) + assertDoWork(t, w.doWorkC) a.Stop() @@ -50,46 +50,47 @@ func Test_Actor_New(t *testing.T) { assertNoWork(t, w.doWorkC) } -// Test asserts that restarting actor will no impact on worker execution. +// Test asserts that restarting actor will not impact worker's state. func Test_Actor_Restart(t *testing.T) { t.Parallel() w := newWorker() a := New(w) - for i := range 20 { + for i := range 10 { a.Start() - - assertDoWork(t, w.doWorkC, i*workIterationsPerAssert) - + assertDoWorkWithStart(t, w.doWorkC, i*workIterationsPerAssert) a.Stop() + assertNoWork(t, w.doWorkC) } +} - AssertStartStopAtRandom(t, a) +// Test with AssertStartStopAtRandom. +func Test_Actor_StartStopAtRandom(t *testing.T) { + t.Parallel() + + AssertStartStopAtRandom(t, New(newWorker())) } -// Test asserts that nothing should happen if -// Start() or Stop() methods are called multiple times. +// Test asserts that calling Start() and Stop() methods multiple times +// will not have effect. func Test_Actor_MultipleStartStop(t *testing.T) { t.Parallel() const count = 3 - onStartC := make(chan any, count) - onStopC := make(chan any, count) + onStartC, onStartFn := createOnStartOption(t, count) + onStopC, onStopFn := createOnStopOption(t, count) w := newWorker() - a := New(w, - OptOnStart(func(Context) { onStartC <- `🌞` }), - OptOnStop(func() { onStopC <- `🌚` }), - ) + a := New(w, OptOnStart(onStartFn), OptOnStop(onStopFn)) // Calling Start() multiple times should have same effect as calling it once for range count { a.Start() } - assertDoWork(t, w.doWorkC, 0) + assertDoWork(t, w.doWorkC) // Calling Stop() multiple times should have same effect as calling it once for range count { @@ -106,15 +107,10 @@ func Test_Actor_MultipleStartStop(t *testing.T) { func Test_Actor_OnStartOnStop(t *testing.T) { t.Parallel() - readySigC := make(chan any) - onStartC, onStopC := make(chan any, 1), make(chan any, 1) - onStartFn := func(_ Context) { <-readySigC; onStartC <- `🌞` } - onStopFn := func() { <-readySigC; onStopC <- `🌚` } - { // Nothing should happen when calling OnStart and OnStop // when callbacks are not defined (no panic should occur) - w := NewWorker(func(_ Context) WorkerStatus { return WorkerContinue }) + w := NewWorker(func(Context) WorkerStatus { return WorkerContinue }) a := NewActorImpl(w) a.OnStart() a.OnStop() @@ -134,27 +130,29 @@ func Test_Actor_OnStartOnStop(t *testing.T) { } { // Assert that actor will call callbacks passed by options - w := NewWorker(func(_ Context) WorkerStatus { return WorkerContinue }) + onStartC, onStartFn := createOnStartOption(t, 1) + onStopC, onStopFn := createOnStopOption(t, 1) + w := NewWorker(func(Context) WorkerStatus { return WorkerContinue }) a := NewActorImpl(w, OptOnStart(onStartFn), OptOnStop(onStopFn)) - go a.OnStart() - readySigC <- struct{}{} - + a.OnStart() assert.Equal(t, `🌞`, <-onStartC) assert.Empty(t, onStartC) - go a.OnStop() - readySigC <- struct{}{} - + a.OnStop() assert.Equal(t, `🌚`, <-onStopC) assert.Empty(t, onStopC) } { // Assert that actor will call callbacks implemented by worker, - // then callbacks passed by options + // then callbacks passed by options. + readySigC := make(chan any) + onStartC, onStopC := make(chan any, 1), make(chan any, 1) + onStartOpt := OptOnStart(func(Context) { <-readySigC; onStartC <- `🌞` }) + onStopOpt := OptOnStop(func() { <-readySigC; onStopC <- `🌚` }) w := newWorker() - a := NewActorImpl(w, OptOnStart(onStartFn), OptOnStop(onStopFn)) + a := NewActorImpl(w, onStartOpt, onStopOpt) go a.OnStart() @@ -206,7 +204,7 @@ func Test_Actor_StopAfterWorkerEnded(t *testing.T) { select { case p, ok := <-doWorkC: if !ok { - defer close(workEndedC) + close(workEndedC) return WorkerEnd } @@ -227,7 +225,7 @@ func Test_Actor_StopAfterWorkerEnded(t *testing.T) { a.Start() - assertDoWork(t, doWorkC, 0) + assertDoWork(t, doWorkC) // Closing doWorkC will cause worker to end close(doWorkC) @@ -264,7 +262,7 @@ func Test_Actor_ContextEndedAfterStop(t *testing.T) { a.Start() - assertDoWork(t, w.doWorkC, 0) + assertDoWork(t, w.doWorkC) a.Stop() @@ -371,7 +369,7 @@ func (w *worker) OnStop() { const workIterationsPerAssert = 20 -func assertDoWork(t *testing.T, doWorkC chan chan int, start int) { +func assertDoWorkWithStart(t *testing.T, doWorkC chan chan int, start int) { t.Helper() for i := start; i < workIterationsPerAssert; i++ { @@ -381,14 +379,25 @@ func assertDoWork(t *testing.T, doWorkC chan chan int, start int) { } } +func assertDoWork(t *testing.T, doWorkC chan chan int) { + t.Helper() + + assertDoWorkWithStart(t, doWorkC, 0) +} + func assertNoWork(t *testing.T, doWorkC chan chan int) { t.Helper() p := make(chan int) doWorkC <- p + select { case <-p: assert.FailNow(t, "actor should not be running worker") case <-time.After(time.Millisecond * 20): } + + // drain work request in order to make the same state before calling + // this helper function + <-doWorkC } diff --git a/actor/combine_test.go b/actor/combine_test.go index aac2be5..a9a7c4c 100644 --- a/actor/combine_test.go +++ b/actor/combine_test.go @@ -42,7 +42,7 @@ func testCombineTestSuite(t *testing.T, actorsCount int) { }) } -// Test asserts that all Start and Stop is delegated to all combined actors. +// Test asserts that Start() and Stop() is delegated to all combined actors. func Test_Combine(t *testing.T) { t.Parallel() @@ -55,11 +55,9 @@ func Test_Combine(t *testing.T) { func testCombine(t *testing.T, actorsCount int) { t.Helper() - onStartC := make(chan any, actorsCount) - onStopC := make(chan any, actorsCount) - onStart := OptOnStart(func(Context) { onStartC <- `🌞` }) - onStop := OptOnStop(func() { onStopC <- `🌚` }) - actors := createActors(actorsCount, onStart, onStop) + onStartC, onStartFn := createOnStartOption(t, actorsCount) + onStopC, onStopFn := createOnStopOption(t, actorsCount) + actors := createActors(actorsCount, OptOnStart(onStartFn), OptOnStop(onStopFn)) a := Combine(actors...).Build() @@ -74,6 +72,7 @@ func testCombine(t *testing.T, actorsCount int) { assert.Len(t, onStopC, actorsCount) } +// Test asserts that combined actor will invoke OnStart and OnStop callbacks. func Test_Combine_OptOnStopOptOnStart(t *testing.T) { t.Parallel() @@ -86,12 +85,12 @@ func Test_Combine_OptOnStopOptOnStart(t *testing.T) { func testCombineOptOnStopOptOnStart(t *testing.T, actorsCount int) { t.Helper() - onStatC, onStartOpt := createCombinedOnStartOption(t, 1) - onStopC, onStopOpt := createCombinedOnStopOption(t, 1) + onStartC, onStartFn := createOnStartOption(t, 1) + onStopC, onStopFn := createOnStopOption(t, 1) actors := createActors(actorsCount) a := Combine(actors...). - WithOptions(onStopOpt, onStartOpt). + WithOptions(OptOnStartCombined(onStartFn), OptOnStopCombined(onStopFn)). Build() a.Start() @@ -101,11 +100,13 @@ func testCombineOptOnStopOptOnStart(t *testing.T, actorsCount int) { a.Stop() // should have no effect a.Stop() // should have no effect assert.Equal(t, `🌚`, <-onStopC) - assert.Equal(t, `🌞`, <-onStatC) + assert.Equal(t, `🌞`, <-onStartC) assert.Empty(t, onStopC) - assert.Empty(t, onStatC) + assert.Empty(t, onStartC) } +// Test asserts that combined actor will call Stop() only once on +// combined actors even when Stop() is called concurrently. func Test_Combine_StoppingOnce(t *testing.T) { t.Parallel() @@ -135,17 +136,17 @@ func testCombineStoppingOnce(t *testing.T, actorsCount int) { // Call Stop() multiple times in separate goroutine to force concurrency const stopCallsCount = 100 - stopFinsihedC := make(chan any, stopCallsCount) + stopFinishedC := make(chan any, stopCallsCount) for range stopCallsCount { go func() { a.Stop() - stopFinsihedC <- `🛑` + stopFinishedC <- `🛑` }() } close(stopConcurrentlyFinishedC) - drainC(stopFinsihedC, stopCallsCount) + drainC(stopFinishedC, stopCallsCount) assert.Equal(t, actorsCount, int(c.Load())) } @@ -167,11 +168,9 @@ func testCombineOptStopTogether(t *testing.T, actorsCount int) { t.Helper() for i := range actorsCount { - onStartC := make(chan any, actorsCount) - onStopC := make(chan any, actorsCount) - onStart := OptOnStart(func(Context) { onStartC <- `🌞` }) - onStop := OptOnStop(func() { onStopC <- `🌚` }) - actors := createActors(actorsCount, onStart, onStop) + onStartC, onStartFn := createOnStartOption(t, actorsCount) + onStopC, onStopFn := createOnStopOption(t, actorsCount) + actors := createActors(actorsCount, OptOnStart(onStartFn), OptOnStop(onStopFn)) a := Combine(actors...).WithOptions(OptStopTogether()).Build() @@ -184,7 +183,7 @@ func testCombineOptStopTogether(t *testing.T, actorsCount int) { } } -// Test_Combine_OthersNotStopped asserts that if any of underlaying +// Test_Combine_OthersNotStopped asserts that if any of underlying // actors end it wont affect other actors. This is default behavior when // `OptStopTogether` is not provided. func Test_Combine_OthersNotStopped(t *testing.T) { @@ -199,18 +198,16 @@ func testCombineOthersNotStopped(t *testing.T, actorsCount int) { t.Helper() for i := range actorsCount { - onStartC := make(chan any, actorsCount) - onStopC := make(chan any, actorsCount) - onStart := OptOnStart(func(Context) { onStartC <- `🌞` }) - onStop := OptOnStop(func() { onStopC <- `🌚` }) - actors := createActors(actorsCount, onStart, onStop) + onStartC, onStartFn := createOnStartOption(t, actorsCount) + onStopC, onStopFn := createOnStopOption(t, actorsCount) + actors := createActors(actorsCount, OptOnStart(onStartFn), OptOnStop(onStopFn)) a := Combine(actors...).WithOptions().Build() a.Start() drainC(onStartC, actorsCount) - // stop actors indiviually, and expect that after some time + // stop actors individually, and expect that after some time // there wont be any other actors stopping. stopCount := i + 1 for j := range stopCount { @@ -230,30 +227,40 @@ func testCombineOthersNotStopped(t *testing.T, actorsCount int) { } } -func Test_Combine_OptOnStop_AfterActorStops(t *testing.T) { +// Test asserts that wrapActors is correctly wrapping actors with onStopFunc callback. +func Test_Combine_WrapActors(t *testing.T) { t.Parallel() - const actorsCount = 5 * 2 - - for i := range actorsCount/2 + 1 { - onStopC, onStopOpt := createCombinedOnStopOption(t, 2) - actors := createActors(actorsCount / 2) + stopActorC := make(chan any, 3) + actors := []Actor{ + New(newWorker(), OptOnStop(func() { stopActorC <- "new" })), + Idle(OptOnStop(func() { stopActorC <- "idle" })), + Combine(createActors(10)...). + WithOptions(OptOnStopCombined(func() { stopActorC <- "combine" })). + Build(), + } - // append one more actor to actors list - cmb := Combine(createActors(actorsCount / 2)...).WithOptions(onStopOpt).Build() - actors = append(actors, cmb) + // onStopFunc will write to stopWrappedC channel and result is asserted at end + // of this test. if we assert in this callback then it might be the case that + // function is never called so it will always pass. + stopWrappedC := make(chan any, 3) + onStopFunc := func() { stopWrappedC <- <-stopActorC } - a := Combine(actors...). - WithOptions(onStopOpt, OptStopTogether()). - Build() + wActors := WrapActors(actors, onStopFunc) + // Start then stop wrapped actors + for _, a := range wActors { a.Start() + } - actors[i].Stop() - assert.Equal(t, `🌚`, <-onStopC) - assert.Equal(t, `🌚`, <-onStopC) - a.Stop() // should have no effect + for _, a := range wActors { + a.Stop() } + + // Expect data on stopWrappedC in order in which actors appeared in this list + assert.Equal(t, "new", <-stopWrappedC) + assert.Equal(t, "idle", <-stopWrappedC) + assert.Equal(t, "combine", <-stopWrappedC) } func createActors(count int, opts ...Option) []Actor { @@ -273,33 +280,3 @@ func createActor(i int, opts ...Option) Actor { return Idle(opts...) } - -func createCombinedOnStopOption(t *testing.T, count int) (<-chan any, CombinedOption) { - t.Helper() - - c := make(chan any, count) - fn := func() { - select { - case c <- `🌚`: - default: - t.Fatal("onStopFunc should be called only once") - } - } - - return c, OptOnStopCombined(fn) -} - -func createCombinedOnStartOption(t *testing.T, count int) (<-chan any, CombinedOption) { - t.Helper() - - c := make(chan any, count) - fn := func(_ Context) { - select { - case c <- `🌞`: - default: - t.Fatal("onStart should be called only once") - } - } - - return c, OptOnStartCombined(fn) -} diff --git a/actor/experimental_test.go b/actor/experimental_test.go new file mode 100644 index 0000000..41a715c --- /dev/null +++ b/actor/experimental_test.go @@ -0,0 +1,148 @@ +//go:build experimental +// +build experimental + +package actor_test + +// This file contains experimental tests which utilize "testing/synctest" +// package that elegantly solve race issues which have been previously +// hacked with time.Sleep() + +import ( + "testing" + "testing/synctest" + + "github.com/stretchr/testify/assert" + + . "github.com/vladopajic/go-actor/actor" +) + +// Test asserts that actor should stop after worker +// has signaled that there is no more work via WorkerEnd signal. +func Test_Actor_StopAfterWorkerEnded_Experimental(t *testing.T) { + t.Parallel() + + synctest.Run(func() { + var ctx Context + + workIteration := 0 + doWorkC := make(chan chan int) + workEndedC := make(chan struct{}) + workerFunc := func(c Context) WorkerStatus { + ctx = c + + // assert that DoWork should not be called + // after WorkerEnd signal is returned + select { + case <-workEndedC: + assert.FailNow(t, "worker should be ended") + default: + } + + select { + case p, ok := <-doWorkC: + if !ok { + close(workEndedC) + return WorkerEnd + } + + p <- workIteration + + workIteration++ + + return WorkerContinue + + case <-c.Done(): + // Test should fail if done signal is received from Actor + assert.FailNow(t, "worker should be ended") + return WorkerEnd + } + } + + a := New(NewWorker(workerFunc)) + + a.Start() + + assertDoWork(t, doWorkC) + + // Closing doWorkC will cause worker to end + close(doWorkC) + + // Assert that context is ended after worker ends. + // We use synctest.Wait() to ensure that actor goroutine has ended. + <-workEndedC + synctest.Wait() + assertContextEnded(t, ctx) + + // Stopping actor should produce no effect (since worker has ended) + a.Stop() + + assertContextEnded(t, ctx) + }) +} + +// Test asserts that mailbox `Send()` returns error when sending data is blocked and +// Stop() is simultaneously called. +func Test_Mailbox_AsChan_SendStopped_Experimental(t *testing.T) { + t.Parallel() + + synctest.Run(func() { + m := NewMailbox[any](OptAsChan()) + m.Start() + sendResultC := make(chan error, 1) + + // NOTE: must use NewContext() instead of ContextStarted() because + // later creates channels outside of the bubble. + ctx := NewContext() + + // Start goroutine that will send to mailbox, but since no one is waiting + // to receive data from it should receive stopped error after mailbox is stopped. + + go func() { + sendResultC <- m.Send(ctx, `🌹`) + }() + + synctest.Wait() + m.Stop() // stopping mailbox while there is some goroutines trying to send + + assert.ErrorIs(t, <-sendResultC, ErrMailboxStopped, "Send() should result with error") + + // sending again should result with stopped + assert.ErrorIs(t, m.Send(ctx, `🌹`), ErrMailboxStopped, "Send() should result with error") + }) +} + +func Test_Mailbox_AsChan_SendCanceled_Experimental(t *testing.T) { + t.Parallel() + + synctest.Run(func() { + m := NewMailbox[any](OptAsChan()) + m.Start() + defer m.Stop() + + sendResultC := make(chan error, 1) + + ctx := NewContext() + + // Start goroutine that will send to mailbox, but since no one is waiting + // to receive data from it should receive send cancelled error after context is canceled. + go func() { + sendResultC <- m.Send(ctx, `🌹`) + }() + + synctest.Wait() + ctx.End() + + sendErr := <-sendResultC + assert.Error(t, sendErr) + assert.ErrorIs(t, sendErr, ctx.Err()) + assert.NotErrorIs(t, sendErr, ErrMailboxStopped) + assertReceiveBlocking(t, m) // should not have anything to receive + + // sending again with started context should succeed + go func() { + sendResultC <- m.Send(NewContext(), `🌹`) + }() + assert.Equal(t, `🌹`, <-m.ReceiveC()) + assert.NoError(t, <-sendResultC) + }) +} diff --git a/actor/export_test.go b/actor/export_test.go index 20f082a..e7a1832 100644 --- a/actor/export_test.go +++ b/actor/export_test.go @@ -27,6 +27,13 @@ func (a *ActorImpl) OnStop() { a.onStop() } +func WrapActors( + actors []Actor, + onStopFunc func(), +) []Actor { + return wrapActors(actors, onStopFunc) +} + func NewContext() *context { return newContext() } diff --git a/actor/helpers_test.go b/actor/helpers_test.go index 4e4ca3d..59cf7a5 100644 --- a/actor/helpers_test.go +++ b/actor/helpers_test.go @@ -4,24 +4,9 @@ import ( "fmt" "io" "testing" -) - -type delegateActor struct { - start func() - stop func() -} - -func (a delegateActor) Start() { - if fn := a.start; fn != nil { - fn() - } -} -func (a delegateActor) Stop() { - if fn := a.stop; fn != nil { - fn() - } -} + . "github.com/vladopajic/go-actor/actor" +) func drainC(c <-chan any, count int) { for range count { @@ -47,3 +32,50 @@ func (r errReader) Read(b []byte) (int, error) { } func tostr(v any) string { return fmt.Sprintf("%v", v) } + +type delegateActor struct { + start func() + stop func() +} + +func (a delegateActor) Start() { + if fn := a.start; fn != nil { + fn() + } +} + +func (a delegateActor) Stop() { + if fn := a.stop; fn != nil { + fn() + } +} + +func createOnStopOption(t *testing.T, count int) (<-chan any, func()) { + t.Helper() + + c := make(chan any, count) + fn := func() { + select { + case c <- `🌚`: + default: + t.Fatal("onStop should be called only once") + } + } + + return c, fn +} + +func createOnStartOption(t *testing.T, count int) (<-chan any, func(Context)) { + t.Helper() + + c := make(chan any, count) + fn := func(Context) { + select { + case c <- `🌞`: + default: + t.Fatal("onStart should be called only once") + } + } + + return c, fn +} diff --git a/actor/mailbox_test.go b/actor/mailbox_test.go index 531fca3..7931794 100644 --- a/actor/mailbox_test.go +++ b/actor/mailbox_test.go @@ -10,6 +10,7 @@ import ( . "github.com/vladopajic/go-actor/actor" ) +// Test asserts that FromMailboxes creates single actor using multiple mailboxes. func Test_FromMailboxes(t *testing.T) { t.Parallel() @@ -37,6 +38,7 @@ func Test_FromMailboxes(t *testing.T) { } } +// Test asserts correct behavior of FanOut utility. func Test_FanOut(t *testing.T) { t.Parallel() @@ -90,6 +92,7 @@ func Test_FanOut(t *testing.T) { } } +// Test asserts that MailboxWorker returns `WorkerEnd` when context is canceled. func Test_MailboxWorker_EndSignal(t *testing.T) { t.Parallel() @@ -104,6 +107,7 @@ func Test_MailboxWorker_EndSignal(t *testing.T) { assert.Equal(t, WorkerEnd, w.DoWork(ContextEnded())) } +// Test which runs tests in AssertMailboxInvariantsAsync helper function. func Test_Mailbox_Invariants(t *testing.T) { t.Parallel() @@ -112,6 +116,8 @@ func Test_Mailbox_Invariants(t *testing.T) { }) } +// Test asserts that mailbox will receive (enqueue) +// larger number of messages without blocking. func Test_Mailbox_MessageQueue(t *testing.T) { t.Parallel() @@ -137,7 +143,7 @@ func Test_Mailbox_MessageQueue(t *testing.T) { assertMailboxStopped(t, m) } -// This test asserts that Mailbox will end only after all messages have been received. +// Test asserts that Mailbox will end only after all messages have been received. func Test_Mailbox_OptEndAfterReceivingAll(t *testing.T) { t.Parallel() @@ -197,6 +203,7 @@ func Test_Mailbox_OptEndAfterReceivingAll(t *testing.T) { }) } +// Test asserts mailbox invariants when `OptAsChan()` option is used. func Test_Mailbox_AsChan(t *testing.T) { t.Parallel() @@ -227,15 +234,17 @@ func Test_Mailbox_AsChan(t *testing.T) { }) } +// Test asserts that mailbox `Send()` returns error when sending data is blocked and +// Stop() is simultaneously called. func Test_Mailbox_AsChan_SendStopped(t *testing.T) { t.Parallel() - testDoneC, senderBlockedC := make(chan any), make(chan any) + sendResultC, senderBlockedC := make(chan error, 1), make(chan any) m := NewMailbox[any](OptAsChan()) m.Start() // Start goroutine that will send to mailbox, but since no one is waiting - // to receive data from it should receive sopped error after mailbox is stopped. + // to receive data from it should receive ErrMailboxStopped after mailbox is stopped. go func() { // This goroutine will notify that goroutine doing m.Send has been blocked. go func() { @@ -244,15 +253,16 @@ func Test_Mailbox_AsChan_SendStopped(t *testing.T) { close(senderBlockedC) }() - assert.ErrorIs(t, m.Send(ContextStarted(), `🌹`), ErrMailboxStopped) - close(testDoneC) + sendResultC <- m.Send(ContextStarted(), `🌹`) }() <-senderBlockedC m.Stop() // stopping mailbox wile there is some goroutines trying to send - <-testDoneC + + assert.ErrorIs(t, <-sendResultC, ErrMailboxStopped, "Send() should result with error") } +// AssertMailboxInvariantsAsync is helper functions that asserts mailbox invariants. func AssertMailboxInvariantsAsync(t *testing.T, mFact func() Mailbox[any]) { t.Helper() @@ -376,17 +386,11 @@ func assertReceiveBlocking(t *testing.T, m Mailbox[any]) { func assertSendBlocking(t *testing.T, m Mailbox[any]) { t.Helper() - testDoneSigC := make(chan struct{}) - + sendResultC := make(chan error, 1) ctx := NewContext() go func() { - err := m.Send(ctx, `🌹`) - if err == nil { - assert.FailNow(t, "should not be able to send") //nolint:testifylint // relax - } - - close(testDoneSigC) + sendResultC <- m.Send(ctx, `🌹`) }() // This sleep is necessary to give some time goroutine from above @@ -394,5 +398,8 @@ func assertSendBlocking(t *testing.T, m Mailbox[any]) { time.Sleep(time.Millisecond * 10) //nolint:forbidigo // relax ctx.End() - <-testDoneSigC + sendErr := <-sendResultC + assert.Error(t, sendErr) + assert.ErrorIs(t, sendErr, ctx.Err()) + assert.NotErrorIs(t, sendErr, ErrMailboxStopped) } diff --git a/actor/test_helpers_test.go b/actor/test_helpers_test.go index 01932af..83ccb91 100644 --- a/actor/test_helpers_test.go +++ b/actor/test_helpers_test.go @@ -41,9 +41,9 @@ func Test_AssertWorkerEndSig(t *testing.T) { AssertWorkerEndSig(tw, New(nil)) assert.True(t, tw.hadError) - // Test expected to fail becaue worker didn't return end singal + // Test expected to fail because worker didn't return end signal tw = &tWrapper{T: t} - AssertWorkerEndSig(tw, NewWorker(func(_ Context) WorkerStatus { return WorkerContinue })) + AssertWorkerEndSig(tw, NewWorker(func(Context) WorkerStatus { return WorkerContinue })) assert.True(t, tw.hadError) } @@ -67,7 +67,7 @@ func Test_RandInt32WithReader(t *testing.T) { v[RandInt32WithReader(t, rand.Reader)] = struct{}{} } - assert.GreaterOrEqual(t, len(v), 1000) // should have at least 1000 unque elements + assert.GreaterOrEqual(t, len(v), 1000) // should have at least 1000 unique elements // Test expected to fail because bytes could not be read tw := &tWrapper{T: t} diff --git a/docs/benchmarks.md b/docs/benchmarks.md new file mode 100644 index 0000000..5df13ff --- /dev/null +++ b/docs/benchmarks.md @@ -0,0 +1,39 @@ +# `go-actor` Benchmarks + +Here you can find benchmarks of `go-actor` library. + + +``` +go version +go version go1.22.12 linux/amd64 +go test -bench=. github.com/vladopajic/go-actor/actor -run=^# -count 5 -benchmem +goos: linux +goarch: amd64 +pkg: github.com/vladopajic/go-actor/actor +cpu: Intel(R) Core(TM) i7-10700K CPU @ 3.80GHz +BenchmarkActorProducerToConsumer-16 6257289 197.0 ns/op 8 B/op 0 allocs/op +BenchmarkActorProducerToConsumer-16 6255667 191.5 ns/op 8 B/op 0 allocs/op +BenchmarkActorProducerToConsumer-16 6292658 187.9 ns/op 8 B/op 0 allocs/op +BenchmarkActorProducerToConsumer-16 6224390 205.5 ns/op 8 B/op 0 allocs/op +BenchmarkActorProducerToConsumer-16 6328387 183.9 ns/op 8 B/op 0 allocs/op +BenchmarkMailbox-16 4006864 437.2 ns/op 14 B/op 0 allocs/op +BenchmarkMailbox-16 1298016 871.7 ns/op 12 B/op 0 allocs/op +BenchmarkMailbox-16 2595217 751.7 ns/op 13 B/op 0 allocs/op +BenchmarkMailbox-16 1747360 909.2 ns/op 9 B/op 0 allocs/op +BenchmarkMailbox-16 2028043 1126 ns/op 10 B/op 0 allocs/op +BenchmarkMailboxWithLargeCap-16 1477164 1173 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxWithLargeCap-16 1625610 957.4 ns/op 2 B/op 0 allocs/op +BenchmarkMailboxWithLargeCap-16 1413207 826.2 ns/op 3 B/op 0 allocs/op +BenchmarkMailboxWithLargeCap-16 2883579 368.5 ns/op 5 B/op 0 allocs/op +BenchmarkMailboxWithLargeCap-16 2676229 390.7 ns/op 4 B/op 0 allocs/op +BenchmarkMailboxAsChan-16 3844426 309.7 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChan-16 4264402 917.8 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChan-16 4005238 566.2 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChan-16 1954975 793.2 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChan-16 3280045 618.2 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChanWithLargeCap-16 9353541 121.3 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChanWithLargeCap-16 9558877 115.3 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChanWithLargeCap-16 9771594 113.1 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChanWithLargeCap-16 9749954 112.8 ns/op 0 B/op 0 allocs/op +BenchmarkMailboxAsChanWithLargeCap-16 9487732 116.3 ns/op 0 B/op 0 allocs/op +``` \ No newline at end of file diff --git a/go.mod b/go.mod index e45d710..f0a94a4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/vladopajic/go-actor -go 1.23 +go 1.22 require ( github.com/gammazero/deque v1.0.0