From a83f6507748c6771e67c885d6fd9adbe859a9344 Mon Sep 17 00:00:00 2001 From: vladopajic Date: Mon, 17 Feb 2025 12:29:29 +0100 Subject: [PATCH 1/2] fix(mailbox): potentional data not received when stopped while concurrently sending (#108) --- actor/helpers_test.go | 3 -- actor/mailbox.go | 23 +++++++++--- actor/mailbox_test.go | 86 +++++++++++++++++++++++++------------------ 3 files changed, 69 insertions(+), 43 deletions(-) diff --git a/actor/helpers_test.go b/actor/helpers_test.go index 59cf7a5..eb90c4d 100644 --- a/actor/helpers_test.go +++ b/actor/helpers_test.go @@ -1,7 +1,6 @@ package actor_test import ( - "fmt" "io" "testing" @@ -31,8 +30,6 @@ func (r errReader) Read(b []byte) (int, error) { return 0, io.ErrUnexpectedEOF } -func tostr(v any) string { return fmt.Sprintf("%v", v) } - type delegateActor struct { start func() stop func() diff --git a/actor/mailbox.go b/actor/mailbox.go index 0d9a9b3..d651d36 100644 --- a/actor/mailbox.go +++ b/actor/mailbox.go @@ -195,6 +195,7 @@ func newMailbox[T any](options optionsMailbox) *mailbox[T] { actor: New(newMailboxWorker(sendC, receiveC, options)), sendC: sendC, receiveC: receiveC, + stopSigC: make(chan struct{}), state: &atomic.Int32{}, } } @@ -203,6 +204,7 @@ type mailbox[T any] struct { actor Actor sendC chan T receiveC <-chan T + stopSigC chan struct{} state *atomic.Int32 } @@ -214,6 +216,7 @@ func (m *mailbox[T]) Start() { func (m *mailbox[T]) Stop() { if m.state.CompareAndSwap(mbxStateRunning, mbxStateStopped) { + close(m.stopSigC) m.actor.Stop() } } @@ -227,6 +230,10 @@ func (m *mailbox[T]) Send(ctx Context, msg T) error { } select { + case <-m.stopSigC: + // this block can potentially not be covered with tests because of race condition. + // it can cause flakiness with CI. + return fmt.Errorf("Mailbox.Send canceled: %w", ErrMailboxStopped) case <-ctx.Done(): return fmt.Errorf("Mailbox.Send canceled: %w", ctx.Err()) case m.sendC <- msg: @@ -290,16 +297,22 @@ func (w *mailboxWorker[T]) DoWork(ctx Context) WorkerStatus { } func (w *mailboxWorker[T]) OnStop() { - close(w.sendC) - - // receiveC channel needs to receive after all data from queue before closing + // receiveC channel needs to receive all data before closing if w.options.StopAfterReceivingAll { + // first: receive data from queue for !w.queue.IsEmpty() { w.receiveC <- w.queue.PopFront() } - for msg := range w.sendC { - w.receiveC <- msg + // second: received data from sendC + sendCRead: + for { + select { + case d := <-w.sendC: + w.receiveC <- d + default: + break sendCRead + } } } diff --git a/actor/mailbox_test.go b/actor/mailbox_test.go index 7931794..34f7cd7 100644 --- a/actor/mailbox_test.go +++ b/actor/mailbox_test.go @@ -2,6 +2,7 @@ package actor_test import ( "sync" + "sync/atomic" "testing" "time" @@ -144,63 +145,78 @@ func Test_Mailbox_MessageQueue(t *testing.T) { } // Test asserts that Mailbox will end only after all messages have been received. +// +//nolint:maintidx // relax func Test_Mailbox_OptEndAfterReceivingAll(t *testing.T) { t.Parallel() - const messagesCount = 1000 + const initialMessagesCount = 1000 - sendMessages := func(m Mailbox[any]) { + sentMessages := atomic.Int64{} + sendMessages := func(m Mailbox[int64]) { t.Helper() - for i := range messagesCount { - assert.NoError(t, m.Send(ContextStarted(), `🥥`+tostr(i))) + for i := range initialMessagesCount { + assert.NoError(t, m.Send(ContextStarted(), int64(i))) } + + sentMessages.Add(initialMessagesCount) } - assertGotAllMessages := func(m Mailbox[any]) { + sendMessagesConcurrentlyWithStop := func(m Mailbox[int64]) { t.Helper() - gotMessages := 0 - - for msg := range m.ReceiveC() { - assert.Equal(t, `🥥`+tostr(gotMessages), msg) + for { + id := sentMessages.Add(1) - 1 // -1 because we need old value - gotMessages++ + err := m.Send(ContextStarted(), id) + if err != nil { + sentMessages.Add(-1) // because msg was not sent + return + } } - - assert.Equal(t, messagesCount, gotMessages) } + assertGotAllMessages := func(m Mailbox[int64]) <-chan int { + t.Helper() - t.Run("the-best-way", func(t *testing.T) { - t.Parallel() + resultC := make(chan int, 1) + allMsgs := make(map[int64]struct{}) - m := NewMailbox[any](OptStopAfterReceivingAll()) - m.Start() - sendMessages(m) + go func() { + for msg := range m.ReceiveC() { + allMsgs[msg] = struct{}{} + } - // Stop has to be called in goroutine because Stop is blocking until - // actor (mailbox) has fully ended. And current thread of execution is needed - // to read data from mailbox. - go m.Stop() + resultC <- len(allMsgs) + }() - assertGotAllMessages(m) - }) + return resultC + } - t.Run("suboptimal-way", func(t *testing.T) { - t.Parallel() + m := NewMailbox[int64](OptStopAfterReceivingAll()) + m.Start() - m := NewMailbox[any](OptStopAfterReceivingAll()) - m.Start() - sendMessages(m) + // send some messages to fill queue + sendMessages(m) - // This time we start goroutine which will read all messages from mailbox instead of - // stopping in separate goroutine. - // There are no guaranies that this goroutine will finish after Stop is called, so - // it could be the case that this goroutine has received all messages from mailbox, - // even before mailbox was stopped. Which wouldn't correctly assert this feature. - go assertGotAllMessages(m) + // before Stop is called, we are going to send messages (concurrently with Stop), + // because we want to ensure that all those messages will be received as well. + for range 20 { + go sendMessagesConcurrentlyWithStop(m) + } + go func() { + // Small sleep is needed because we want to give goroutine from above + // greater chances to be running and sending messages when Stop() is called + time.Sleep(time.Millisecond * 300) //nolint:forbidigo // explained m.Stop() - }) + }() + + gotMessages := <-assertGotAllMessages(m) + + assert.Equal(t, int(sentMessages.Load()), gotMessages) + // must ensure to get more messages then initially sent in order to + // be sure that messages have been send concurrently with Stop + assert.Greater(t, gotMessages, initialMessagesCount) } // Test asserts mailbox invariants when `OptAsChan()` option is used. From 45d8d5c19e6283dd7e24726104d7c28ca570dbc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Mon, 17 Feb 2025 15:44:41 +0100 Subject: [PATCH 2/2] chore(mailbox): add comment and cosmetics --- actor/mailbox.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/actor/mailbox.go b/actor/mailbox.go index d651d36..473ec59 100644 --- a/actor/mailbox.go +++ b/actor/mailbox.go @@ -297,6 +297,11 @@ func (w *mailboxWorker[T]) DoWork(ctx Context) WorkerStatus { } func (w *mailboxWorker[T]) OnStop() { + // close receiveC, after receiving all data, + // so everyone reading from this mailbox can be + // notified that no more messages will ever be received. + defer close(w.receiveC) + // receiveC channel needs to receive all data before closing if w.options.StopAfterReceivingAll { // first: receive data from queue @@ -315,6 +320,4 @@ func (w *mailboxWorker[T]) OnStop() { } } } - - close(w.receiveC) }