Skip to content

Commit bf44386

Browse files
TestKit: synchronous TestActor start (#7772)
* Force synchronous start for `TestActor` fix #7770 * separate creation of implicit, default `TestActor` from additional ones * force `TestActor` to start via CTD tweak instead * don't wait for `TestActor` to start * Revert "don't wait for `TestActor` to start" This reverts commit bdd77f9. * run default `TestActor` without `CallingThreadDispatcher` * fix TestKit deadlock during parallel test execution This commit resolves a deadlock that occurs when running tests in parallel, where the initial TestActor creation gets stuck during async initialization with CallingThreadDispatcher. The root cause was that SystemActorOf hardcodes async=true initialization, creating a RepointableActorRef that requires processing a Supervise system message. With CallingThreadDispatcher, this creates a circular dependency: - TestKit constructor blocks waiting for TestActor initialization - CallingThreadDispatcher only runs on the calling thread - The calling thread is blocked, so Supervise message never gets processed The solution bypasses SystemActorOf and directly calls AttachChild with async=false, enabling true synchronous initialization while preserving full system integration including supervision tree and mailbox configuration. This maintains compatibility with CallingThreadDispatcher for deterministic testing while eliminating startup deadlocks in parallel test scenarios. Resolves issue where TestProbe child actor creation and implicit sender functionality would fail due to incomplete TestActor initialization. * Fix TestKit serialization issue - Use AttachChild with isSystemService=true to exempt TestActor from serialization verification - Resolves 700+ test failures caused by UnboundedChannelWriter serialization errors * still working on synchronous `TestActor` startup * Fix TestKit deadlock during parallel test execution Resolves deadlock that occurs when TestKit instances are created in parallel and actors try to interact with TestActor during initialization. The issue was caused by CallingThreadDispatcher creating RepointableActorRef which requires async initialization, leading to deadlocks. Changes: - Add AttachChildWithAsync internal method to ActorCell to control sync/async actor creation - Modify TestKitBase to create TestActor synchronously (LocalActorRef) instead of async (RepointableActorRef) - Update Xunit/Xunit2 TestKits to create logger actors synchronously - Replace Ask with Tell for logger initialization to avoid synchronous wait deadlocks - Add InternalsVisibleTo for Xunit TestKits to access internal Akka methods - Maintain LoggerInitialized response for protocol compatibility (has IDeadLetterSuppression) Fixes #7770 * added API approvals * remove `EnsureTestActorReady` method * API approvals * ensure calls can't get contaminated with references * fix API approvals * Fix race condition in ParallelTestActorDeadlockSpec The test had a race condition where the PingerActor sends 'ping' to TestActor during PreStart, but the test was expecting 'test-message' first. This could cause ExpectMsgAsync to receive the wrong message and fail. Fixed by properly expecting the 'ping' message first before sending and expecting the 'test-message'.
1 parent c4bd954 commit bf44386

File tree

11 files changed

+200
-66
lines changed

11 files changed

+200
-66
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
## Ignore Visual Studio temporary files, build results, and
22
## files generated by popular Visual Studio add-ons.
33

4+
# Claude configuration
5+
.claude/
6+
47
# User-specific files
58
*.suo
69
*.user

src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
3535
Receive<InitializeLogger>(e =>
3636
{
3737
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
38+
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
39+
// so it won't interfere with dead letter detection or TestActor message expectations
3840
Sender.Tell(new LoggerInitialized());
3941
});
4042
}

src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using Akka.Actor;
10+
using Akka.Actor.Internal;
1011
using Akka.Actor.Setup;
1112
using Akka.Configuration;
1213
using Akka.Event;
@@ -170,22 +171,39 @@ protected void InitializeLogger(ActorSystem system)
170171
if (Output == null)
171172
return;
172173

173-
var extSystem = (ExtendedActorSystem)system;
174-
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
175-
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
176-
.ConfigureAwait(false).GetAwaiter().GetResult();
174+
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");
175+
176+
// Create logger actor synchronously to avoid deadlock during parallel test execution
177+
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
178+
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
179+
Props.Create(() => new TestOutputLogger(Output)),
180+
isSystemService: true, // Mark as system service
181+
isAsync: false, // Create synchronously to avoid deadlock
182+
name: "log-test");
183+
184+
// Send the initialization message without waiting for response to avoid deadlock
185+
// The logger will subscribe to the event stream when it processes this message
186+
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
177187
}
178188

