-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Async TestKit] Convert Akka.Stream.TestKit to async - Refactor TestKit.Tests #5906
Changes from all commits
bc74412
6d394d8
096dceb
5e9d4d9
f0ac6a8
e7c5358
723ec0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,6 @@ | |
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<EmbeddedResource Include="reference.conf" /> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reference.conf is moved to Akka.Streams.TestKit |
||
<ProjectReference Include="..\Akka.Streams.TestKit\Akka.Streams.TestKit.csproj" /> | ||
<ProjectReference Include="..\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" /> | ||
</ItemGroup> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,41 +15,49 @@ | |
using Akka.TestKit; | ||
using Akka.Util.Internal; | ||
|
||
namespace Akka.Streams.TestKit.Tests | ||
namespace Akka.Streams.TestKit | ||
{ | ||
public static class Utils | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class needs to be reviewed, its actually converted to async, I can revert it back if nescessary |
||
{ | ||
public static Config UnboundedMailboxConfig { get; } = | ||
ConfigurationFactory.ParseString(@"akka.actor.default-mailbox.mailbox-type = ""Akka.Dispatch.UnboundedMailbox, Akka"""); | ||
|
||
public static void AssertAllStagesStopped(this AkkaSpec spec, Action block, IMaterializer materializer) | ||
{ | ||
AssertAllStagesStopped(spec, () => | ||
{ | ||
block(); | ||
return NotUsed.Instance; | ||
}, materializer); | ||
} | ||
=> AssertAllStagesStoppedAsync(spec, () => | ||
{ | ||
block(); | ||
return NotUsed.Instance; | ||
}, materializer) | ||
.ConfigureAwait(false).GetAwaiter().GetResult(); | ||
|
||
public static T AssertAllStagesStopped<T>(this AkkaSpec spec, Func<T> block, IMaterializer materializer) | ||
=> AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer) | ||
.ConfigureAwait(false).GetAwaiter().GetResult(); | ||
|
||
public static async Task<T> AssertAllStagesStoppedAsync<T>(this AkkaSpec spec, Func<T> block, | ||
IMaterializer materializer) | ||
=> await AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer) | ||
.ConfigureAwait(false); | ||
|
||
public static async Task<T> AssertAllStagesStoppedAsync<T>(this AkkaSpec spec, Func<Task<T>> block, IMaterializer materializer) | ||
{ | ||
if (!(materializer is ActorMaterializerImpl impl)) | ||
return block(); | ||
return await block(); | ||
|
||
var probe = spec.CreateTestProbe(impl.System); | ||
probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); | ||
probe.ExpectMsg<StreamSupervisor.StoppedChildren>(); | ||
var result = block(); | ||
await probe.ExpectMsgAsync<StreamSupervisor.StoppedChildren>(); | ||
var result = await block(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lgtm |
||
|
||
probe.Within(TimeSpan.FromSeconds(5), () => | ||
await probe.WithinAsync(TimeSpan.FromSeconds(5), async () => | ||
{ | ||
IImmutableSet<IActorRef> children = ImmutableHashSet<IActorRef>.Empty; | ||
try | ||
{ | ||
probe.AwaitAssert(() => | ||
await probe.AwaitAssertAsync(async () => | ||
{ | ||
impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref); | ||
children = probe.ExpectMsg<StreamSupervisor.Children>().Refs; | ||
children = (await probe.ExpectMsgAsync<StreamSupervisor.Children>()).Refs; | ||
if (children.Count != 0) | ||
throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}"); | ||
}); | ||
|
@@ -66,9 +74,7 @@ public static T AssertAllStagesStopped<T>(this AkkaSpec spec, Func<T> block, IMa | |
|
||
public static void AssertDispatcher(IActorRef @ref, string dispatcher) | ||
{ | ||
var r = @ref as ActorRefWithCell; | ||
|
||
if (r == null) | ||
if (!(@ref is ActorRefWithCell r)) | ||
throw new Exception($"Unable to determine dispatcher of {@ref}"); | ||
|
||
if (r.Underlying.Props.Dispatcher != dispatcher) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
# | ||
# All stream tests should use the dedicated `akka.test.stream-dispatcher` or disable this validation by defining: | ||
# akka.actor.default-mailbox.mailbox-type = "Akka.Dispatch.UnboundedMailbox, Akka" | ||
akka.actor.default-mailbox.mailbox-type = "Akka.Streams.TestKit.Tests.StreamTestDefaultMailbox, Akka.Streams.TestKit.Tests" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lgtm |
||
akka.actor.default-mailbox.mailbox-type = "Akka.Streams.TestKit.StreamTestDefaultMailbox, Akka.Streams.TestKit" | ||
|
||
# Dispatcher for stream actors. Specified in tests with | ||
# ActorMaterializerSettings(dispatcher = "akka.test.stream-dispatcher") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,7 @@ | |
using Akka.Configuration; | ||
using Akka.Streams.Dsl; | ||
using Akka.Streams.Implementation.Fusing; | ||
using Akka.Streams.TestKit.Tests; | ||
using Akka.Streams.TestKit; | ||
using Akka.Util; | ||
using NBench; | ||
using Reactive.Streams; | ||
|
@@ -54,7 +54,7 @@ public class FlowSelectBenchmark | |
public void Setup(BenchmarkContext context) | ||
{ | ||
_actorSystem = ActorSystem.Create("FlowSelectBenchmark", Config.WithFallback( | ||
ConfigurationFactory.FromResource<ScriptedTest>("Akka.Streams.TestKit.Tests.reference.conf"))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The magic text for loading the config is moved to a static property in the |
||
StreamTestDefaultMailbox.DefaultConfig)); | ||
_actorSystem.Settings.InjectTopLevelFallback(ActorMaterializer.DefaultConfig()); | ||
|
||
var buffer8 = ActorMaterializerSettings.Create(_actorSystem).WithInputBuffer(8, 8); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm skipping these racy tests while we're converting the rest of Streams.Tests over so it wouldn't block every PRs