Skip to content

Commit 8413897

Browse files
authored
Add Reliable Producer Documentation (#112)
* Add Reliable Producer Documentation and Example Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 33e832c commit 8413897

File tree

3 files changed

+139
-1
lines changed

3 files changed

+139
-1
lines changed

Examples/Reliable/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Reliable Examples
2+
---
3+
4+
How to use the reliable producer and consumer
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System.Buffers;
2+
using System.Text;
3+
using RabbitMQ.Stream.Client;
4+
using RabbitMQ.Stream.Client.Reliable;
5+
6+
namespace example;
7+
8+
public class ReliableProducer
9+
{
10+
public async Task StartDefaultConfigurations()
11+
{
12+
var config = new StreamSystemConfig();
13+
const string stream = "my-reliable-stream";
14+
var system = await StreamSystem.Create(config);
15+
await system.CreateStream(new StreamSpec(stream)
16+
{
17+
MaxLengthBytes = 2073741824
18+
});
19+
const int totalMessages = 1_000;
20+
21+
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
22+
{
23+
StreamSystem = system,
24+
Stream = stream,
25+
Reference = "my-reliable-producer",
26+
ConfirmationHandler = confirmation =>
27+
{
28+
Console.WriteLine(confirmation.Status == ConfirmationStatus.Confirmed
29+
? $"Confirmed: Publishing id {confirmation.PublishingId}"
30+
: $"Not Confirmed: Publishing id {confirmation.PublishingId}, error: {confirmation.Status} ");
31+
return Task.CompletedTask;
32+
}
33+
});
34+
var start = DateTime.Now;
35+
for (var i = 0; i < totalMessages; i++)
36+
{
37+
// standard send
38+
await reliableProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
39+
}
40+
41+
// sub-batch
42+
var listMessages = new List<Message>();
43+
for (var i = 0; i < 100; i++)
44+
{
45+
listMessages.Add(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
46+
}
47+
48+
// in this case you will receive back one confirmation with all the messages
49+
await reliableProducer.Send(listMessages, CompressionType.Gzip);
50+
51+
52+
Console.WriteLine($"End...Done {DateTime.Now - start}");
53+
// just to receive all the notification back
54+
Thread.Sleep(2000);
55+
await reliableProducer.Close();
56+
57+
}
58+
}

README.md

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
- [Track Offset](#track-offset)
3030
- [Handle Close](#handle-close)
3131
- [Handle Metadata Update](#handle-metadata-update)
32+
- [Reliable](#reliable)
33+
- [Reliable Producer](#reliable-producer)
3234
- [Build from source](#build-from-source)
3335
- [Project Status](#project-status)
3436

@@ -220,7 +222,6 @@ var producer = await system.CreateProducer(
220222
Stream = "my_stream",
221223
});
222224
```
223-
224225
Consider a Producer instance like a long-lived object, do not create one to send just one message.
225226

226227
| Parameter | Description | Default |
@@ -408,6 +409,81 @@ You can use `MetadataHandler` to handle it:
408409
},
409410
}
410411
```
412+
### Reliable
413+
- Reliable Producer
414+
- Reliable Consumer (not ready yet)
415+
416+
### Reliable Producer
417+
Reliable Producer is a smart layer built up of the standard `Producer`. </b>
418+
The idea is to leave the user decides what to use, the standard or reliable producer. </b>
419+
420+
The main features are:
421+
- Provide publishingID automatically
422+
- Auto-Reconnect in case of disconnection
423+
- Trace sent and received messages
424+
- Invalidate messages
425+
- Handle the metadata Update
426+
427+
#### Provide publishingID automatically
428+
Reliable Producer retrieves the last publishingID given the producer name. </b>
429+
Zero(0) is the default value in case there is no a publishingID.
430+
431+
#### Auto-Reconnect
432+
Reliable Producer restores the TCP connection in case the Producer is disconnected for some reason.
433+
During the reconnection it continues to store the messages in a local-list.
434+
The user will receive back the confirmed or un-confirmed messages.
435+
436+
#### Trace sent and received messages
437+
Reliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.
438+
`ConfirmationHandler` receives the messages back with the status.
439+
`confirmation.Status` can have different values, but in general `ConfirmationStatus.Confirmed` means the messages
440+
is stored on the server other status means that there was a problem with the message/messages.
441+
```csharp
442+
ConfirmationHandler = confirmation =>
443+
{
444+
if (confirmation.Status == ConfirmationStatus.Confirmed)
445+
{
446+
447+
// OK
448+
}
449+
else
450+
{
451+
// Some problem
452+
}
453+
}
454+
```
455+
#### Invalidate messages
456+
If the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
457+
The user will receive `ConfirmationStatus.TimeoutError` in the `ConfirmationHandler`.
458+
459+
#### Handle the metadata Update
460+
If the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an `MetadataUpdate` event.
461+
Reliable Producer detects the event and tries to reconnect the producer if the stream still exist else closes the producer.
462+
#### Send API
463+
Reliable Producer implements two `send(..)`
464+
- `Send(Message message)` // standard
465+
- `Send(List<Message> messages, CompressionType compressionType)` //sub-batching with compression
466+
467+
### Reconnection Strategy
468+
By default Reliable Producer uses an `BackOffReconnectStrategy` to reconnect the client.
469+
You can customize the behaviour implementing the `IReconnectStrategy` interface:
470+
```csharp
471+
void WhenDisconnected(out bool reconnect);
472+
void WhenConnected();
473+
```
474+
with `reconnect` you can decide when reconnect the producer.
475+
476+
You can use it:
477+
```csharp
478+
var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
479+
{
480+
...
481+
ReconnectStrategy = MyReconnectStrategy
482+
```
483+
484+
#### Examples
485+
See the directory [Examples/Reliable](./Examples/Reliable)
486+
411487

412488
### Build from source
413489
Build:

0 commit comments

Comments
 (0)