Skip to content

Commit

Permalink
Fixing double invocation of value task
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Jan 29, 2024
1 parent 9a232b2 commit 4a65311
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 28 deletions.
25 changes: 12 additions & 13 deletions src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0.

using System.Runtime.CompilerServices;

// ReSharper disable PossibleMultipleEnumeration

namespace Eventuous.Tools;
Expand All @@ -27,28 +28,26 @@ public static ConfiguredCancelableAsyncEnumerable<T> NoContext<T>(this IAsyncEnu
public static Task WhenAll(this IEnumerable<Task> tasks) => Task.WhenAll(tasks);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async Task WhenAll(this IEnumerable<ValueTask> tasks) {
var toAwait = tasks.Where(valueTask => !valueTask.IsCompletedSuccessfully).Select(valueTask => valueTask.AsTask());

if (toAwait.Any()) await Task.WhenAll(toAwait).NoContext();
public static ValueTask WhenAll(this IEnumerable<ValueTask> tasks) {
return tasks is ValueTask[] array ? AwaitArray(array) : AwaitArray(tasks.ToArray());

// ReSharper disable once SuggestBaseTypeForParameter
async ValueTask AwaitArray(ValueTask[] t) {
// ReSharper disable once ForCanBeConvertedToForeach
for (var i = 0; i < t.Length; i++) {
await t[i];
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async Task<IReadOnlyCollection<T>> WhenAll<T>(this IEnumerable<ValueTask<T>> tasks) {
var results = new List<T>();
var toAwait = new List<Task<T>>();

foreach (var valueTask in tasks) {
if (valueTask.IsCompletedSuccessfully)
results.Add(valueTask.Result);
else
toAwait.Add(valueTask.AsTask());
results.Add(valueTask.IsCompletedSuccessfully ? valueTask.Result : await valueTask);
}

if (toAwait.Count == 0) return results;

results.AddRange(await Task.WhenAll(toAwait).NoContext());

return results;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ public async ValueTask Consume(IMessageConsumeContext context) {
}

var typedContext = context.ConvertToGeneric();
var tasks = eventHandlers.Select(handler => Handle(typedContext, handler));
await tasks.WhenAll().NoContext();

// ReSharper disable once ForCanBeConvertedToForeach
for (var index = 0; index < eventHandlers.Length; index++) {
var handler = eventHandlers[index];
await Handle(typedContext, handler).NoContext();
}
} catch (Exception e) {
context.Nack<DefaultConsumer>(e);
}

return;
}

return;

async ValueTask Handle(IMessageConsumeContext typedContext, IEventHandler handler) {
try {
var status = await handler.HandleEvent(typedContext).NoContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task InitializeAsync() {
new StreamSubscriptionOptions {
StreamName = Stream,
SubscriptionId = subscriptionId,
ResolveLinkTos = Stream.ToString().StartsWith("$")
ResolveLinkTos = Stream.ToString().StartsWith('$')
},
CheckpointStore,
pipe,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,17 @@
namespace Eventuous.Tests.EventStore;

public class PublishAndSubscribeManyTests(IntegrationFixture fixture, ITestOutputHelper outputHelper)
: SubscriptionFixture<TestEventHandler>(fixture, outputHelper, new TestEventHandler(TimeSpan.FromMilliseconds(5)), false) {
: SubscriptionFixture<TestEventHandler>(fixture, outputHelper, new TestEventHandler(TimeSpan.FromMilliseconds(1)), false, logLevel: LogLevel.Trace) {
[Fact]
public async Task SubscribeAndProduceMany() {
const int count = 1000;
const int count = 100;

var testEvents = Auto.CreateMany<TestEvent>(count).ToList();
Handler.AssertThat().Exactly(count, x => testEvents.Contains(x));

await Start();

await Producer.Produce(Stream, testEvents, new Metadata());

await Handler.Validate(10.Seconds());

await Stop();

CheckpointStore.Last.Position.Should().Be(count - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
namespace Eventuous.Tests.EventStore;

public class PublishAndSubscribeOneTests(IntegrationFixture fixture, ITestOutputHelper outputHelper)
: SubscriptionFixture<TestEventHandler>(fixture, outputHelper, new TestEventHandler(), false) {
: SubscriptionFixture<TestEventHandler>(fixture, outputHelper, new TestEventHandler(), false, logLevel: LogLevel.Trace) {
[Fact]
public async Task SubscribeAndProduce() {
var testEvent = Auto.Create<TestEvent>();
Handler.AssertThat().Any(x => x as TestEvent == testEvent);
Handler.AssertThat().Exactly(1, x => x as TestEvent == testEvent);

await Producer.Produce(Stream, testEvent, new Metadata());
await Start();

await Handler.Validate(10.Seconds());
await Producer.Produce(Stream, testEvent, new Metadata());
await Handler.Validate(5.Seconds());
await Stop();

await Task.Delay(100);
Expand Down

0 comments on commit 4a65311

Please sign in to comment.