From 4a65311fe74d382e2a90f824befc4554544340bb Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 29 Jan 2024 14:40:24 +0100 Subject: [PATCH] Fixing double invocation of value task --- .../Eventuous.Shared/Tools/TaskExtensions.cs | 25 +++++++++---------- .../Consumers/DefaultConsumer.cs | 12 ++++++--- .../Fixtures/SubscriptionFixture.cs | 2 +- .../PublishAndSubscribeManyTests.cs | 7 ++---- .../PublishAndSubscribeOneTests.cs | 9 +++---- 5 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs b/src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs index d52490fa..5c48ae9a 100644 --- a/src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs +++ b/src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using System.Runtime.CompilerServices; + // ReSharper disable PossibleMultipleEnumeration namespace Eventuous.Tools; @@ -27,28 +28,26 @@ public static ConfiguredCancelableAsyncEnumerable NoContext(this IAsyncEnu public static Task WhenAll(this IEnumerable tasks) => Task.WhenAll(tasks); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static async Task WhenAll(this IEnumerable tasks) { - var toAwait = tasks.Where(valueTask => !valueTask.IsCompletedSuccessfully).Select(valueTask => valueTask.AsTask()); - - if (toAwait.Any()) await Task.WhenAll(toAwait).NoContext(); + public static ValueTask WhenAll(this IEnumerable tasks) { + return tasks is ValueTask[] array ? AwaitArray(array) : AwaitArray(tasks.ToArray()); + + // ReSharper disable once SuggestBaseTypeForParameter + async ValueTask AwaitArray(ValueTask[] t) { + // ReSharper disable once ForCanBeConvertedToForeach + for (var i = 0; i < t.Length; i++) { + await t[i]; + } + } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static async Task> WhenAll(this IEnumerable> tasks) { var results = new List(); - var toAwait = new List>(); foreach (var valueTask in tasks) { - if (valueTask.IsCompletedSuccessfully) - results.Add(valueTask.Result); - else - toAwait.Add(valueTask.AsTask()); + results.Add(valueTask.IsCompletedSuccessfully ? valueTask.Result : await valueTask); } - if (toAwait.Count == 0) return results; - - results.AddRange(await Task.WhenAll(toAwait).NoContext()); - return results; } diff --git a/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs b/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs index d197cdd0..77bf3072 100644 --- a/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs +++ b/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs @@ -23,15 +23,19 @@ public async ValueTask Consume(IMessageConsumeContext context) { } var typedContext = context.ConvertToGeneric(); - var tasks = eventHandlers.Select(handler => Handle(typedContext, handler)); - await tasks.WhenAll().NoContext(); + + // ReSharper disable once ForCanBeConvertedToForeach + for (var index = 0; index < eventHandlers.Length; index++) { + var handler = eventHandlers[index]; + await Handle(typedContext, handler).NoContext(); + } } catch (Exception e) { context.Nack(e); } - - return; } + return; + async ValueTask Handle(IMessageConsumeContext typedContext, IEventHandler handler) { try { var status = await handler.HandleEvent(typedContext).NoContext(); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs index 85063069..70a2bf51 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs @@ -60,7 +60,7 @@ public async Task InitializeAsync() { new StreamSubscriptionOptions { StreamName = Stream, SubscriptionId = subscriptionId, - ResolveLinkTos = Stream.ToString().StartsWith("$") + ResolveLinkTos = Stream.ToString().StartsWith('$') }, CheckpointStore, pipe, diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs index 3b053a9a..13917318 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyTests.cs @@ -5,20 +5,17 @@ namespace Eventuous.Tests.EventStore; public class PublishAndSubscribeManyTests(IntegrationFixture fixture, ITestOutputHelper outputHelper) - : SubscriptionFixture(fixture, outputHelper, new TestEventHandler(TimeSpan.FromMilliseconds(5)), false) { + : SubscriptionFixture(fixture, outputHelper, new TestEventHandler(TimeSpan.FromMilliseconds(1)), false, logLevel: LogLevel.Trace) { [Fact] public async Task SubscribeAndProduceMany() { - const int count = 1000; + const int count = 100; var testEvents = Auto.CreateMany(count).ToList(); Handler.AssertThat().Exactly(count, x => testEvents.Contains(x)); await Start(); - await Producer.Produce(Stream, testEvents, new Metadata()); - await Handler.Validate(10.Seconds()); - await Stop(); CheckpointStore.Last.Position.Should().Be(count - 1); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs index f231c462..999852a8 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeOneTests.cs @@ -5,16 +5,15 @@ namespace Eventuous.Tests.EventStore; public class PublishAndSubscribeOneTests(IntegrationFixture fixture, ITestOutputHelper outputHelper) - : SubscriptionFixture(fixture, outputHelper, new TestEventHandler(), false) { + : SubscriptionFixture(fixture, outputHelper, new TestEventHandler(), false, logLevel: LogLevel.Trace) { [Fact] public async Task SubscribeAndProduce() { var testEvent = Auto.Create(); - Handler.AssertThat().Any(x => x as TestEvent == testEvent); + Handler.AssertThat().Exactly(1, x => x as TestEvent == testEvent); - await Producer.Produce(Stream, testEvent, new Metadata()); await Start(); - - await Handler.Validate(10.Seconds()); + await Producer.Produce(Stream, testEvent, new Metadata()); + await Handler.Validate(5.Seconds()); await Stop(); await Task.Delay(100);