Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions Examples/Performances/BatchVsBatchSend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Performances;

public class BatchVsBatchSend
{
private const int TotalMessages = 20_000_000;
private const int MessageSize = 100;
private const int AggregateBatchSize = 300;
private const int ModPrintMessages = 10_000_000;

public async Task Start()
{
Console.WriteLine("Stream Client Performance Test");
Console.WriteLine("==============================");
Console.WriteLine("Client will test Batch vs Batch Send");
Console.WriteLine("Total Messages: {0}", TotalMessages);
Console.WriteLine("Message Size: {0}", MessageSize);
Console.WriteLine("Aggregate Batch Size: {0}", AggregateBatchSize);
Console.WriteLine("Print Messages each: {0} messages", ModPrintMessages);


var config = new StreamSystemConfig() {Heartbeat = TimeSpan.Zero};
var system = await StreamSystem.Create(config);
await BatchSend(system, await RecreateStream(system, "StandardBatchSend"));
await StandardProducerSend(await RecreateStream(system, "StandardProducerSendNoBatch"), system);
await RProducerBatchSend(await RecreateStream(system, "ReliableProducerBatch"), system);
await RProducerSend(await RecreateStream(system, "ReliableProducerSendNoBatch"), system);
}

private static async Task RProducerSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Reliable Producer Send No Batch*****");
var total = 0;
var confirmed = 0;
var error = 0;
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
Stream = stream,
StreamSystem = system,
MaxInFlight = 1_000_000,
ConfirmationHandler = messagesConfirmed =>
{
if (messagesConfirmed.Status == ConfirmationStatus.Confirmed)
{
confirmed += messagesConfirmed.Messages.Count;
}
else
{
error += messagesConfirmed.Messages.Count;
}

if (++total % ModPrintMessages == 0)
{
Console.WriteLine(
$"*****Reliable Producer Send No Batch Confirmed: {confirmed} Error: {error}*****");
}

return Task.CompletedTask;
}
});

var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
await reliableProducer.Send(new Message(array));

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Send No Batch: {i}");
}
}

Console.WriteLine(
$"*****Reliable Producer Send No Batch***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");

Thread.Sleep(1000);
await reliableProducer.Close();
}


private static async Task RProducerBatchSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Reliable Producer Batch Send*****");
var total = 0;
var confirmed = 0;
var error = 0;
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
Stream = stream,
StreamSystem = system,
MaxInFlight = 1_000_000,
ConfirmationHandler = messagesConfirmed =>
{
if (messagesConfirmed.Status == ConfirmationStatus.Confirmed)
{
confirmed += messagesConfirmed.Messages.Count;
}
else
{
error += messagesConfirmed.Messages.Count;
}

if (++total % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Batch Confirmed: {confirmed}, Error: {error}*****");
}

return Task.CompletedTask;
}
});

var messages = new List<Message>();


var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
messages.Add(new Message(array));
if (i % AggregateBatchSize == 0)
{
await reliableProducer.BatchSend(messages);
messages.Clear();
}

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Batch Send: {i}");
}
}

await reliableProducer.BatchSend(messages);
messages.Clear();

Console.WriteLine(
$"*****Reliable Producer Batch Send***** time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await reliableProducer.Close();
}


private static async Task StandardProducerSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Standard Producer Send*****");
var confirmed = 0;
var producer = await system.CreateProducer(new ProducerConfig()
{
Stream = stream,
MaxInFlight = 1_000_000,
ConfirmHandler = _ =>
{
if (++confirmed % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Producer Send Confirmed: {confirmed}");
}
}
});

var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];

await producer.Send(i, new Message(array));

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Producer: {i}");
}
}

Console.WriteLine(
$"*****Standard Producer Send***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await producer.Close();
}

private static async Task BatchSend(StreamSystem system, string stream)
{
Console.WriteLine("*****Standard Batch Send*****");
var confirmed = 0;
var producer = await system.CreateProducer(new ProducerConfig()
{
Stream = stream,
MaxInFlight = 1_000_000,
ConfirmHandler = _ =>
{
if (++confirmed % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Batch Confirmed: {confirmed}");
}
}
});
var messages = new List<(ulong, Message)>();
var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
messages.Add((i, new Message(array)));
if (i % AggregateBatchSize == 0)
{
await producer.BatchSend(messages);
messages.Clear();
}

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Batch Send: {i}");
}
}