179189
protected void InitializeLogger(ActorSystem system, string prefix)
180190
{
181191
if (Output == null)
182192
return;
183193

184-
var extSystem = (ExtendedActorSystem)system;
185-
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
186-
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
187-
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
188-
.ConfigureAwait(false).GetAwaiter().GetResult();
194+
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");
195+
196+
// Create logger actor synchronously to avoid deadlock during parallel test execution
197+
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
198+
Props.Create(() => new TestOutputLogger(
199+
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
200+
isSystemService: true, // Mark as system service
201+
isAsync: false, // Create synchronously to avoid deadlock
202+
name: "log-test");
203+
204+
// Send the initialization message without waiting for response to avoid deadlock
205+
// The logger will subscribe to the event stream when it processes this message
206+
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
189207
}
190208

191209
/// <summary>

src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
3535
Receive<InitializeLogger>(e =>
3636
{
3737
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
38+
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
39+
// so it won't interfere with dead letter detection or TestActor message expectations
3840
Sender.Tell(new LoggerInitialized());
3941
});
4042
}

src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System;
99
using System.Threading.Tasks;
1010
using Akka.Actor;
11+
using Akka.Actor.Internal;
1112
using Akka.Actor.Setup;
1213
using Akka.Configuration;
1314
using Akka.Event;
@@ -140,30 +141,39 @@ protected void InitializeLogger(ActorSystem system)
140141
{
141142
if (Output != null)
142143
{
143-
var extSystem = (ExtendedActorSystem)system;
144-
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
145-
// Start the logger initialization task but don't wait for it yet
146-
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
144+
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");
147145

148-
// By the time TestActor is ready (which happens in base constructor),
149-
// the logger is likely ready too. Now we can safely wait.
150-
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
146+
// Create logger actor synchronously to avoid deadlock during parallel test execution
147+
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
148+
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
149+
Props.Create(() => new TestOutputLogger(Output)),
150+
isSystemService: true, // Mark as system service
151+
isAsync: false, // Create synchronously to avoid deadlock
152+
name: "log-test");
153+
154+
// Send the initialization message without waiting for response to avoid deadlock
155+
// The logger will subscribe to the event stream when it processes this message
156+
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
151157
}
152158
}
153159

