Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.

# Claude configuration
.claude/

# User-specific files
*.suo
*.user
Expand Down
2 changes: 2 additions & 0 deletions src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
Receive<InitializeLogger>(e =>
{
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
// so it won't interfere with dead letter detection or TestActor message expectations
Sender.Tell(new LoggerInitialized());
});
}
Expand Down
36 changes: 27 additions & 9 deletions src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -170,22 +171,39 @@ protected void InitializeLogger(ActorSystem system)
if (Output == null)
return;

var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
.ConfigureAwait(false).GetAwaiter().GetResult();
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(Output)),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}

protected void InitializeLogger(ActorSystem system, string prefix)
{
if (Output == null)
return;

var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
.ConfigureAwait(false).GetAwaiter().GetResult();
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
Receive<InitializeLogger>(e =>
{
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
// so it won't interfere with dead letter detection or TestActor message expectations
Sender.Tell(new LoggerInitialized());
});
}
Expand Down
40 changes: 25 additions & 15 deletions src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -140,30 +141,39 @@ protected void InitializeLogger(ActorSystem system)
{
if (Output != null)
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
// Start the logger initialization task but don't wait for it yet
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// By the time TestActor is ready (which happens in base constructor),
// the logger is likely ready too. Now we can safely wait.
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
// Create logger actor synchronously to avoid deadlock during parallel test execution
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(Output)),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}
}

protected void InitializeLogger(ActorSystem system, string prefix)
{
if (Output != null)
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
// Start the logger initialization task but don't wait for it yet
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// By the time TestActor is ready (which happens in base constructor),
// the logger is likely ready too. Now we can safely wait.
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Xunit;
using Xunit.Abstractions;

namespace Akka.TestKit.Tests.TestActorRefTests
{
public class ParallelTestActorDeadlockSpec
{
private readonly ITestOutputHelper _output;

public ParallelTestActorDeadlockSpec(ITestOutputHelper output)
{
_output = output;
}

// This test reproduces the deadlock that occurs in Akka.Hosting.TestKit
// when multiple TestKits start up in parallel and actors try to interact
// with TestActor during initialization.
//
// Related issues:
// - https://github.com/akkadotnet/akka.net/issues/7770
// - https://github.com/akkadotnet/Akka.Hosting/pull/643
[Fact(Timeout = 20000)]
public async Task Parallel_TestKit_startup_should_not_deadlock()
{
var concurrentTests = 40; // High parallelism to trigger the issue

var tasks = Enumerable.Range(0, concurrentTests)
.Select(_ => Task.Run(RunOneTestKit))
.ToArray();

await Task.WhenAll(tasks);

async Task RunOneTestKit()
{
await Task.Run(async () =>
{
var id = Guid.NewGuid().ToString("N").Substring(0, 8);
try
{
_output.WriteLine($"[{id}] Creating TestKit...");
// Create TestKit synchronously like a normal test would
using var testKit = new Akka.TestKit.Xunit2.TestKit($"test-{id}", output: _output);
_output.WriteLine($"[{id}] TestKit created");

// Simulate what happens in Akka.Hosting - actor creation during startup
// that tries to interact with TestActor
_output.WriteLine($"[{id}] Creating PingerActor...");
var actor = testKit.Sys.ActorOf(Props.Create(() => new PingerActor(testKit.TestActor)));
_output.WriteLine($"[{id}] PingerActor created");

// Expect the "ping" message from PingerActor's PreStart
await testKit.ExpectMsgAsync<string>("ping", TimeSpan.FromSeconds(2));
_output.WriteLine($"[{id}] Received ping from PingerActor");

// Now verify the TestKit is working normally
_output.WriteLine($"[{id}] Sending test message...");
testKit.TestActor.Tell("test-message");
await testKit.ExpectMsgAsync<string>("test-message", TimeSpan.FromSeconds(2));
_output.WriteLine($"[{id}] Test completed successfully");
}
catch (Exception ex)
{
_output.WriteLine($"[{id}] Failed: {ex.Message}");
throw;
}
});
}
}

private class PingerActor : ActorBase
{
private readonly IActorRef _testActor;

public PingerActor(IActorRef testActor)
{
_testActor = testActor;
}

protected override bool Receive(object message) => false;

protected override void PreStart()
{
// This simulates what StartupPinger does in Akka.Hosting
// Sending a message to TestActor during actor initialization
_testActor.Tell("ping");
}
}
}
}
68 changes: 26 additions & 42 deletions src/core/Akka.TestKit/TestKitBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ protected TestKitBase(ITestKitAssertions assertions, ActorSystem system, ActorSy
{
_assertions = assertions ?? throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null.");

// ReSharper disable once VirtualMemberCallInConstructor
InitializeTest(system, config, actorSystemName, testActorName);
}

Expand Down Expand Up @@ -170,10 +171,11 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi
if (string.IsNullOrEmpty(testActorName))
testActorName = "testActor" + _testActorId.IncrementAndGet();

var testActor = CreateTestActor(system, testActorName);
var testActor = CreateInitialTestActor(system, testActorName);

// Wait for the testactor to start
WaitUntilTestActorIsReady(testActor, _testState.TestKitSettings);
// For async initialization, don't wait in constructor to avoid deadlock
// The TestActor property getter will ensure it's ready when first accessed
_testState.TestActor = testActor;

if (this is not INoImplicitSender)
{
Expand All @@ -187,45 +189,6 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi
}
SynchronizationContext.SetSynchronizationContext(
new ActorCellKeepingSynchronizationContext(InternalCurrentActorCellKeeper.Current));

_testState.TestActor = testActor;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
// Do not convert this method to async, it is being called inside the constructor.
private static void WaitUntilTestActorIsReady(IActorRef testActor, TestKitSettings settings)
{
var deadline = settings.TestKitStartupTimeout;
var stopwatch = Stopwatch.StartNew();
var ready = false;

try
{
// TestActor should start almost instantly (microseconds).
// Use SpinWait which will spin for ~10-20 microseconds then yield.
var spinWait = new SpinWait();

while (stopwatch.Elapsed < deadline)
{
ready = testActor is not IRepointableRef repRef || repRef.IsStarted;
if (ready) break;

// SpinWait automatically handles the progression:
// - First ~10 iterations: tight spin loop (microseconds)
// - Next iterations: Thread.Yield()
// - Later: Thread.Sleep(0)
// - Finally: Thread.Sleep(1)
// This is optimal for both fast startup and system under load
spinWait.SpinOnce();
}
}
finally
{
stopwatch.Stop();
}

if (!ready)
throw new Exception("Timeout waiting for test actor to be ready");
}

/// <summary>
Expand Down Expand Up @@ -710,10 +673,31 @@ public IActorRef CreateTestActor(string name)
return CreateTestActor(_testState.System, name);
}

private IActorRef CreateInitialTestActor(ActorSystem system, string name)
{
// Fix both serialization and deadlock issues:
// 1. Use isSystemService=true to skip serialization checks
// 2. Use isAsync=false to create LocalActorRef synchronously (avoids RepointableActorRef deadlock)
var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue))
.WithDispatcher("akka.test.test-actor.dispatcher");

