Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Examples/Reliable/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Reliable Examples
---

How to use the reliable producer and consumer
58 changes: 58 additions & 0 deletions Examples/Reliable/ReliableProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Buffers;
using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace example;

public class ReliableProducer
{
public async Task StartDefaultConfigurations()
{
var config = new StreamSystemConfig();
const string stream = "my-reliable-stream";
var system = await StreamSystem.Create(config);
await system.CreateStream(new StreamSpec(stream)
{
MaxLengthBytes = 2073741824
});
const int totalMessages = 1_000;

var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
StreamSystem = system,
Stream = stream,
Reference = "my-reliable-producer",
ConfirmationHandler = confirmation =>
{
Console.WriteLine(confirmation.Status == ConfirmationStatus.Confirmed
? $"Confirmed: Publishing id {confirmation.PublishingId}"
: $"Not Confirmed: Publishing id {confirmation.PublishingId}, error: {confirmation.Status} ");
return Task.CompletedTask;
}
});
var start = DateTime.Now;
for (var i = 0; i < totalMessages; i++)
{
// standard send
await reliableProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
}

// sub-batch
var listMessages = new List<Message>();
for (var i = 0; i < 100; i++)
{
listMessages.Add(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
}

// in this case you will receive back one confirmation with all the messages
await reliableProducer.Send(listMessages, CompressionType.Gzip);


Console.WriteLine($"End...Done {DateTime.Now - start}");
// just to receive all the notification back
Thread.Sleep(2000);
await reliableProducer.Close();

}
}
78 changes: 77 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
- [Track Offset](#track-offset)
- [Handle Close](#handle-close)
- [Handle Metadata Update](#handle-metadata-update)
- [Reliable](#reliable)
- [Reliable Producer](#reliable-producer)
- [Build from source](#build-from-source)
- [Project Status](#project-status)

Expand Down Expand Up @@ -220,7 +222,6 @@ var producer = await system.CreateProducer(
Stream = "my_stream",
});
```

Consider a Producer instance like a long-lived object, do not create one to send just one message.

| Parameter | Description | Default |
Expand Down Expand Up @@ -408,6 +409,81 @@ You can use `MetadataHandler` to handle it:
},
}
```
### Reliable
- Reliable Producer
- Reliable Consumer (not ready yet)

### Reliable Producer
Reliable Producer is a smart layer built up of the standard `Producer`. </b>
The idea is to leave the user decides what to use, the standard or reliable producer. </b>

The main features are:
- Provide publishingID automatically
- Auto-Reconnect in case of disconnection
- Trace sent and received messages
- Invalidate messages
- Handle the metadata Update

#### Provide publishingID automatically
Reliable Producer retrieves the last publishingID given the producer name. </b>
Zero(0) is the default value in case there is no a publishingID.

#### Auto-Reconnect
Reliable Producer restores the TCP connection in case the Producer is disconnected for some reason.
During the reconnection it continues to store the messages in a local-list.
The user will receive back the confirmed or un-confirmed messages.

#### Trace sent and received messages
Reliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.
`ConfirmationHandler` receives the messages back with the status.
`confirmation.Status` can have different values, but in general `ConfirmationStatus.Confirmed` means the messages
is stored on the server other status means that there was a problem with the message/messages.
```csharp
ConfirmationHandler = confirmation =>
{
if (confirmation.Status == ConfirmationStatus.Confirmed)
{

// OK
}
else
{
// Some problem
}
}
```
#### Invalidate messages
If the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
The user will receive `ConfirmationStatus.TimeoutError` in the `ConfirmationHandler`.

#### Handle the metadata Update
If the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an `MetadataUpdate` event.
Reliable Producer detects the event and tries to reconnect the producer if the stream still exist else closes the producer.
#### Send API
Reliable Producer implements two `send(..)`
- `Send(Message message)` // standard
- `Send(List<Message> messages, CompressionType compressionType)` //sub-batching with compression

### Reconnection Strategy
By default Reliable Producer uses an `BackOffReconnectStrategy` to reconnect the client.
You can customize the behaviour implementing the `IReconnectStrategy` interface:
```csharp
void WhenDisconnected(out bool reconnect);
void WhenConnected();
```
with `reconnect` you can decide when reconnect the producer.

You can use it:
```csharp
var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
...
ReconnectStrategy = MyReconnectStrategy
```

#### Examples
See the directory [Examples/Reliable](./Examples/Reliable)


### Build from source
Build:
Expand Down