await producer.BatchSend(messages);
messages.Clear();

Console.WriteLine(
$"*****Standard Batch Send***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await producer.Close();
}

private static async Task<string> RecreateStream(StreamSystem system, string stream)
{
Console.WriteLine("==============================");
Console.WriteLine($"Recreate Stream: {stream}, just wait a bit..");
Thread.Sleep(5000);
try
{
await system.DeleteStream(stream);
}
catch (Exception e)
{
// Console.WriteLine(e);
}

Thread.Sleep(5000);

await system.CreateStream(new StreamSpec(stream) { });
Thread.Sleep(1000);
Console.WriteLine($"Stream: {stream} created");
return stream;
}
}
14 changes: 14 additions & 0 deletions Examples/Performances/Performances.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\RabbitMQ.Stream.Client\RabbitMQ.Stream.Client.csproj" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions Examples/Performances/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using Performances;

new BatchVsBatchSend().Start().Wait();
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,21 @@ or more generic `system.QuerySequence("reference", "my_stream")`.

`publishingId` must be incremented for each send.

#### Standard Batch publish

Batch send is a synchronous operation.
It allows to pre-aggregate messages and send them in a single synchronous call.
```csharp
var messages = new List<(ulong, Message)>();
for (ulong i = 0; i < 30; i++)
{
messages.Add((i, new Message(Encoding.UTF8.GetBytes($"batch {i}"))));
}
await producer.BatchSend(messages);
messages.Clear();
```
In most cases, the standard `Send` is easier and works in most of the cases.

#### Sub Entries Batching
A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames,
but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public class Client : IClient
public int PublishCommandsSent => publishCommandsSent;

public int MessagesSent => messagesSent;
public uint MaxFrameSize => tuneReceived.Task.Result.FrameMax;

private int messagesSent;
private int confirmFrames;
Expand Down Expand Up @@ -215,7 +216,7 @@ await client.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);

//tune
var tune = await client.tuneReceived.Task;
await client.tuneReceived.Task;
await client.Publish(new TuneRequest(0,
(uint)client.Parameters.Heartbeat.TotalSeconds));

Expand Down
62 changes: 51 additions & 11 deletions RabbitMQ.Stream.Client/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -139,6 +140,42 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
}
}

public async ValueTask BatchSend(List<(ulong, Message)> messages)
{
PreValidateBatch(messages);
await InternalBatchSend(messages);
}

internal async Task InternalBatchSend(List<(ulong, Message)> messages)
{
for (var i = 0; i < messages.Count; i++)
{
await SemaphoreWait();
}

if (messages.Count != 0 && !client.IsClosed)
{
await SendMessages(messages, false).ConfigureAwait(false);
}
}

internal void PreValidateBatch(List<(ulong, Message)> messages)
{
if (messages.Count > config.MaxInFlight)
{
throw new InvalidOperationException($"Too many messages in batch. " +
$"Max allowed is {config.MaxInFlight}");
}

var totalSize = messages.Sum(message => message.Item2.Size);

if (totalSize > client.MaxFrameSize)
{
throw new InvalidOperationException($"Total size of messages in batch is too big. " +
$"Max allowed is {client.MaxFrameSize}");
}
}

private async Task SemaphoreWait()
{
if (!semaphore.Wait(0) && !client.IsClosed)
Expand All @@ -151,6 +188,20 @@ private async Task SemaphoreWait()
}
}

private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessagesList = true)
{
var publishTask = client.Publish(new Publish(publisherId, messages));
if (!publishTask.IsCompletedSuccessfully)
{
await publishTask.ConfigureAwait(false);
}

if (clearMessagesList)
{
messages.Clear();
}
}

/// <summary>
/// GetLastPublishingId
/// </summary>
Expand Down Expand Up @@ -196,17 +247,6 @@ private async Task ProcessBuffer()
await SendMessages(messages).ConfigureAwait(false);
}
}

async Task SendMessages(List<(ulong, Message)> messages)
{
var publishTask = client.Publish(new Publish(publisherId, messages));
if (!publishTask.IsCompletedSuccessfully)
{
await publishTask.ConfigureAwait(false);
}

messages.Clear();
}
}

public Task<ResponseCode> Close()
Expand Down
Loading