var systemImpl = system.AsInstanceOf<ActorSystemImpl>();
// Use the new AttachChildWithAsync method to create TestActor synchronously
var testActor = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
testActorProps,
isSystemService: true, // Skip serialization checks
isAsync: false, // Create synchronously to avoid deadlock
name: name);

return testActor;
}

private IActorRef CreateTestActor(ActorSystem system, string name)
{
var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue))
.WithDispatcher("akka.test.test-actor.dispatcher");

// For additional test actors, always use the standard SystemActorOf
var testActor = system.AsInstanceOf<ActorSystemImpl>().SystemActorOf(testActorProps, name);
return testActor;
}
Expand Down
16 changes: 16 additions & 0 deletions src/core/Akka/Actor/ActorCell.Children.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ public virtual IActorRef AttachChild(Props props, bool isSystemService, string?
{
return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), true, isSystemService);
}

/// <summary>
/// INTERNAL API
///
/// Attaches a child actor with explicit control over async initialization.
/// Used by TestKit to create TestActors synchronously to avoid deadlocks.
/// </summary>
/// <param name="props">The <see cref="Props"/> this child actor will use.</param>
/// <param name="isSystemService">If <c>true</c>, then this actor is a system actor and skips serialization checks.</param>
/// <param name="isAsync">If <c>true</c>, creates RepointableActorRef with async init. If <c>false</c>, creates LocalActorRef synchronously.</param>
/// <param name="name">The name of the actor being started. Can be <c>null</c> for auto-generated name.</param>
/// <returns>A reference to the initialized child actor.</returns>
internal IActorRef AttachChildWithAsync(Props props, bool isSystemService, bool isAsync, string? name = null)
{
return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), isAsync, isSystemService);
}

/// <summary>
/// TBD
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
[assembly: InternalsVisibleTo("Akka.Tests.Performance")]
[assembly: InternalsVisibleTo("Akka.TestKit")]
[assembly: InternalsVisibleTo("Akka.TestKit.Tests")]
[assembly: InternalsVisibleTo("Akka.TestKit.Xunit")]
[assembly: InternalsVisibleTo("Akka.TestKit.Xunit2")]
[assembly: InternalsVisibleTo("Akka.Remote")]
[assembly: InternalsVisibleTo("Akka.Remote.TestKit")]
[assembly: InternalsVisibleTo("Akka.Remote.Tests")]
Expand Down
Loading