Skip to content

Commit

Permalink
Added extra Batch-Join test.
Browse files Browse the repository at this point in the history
  • Loading branch information
electricessence committed Nov 10, 2024
1 parent 64dbf4a commit 0feb5d3
Showing 1 changed file with 66 additions and 0 deletions.
66 changes: 66 additions & 0 deletions Open.ChannelExtensions.Tests/BatchTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading;

namespace Open.ChannelExtensions.Tests;

Expand Down Expand Up @@ -407,4 +409,68 @@ public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncBakedIn<T
if (item?.Count > 0) yield return item;
}
}

[Fact]
public static async Task AnotherComplexBatchJoinScenario()
{
const int SIZE = 300000;
const long HTTP_BATCH_TIMEOUT = 10000;
const int MAX_PARALLELISM_HTTP = 8;
const int MAX_CACHE_WRITE_PARALLELISM = 8;

using var cts = new CancellationTokenSource(30_000);
var cancellationToken = cts.Token;
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(20) { SingleReader = false, SingleWriter = true });

int remaining = SIZE;
_ = Task.Run(async () =>
{
int n = 50;

while(remaining > 0)
{
await Task.Delay(20);
for (int i = 0; remaining > 0 && i < n; i++)
{
await channel.Writer.WriteAsync(remaining, cancellationToken);
remaining--;
}

n += 15;
}

await Task.Delay(30);
channel.Writer.Complete();
});

int count = 0;
var total = await channel.Reader
.Batch(100)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x =>
{
await Task.Delay(50);
return x.ConvertAll(x => x * 2);
},
cancellationToken: cancellationToken
)
.Join()
.Batch(10)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(
MAX_CACHE_WRITE_PARALLELISM,
x =>
{
Debug.WriteLine("Value: {0}", string.Join(' ', x));
for (int i = 0; i < x.Count; i++)
Interlocked.Increment(ref count);
},
cancellationToken);

remaining.Should().Be(0);
channel.Reader.TryRead(out _).Should().BeFalse();
count.Should().Be(SIZE);
}
}

0 comments on commit 0feb5d3

Please sign in to comment.