154160
protected void InitializeLogger(ActorSystem system, string prefix)
155161
{
156162
if (Output != null)
157163
{
158-
var extSystem = (ExtendedActorSystem)system;
159-
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
160-
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
161-
// Start the logger initialization task but don't wait for it yet
162-
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
164+
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");
165+
166+
// Create logger actor synchronously to avoid deadlock during parallel test execution
167+
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
168+
Props.Create(() => new TestOutputLogger(
169+
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
170+
isSystemService: true, // Mark as system service
171+
isAsync: false, // Create synchronously to avoid deadlock
172+
name: "log-test");
163173

164-
// By the time TestActor is ready (which happens in base constructor),
165-
// the logger is likely ready too. Now we can safely wait.
166-
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
174+
// Send the initialization message without waiting for response to avoid deadlock
175+
// The logger will subscribe to the event stream when it processes this message
176+
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
167177
}
168178
}
169179

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
2828
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
2929
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
30+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
31+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
3032
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
3133
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
3234
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
2828
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
2929
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
30+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
31+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
3032
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
3133
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
3234
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Akka.Actor;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace Akka.TestKit.Tests.TestActorRefTests
9+
{
10+
public class ParallelTestActorDeadlockSpec
11+
{
12+
private readonly ITestOutputHelper _output;
13+
14+
public ParallelTestActorDeadlockSpec(ITestOutputHelper output)
15+
{
16+
_output = output;
17+
}
18+
19+
// This test reproduces the deadlock that occurs in Akka.Hosting.TestKit
20+
// when multiple TestKits start up in parallel and actors try to interact
21+
// with TestActor during initialization.
22+
//
23+
// Related issues:
24+
// - https://github.com/akkadotnet/akka.net/issues/7770
25+
// - https://github.com/akkadotnet/Akka.Hosting/pull/643
26+
[Fact(Timeout = 20000)]
27+
public async Task Parallel_TestKit_startup_should_not_deadlock()
28+
{
29+
var concurrentTests = 40; // High parallelism to trigger the issue
30+
31+
var tasks = Enumerable.Range(0, concurrentTests)
32+
.Select(_ => Task.Run(RunOneTestKit))
33+
.ToArray();
34+
35+
await Task.WhenAll(tasks);
36+
37+
async Task RunOneTestKit()
38+
{
39+
await Task.Run(async () =>
40+
{
41+
var id = Guid.NewGuid().ToString("N").Substring(0, 8);
42+
try
43+
{
44+
_output.WriteLine($"[{id}] Creating TestKit...");
45+
// Create TestKit synchronously like a normal test would
46+
using var testKit = new Akka.TestKit.Xunit2.TestKit($"test-{id}", output: _output);
47+
_output.WriteLine($"[{id}] TestKit created");
48+
49+
// Simulate what happens in Akka.Hosting - actor creation during startup
50+
// that tries to interact with TestActor
51+
_output.WriteLine($"[{id}] Creating PingerActor...");
52+
var actor = testKit.Sys.ActorOf(Props.Create(() => new PingerActor(testKit.TestActor)));
53+
_output.WriteLine($"[{id}] PingerActor created");
54+
55+
// Expect the "ping" message from PingerActor's PreStart
56+
await testKit.ExpectMsgAsync<string>("ping", TimeSpan.FromSeconds(2));
57+
_output.WriteLine($"[{id}] Received ping from PingerActor");
58+
59+
// Now verify the TestKit is working normally
60+
_output.WriteLine($"[{id}] Sending test message...");
61+
testKit.TestActor.Tell("test-message");
62+
await testKit.ExpectMsgAsync<string>("test-message", TimeSpan.FromSeconds(2));
63+
_output.WriteLine($"[{id}] Test completed successfully");
64+
}
65+
catch (Exception ex)
66+
{
67+
_output.WriteLine($"[{id}] Failed: {ex.Message}");
68+
throw;
69+
}
70+
});
71+
}
72+
}
73+
74+
private class PingerActor : ActorBase
75+
{
76+
private readonly IActorRef _testActor;
77+
78+
public PingerActor(IActorRef testActor)
79+
{
80+
_testActor = testActor;
81+
}
82+
83+
protected override bool Receive(object message) => false;
84+
85+
protected override void PreStart()
86+
{
87+
// This simulates what StartupPinger does in Akka.Hosting
88+
// Sending a message to TestActor during actor initialization
89+
_testActor.Tell("ping");
90+
}
91+
}
92+
}
93+
}

src/core/Akka.TestKit/TestKitBase.cs

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ protected TestKitBase(ITestKitAssertions assertions, ActorSystem system, ActorSy
116116
{
117117
_assertions = assertions ?? throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null.");
118118

119+
// ReSharper disable once VirtualMemberCallInConstructor
119120
InitializeTest(system, config, actorSystemName, testActorName);
120121
}
121122

@@ -170,10 +171,11 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi
170171
if (string.IsNullOrEmpty(testActorName))
171172
testActorName = "testActor" + _testActorId.IncrementAndGet();
172173

173-
var testActor = CreateTestActor(system, testActorName);
174+
var testActor = CreateInitialTestActor(system, testActorName);
174175

175-
// Wait for the testactor to start
176-
WaitUntilTestActorIsReady(testActor, _testState.TestKitSettings);
176+
// For async initialization, don't wait in constructor to avoid deadlock
177+
// The TestActor property getter will ensure it's ready when first accessed
178+
_testState.TestActor = testActor;
177179

178180
if (this is not INoImplicitSender)
179181
{
@@ -187,45 +189,6 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi
187189
}
188190
SynchronizationContext.SetSynchronizationContext(
189191
new ActorCellKeepingSynchronizationContext(InternalCurrentActorCellKeeper.Current));
190-
191-
_testState.TestActor = testActor;
192-
}
193-
194-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
195-
// Do not convert this method to async, it is being called inside the constructor.
196-
private static void WaitUntilTestActorIsReady(IActorRef testActor, TestKitSettings settings)
197-
{
198-
var deadline = settings.TestKitStartupTimeout;
199-
var stopwatch = Stopwatch.StartNew();
200-
var ready = false;
201-
202-
try
203-
{
204-
// TestActor should start almost instantly (microseconds).
205-
// Use SpinWait which will spin for ~10-20 microseconds then yield.
206-
var spinWait = new SpinWait();
207-
208-
while (stopwatch.Elapsed < deadline)
209-
{
210-
ready = testActor is not IRepointableRef repRef || repRef.IsStarted;
211-
if (ready) break;
212-
213-
// SpinWait automatically handles the progression:
214-
// - First ~10 iterations: tight spin loop (microseconds)
215-
// - Next iterations: Thread.Yield()
216-
// - Later: Thread.Sleep(0)
217-
// - Finally: Thread.Sleep(1)
218-
// This is optimal for both fast startup and system under load
219-
spinWait.SpinOnce();
220-
}
221-
}
222-
finally
223-
{
224-
stopwatch.Stop();
225-
}
226-
227-
if (!ready)
228-
throw new Exception("Timeout waiting for test actor to be ready");
229192
}
230193

231194
/// <summary>
@@ -710,10 +673,31 @@ public IActorRef CreateTestActor(string name)
710673
return CreateTestActor(_testState.System, name);
711674
}
712675

676+
private IActorRef CreateInitialTestActor(ActorSystem system, string name)
677+
{
678+
// Fix both serialization and deadlock issues:
679+
// 1. Use isSystemService=true to skip serialization checks
680+
// 2. Use isAsync=false to create LocalActorRef synchronously (avoids RepointableActorRef deadlock)
681+
var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue))
682+
.WithDispatcher("akka.test.test-actor.dispatcher");
683+
684+
var systemImpl = system.AsInstanceOf<ActorSystemImpl>();
685+
// Use the new AttachChildWithAsync method to create TestActor synchronously
686+
var testActor = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
687+
testActorProps,
688+
isSystemService: true, // Skip serialization checks
689+
isAsync: false, // Create synchronously to avoid deadlock
690+
name: name);
691+
692+
return testActor;
693+
}
694+
713695
private IActorRef CreateTestActor(ActorSystem system, string name)
714696
{
715697
var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue))
716698
.WithDispatcher("akka.test.test-actor.dispatcher");
699+
700+
// For additional test actors, always use the standard SystemActorOf
717701
var testActor = system.AsInstanceOf<ActorSystemImpl>().SystemActorOf(testActorProps, name);
718702
return testActor;
719703
}

src/core/Akka/Actor/ActorCell.Children.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ public virtual IActorRef AttachChild(Props props, bool isSystemService, string?
9898
{
9999
return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), true, isSystemService);
100100
}
101+
102+
/// <summary>
103+
/// INTERNAL API
104+
///
105+
/// Attaches a child actor with explicit control over async initialization.
106+
/// Used by TestKit to create TestActors synchronously to avoid deadlocks.
107+
/// </summary>
108+
/// <param name="props">The <see cref="Props"/> this child actor will use.</param>
109+
/// <param name="isSystemService">If <c>true</c>, then this actor is a system actor and skips serialization checks.</param>
110+
/// <param name="isAsync">If <c>true</c>, creates RepointableActorRef with async init. If <c>false</c>, creates LocalActorRef synchronously.</param>
111+
/// <param name="name">The name of the actor being started. Can be <c>null</c> for auto-generated name.</param>
112+
/// <returns>A reference to the initialized child actor.</returns>
113+
internal IActorRef AttachChildWithAsync(Props props, bool isSystemService, bool isAsync, string? name = null)
114+
{
115+
return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), isAsync, isSystemService);
116+
}
101117

102118
/// <summary>
103119
/// TBD

0 commit comments

Comments
 (0)