Merged

Changes from 16 commits
2 changes: 1 addition & 1 deletion .ci/versions.json
@@ -1,4 +1,4 @@
{
"erlang": "25.0.4",
"rabbitmq": "3.10.7"
"rabbitmq": "3.11.0"
}
2 changes: 1 addition & 1 deletion Examples/Performances/BatchVsBatchSend.cs
@@ -10,7 +10,7 @@ public class BatchVsBatchSend
private const int AggregateBatchSize = 300;
private const int ModPrintMessages = 10_000_000;

public async Task Start()
public static async Task Start()
{
Console.WriteLine("Stream Client Performance Test");
Console.WriteLine("==============================");
2 changes: 1 addition & 1 deletion Examples/Performances/Program.cs
@@ -1,3 +1,3 @@
using Performances;

new BatchVsBatchSend().Start().Wait();
BatchVsBatchSend.Start().Wait();
124 changes: 100 additions & 24 deletions README.md
@@ -24,14 +24,16 @@
- [Publish Messages](#publish-messages)
- [Deduplication](#Deduplication)
- [Consume Messages](#consume-messages)
- [Offset Types](#offset-types)
- [Track Offset](#track-offset)
- [Offset Types](#offset-types)
- [Track Offset](#track-offset)
- [Single Active Consumer](#single-active-consumer)
- [Handle Close](#handle-close)
- [Handle Metadata Update](#handle-metadata-update)
- [Heartbeat](#heartbeat)
- [Reliable](#reliable)
- [Reliable Producer](#reliable-producer)
- [Reliable Consumer](#reliable-consumer)
- [Reliable Producer](#reliable-producer)
- [Reliable Consumer](#reliable-consumer)
- [Single Active Consumer](#single-active-consumer)
- [Build from source](#build-from-source)
- [Project Status](#project-status)
- [Release Process](#release-process)
@@ -218,9 +220,12 @@ await system.CreateStream(new StreamSpec(stream)

Setting a retention policy is highly recommended.

RabbitMQ does not store the whole stream in a single file, but splits it into segment files.
Segments are also used to truncate the stream: when a stream reaches its maximum size, an entire segment file is deleted.
For this reason `MaxLengthBytes` (the maximum size of the entire stream) is usually significantly higher
than `MaxSegmentSizeBytes` (the maximum size of a single segment file).
RabbitMQ enforces the retention policy when the current segment has reached its maximum size and is closed in favor of a
new one.
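As a worked example (a minimal sketch using illustrative values only, assuming the `system` and `stream` variables from the surrounding snippets): a stream capped at about 20 GB with 100 MB segments keeps roughly 200 segments before the oldest one is deleted.

```csharp
// Illustrative values only: ~20 GB total, ~100 MB per segment,
// so roughly 200 segments are kept before the oldest is deleted.
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 20_000_000_000,
    MaxSegmentSizeBytes = 100_000_000
});
```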

```csharp
await system.CreateStream(new StreamSpec(stream)
@@ -241,6 +246,7 @@ var producer = await system.CreateProducer(
Stream = "my_stream",
});
```

Consider a Producer instance a long-lived object; do not create one just to send a single message.

| Parameter | Description | Default |
@@ -272,6 +278,7 @@ or more generic `system.QuerySequence("reference", "my_stream")`.

Batch send is a synchronous operation.
It allows you to pre-aggregate messages and send them in a single synchronous call.

```csharp
var messages = new List<(ulong, Message)>();
for (ulong i = 0; i < 30; i++)
@@ -281,9 +288,11 @@ for (ulong i = 0; i < 30; i++)
await producer.BatchSend(messages);
messages.Clear();
```

The standard `Send` is easier to use and covers most use cases.
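For comparison, a minimal sketch of the standard asynchronous send; it assumes the `producer` created earlier and that `GetLastPublishingId()` and the `Send(publishingId, message)` overload behave as in the producer examples of this README:

```csharp
// Standard asynchronous send: one message per call, the confirmation arrives later.
var publishingId = await producer.GetLastPublishingId();
var message = new Message(Encoding.UTF8.GetBytes("hello"));
await producer.Send(++publishingId, message);
```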

#### Sub Entries Batching

A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames,
but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
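A minimal sketch of sub-entry batching; it assumes the `producer` created earlier and a `Send(publishingId, messages, compressionType)` overload as described in this section (the message count and the compression codec are illustrative):

```csharp
// Pack several messages into a single compressed sub-entry.
var subEntryMessages = new List<Message>();
for (var i = 0; i < 100; i++)
{
    subEntryMessages.Add(new Message(Encoding.UTF8.GetBytes($"sub-entry message {i}")));
}

var publishingId = await producer.GetLastPublishingId();
await producer.Send(++publishingId, subEntryMessages, CompressionType.Gzip);
```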

@@ -314,6 +323,7 @@ You can add missing codecs with the `StreamCompressionCodecs.RegisterCodec` API.
See [Examples/CompressCodecs](./Examples/CompressCodecs) for `Lz4`, `Snappy` and `Zstd` implementations.

### Deduplication

[See here for more details](https://rabbitmq.github.io/rabbitmq-stream-java-client/snapshot/htmlsingle/#outbound-message-deduplication).
Set a producer reference to enable deduplication:
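A minimal sketch, assuming the `ProducerConfig.Reference` property and the `GetLastPublishingId` API shown elsewhere in this README; with a reference set, the broker drops re-published messages whose publishing id it has already seen for that reference:

```csharp
var deduplicatingProducer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = "my_producer", // the producer reference enables deduplication
        Stream = "my_stream",
    });

// Restart from the last publishing id the server has seen for this reference.
var publishingId = await deduplicatingProducer.GetLastPublishingId();
await deduplicatingProducer.Send(++publishingId, new Message(Encoding.UTF8.GetBytes("message")));
```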

@@ -355,7 +365,8 @@ var consumer = await system.CreateConsumer(

### Offset Types

There are five types of offset. They can be set via the `ConsumerConfig.OffsetSpec` property that must be passed to
the consumer; in the example we use `OffsetTypeFirst`:

```csharp
var consumerOffsetTypeFirst = await system.CreateConsumer(
@@ -373,13 +384,15 @@ var consumerOffsetTypeFirst = await system.CreateConsumer(
```

The five types are:

- First: it takes messages from the first message of the stream.

```csharp
var offsetTypeFirst = new OffsetTypeFirst();
```

- Last: it takes messages from the last chunk of the stream, i.e. it doesn’t start from the last message, but from the
  last “group” of messages.

```csharp
var offsetTypeLast = new OffsetTypeLast();
@@ -391,7 +404,9 @@ var offsetTypeLast = new OffsetTypeLast();
var offsetTypeNext = new OffsetTypeNext();
```

- Offset: it takes messages starting from the message with id equal to the passed value. If the value is less than the
  first offset of the stream, it starts from the first (i.e. if you pass 0 but the stream starts from 10, it starts
  from 10). If the message with that id hasn’t been published yet, it waits until this publishingId is reached.

```csharp
ulong iWantToStartFromPubId = 10;
@@ -408,6 +423,7 @@ var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo);
### Track Offset

The server can store the last delivered offset for a given consumer with `StoreOffset`, in this way:

```csharp
var messagesConsumed = 0;
var consumer = await system.CreateConsumer(
@@ -437,7 +453,57 @@ var consumer = await system.CreateConsumer(
OffsetSpec = new OffsetTypeOffset(trackedOffset),
```

Note: if you try to store an offset that doesn't exist yet for the consumer's reference on the stream, you will get
an `OffsetNotFoundException` exception.
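A minimal defensive sketch follows. It is assumption-laden: it presumes `system.QueryOffset` raises the same `OffsetNotFoundException` when nothing has been stored yet for the reference, and that the offset types share the `IOffsetType` interface.

```csharp
// Restart from the stored offset if there is one, otherwise from the beginning.
IOffsetType offsetSpec;
try
{
    var trackedOffset = await system.QueryOffset("my_consumer", "my_stream");
    offsetSpec = new OffsetTypeOffset(trackedOffset);
}
catch (OffsetNotFoundException)
{
    // Nothing stored yet for this reference on this stream.
    offsetSpec = new OffsetTypeFirst();
}
```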

### Single Active Consumer

Use the `ConsumerConfig.IsSingleActiveConsumer` property to enable the feature:

Enabling single active consumer

```csharp
var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "application-1", // Set the consumer name (mandatory to enable single active consumer)
IsSingleActiveConsumer = true, // Enable single active consumer
Stream = "my-stream",
OffsetSpec = new OffsetTypeFirst(),
...
});
```

With the configuration above, the consumer will take part in the `application-1` group on the `my-stream` stream.
If the consumer instance is the first in a group, it will get messages as soon as there are some available.
If it is not the first in the group, it will remain idle until it is its turn to be active
(likely when all the instances registered before it are gone).

By default the single active consumer starts consuming from the `OffsetSpec`, but you can override this with the
`ConsumerUpdateListener` event.

`ConsumerUpdateListener` returns the offset specification that the consumer will start consuming from.

For example, if you want to start from the last tracked offset, you can do it like this:

```csharp
var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "application-1",
Stream = "my-stream",
IsSingleActiveConsumer = true,
        // When the consumer is activated it will start from the last tracked offset
ConsumerUpdateListener = async (reference, stream, isActive) =>
{
var trackedOffset = await system.QueryOffset(reference, stream);
return new OffsetTypeOffset(trackedOffset);
}
});
```

Single Active Consumer is available for the standard `Consumer` and also for the `ReliableConsumer`.
For `ReliableConsumer`, just set `ReliableConsumerConfig.IsSingleActiveConsumer` to enable it.
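A minimal sketch of that configuration; the `StreamSystem` property and the `CreateReliableConsumer` factory name are assumptions mirrored from the Reliable Producer example later in this README, and the message handler is omitted (see [Examples/Reliable](./Examples/Reliable) for a complete consumer):

```csharp
var reliableConsumer = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
{
    StreamSystem = system,             // assumption: the config carries the StreamSystem
    Stream = "my-stream",
    Reference = "application-1",       // mandatory to enable single active consumer
    IsSingleActiveConsumer = true,     // enable single active consumer
    OffsetSpec = new OffsetTypeFirst(),
    // MessageHandler = ...            // omitted here, see Examples/Reliable
});
```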

### Handle Close

@@ -455,7 +521,7 @@ Producers/Consumers raise an event when the client is disconnected:

### Handle Metadata Update

Stream metadata update is raised when the stream topology changes or the stream is deleted.
You can use `MetadataHandler` to handle it:

```csharp
@@ -484,13 +550,13 @@ It is possible to configure the heartbeat using:

Heartbeat value shouldn't be too low.
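A minimal sketch, assuming the heartbeat is exposed as a `TimeSpan` property on `StreamSystemConfig`:

```csharp
// Tune the client heartbeat; keep the interval reasonably high.
var config = new StreamSystemConfig()
{
    Heartbeat = TimeSpan.FromSeconds(30),
};
```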

### Reliable

- Reliable Producer
- Reliable Consumer

See the directory [Examples/Reliable](./Examples/Reliable) for code examples.


### Reliable Producer

Reliable Producer is a smart layer built on top of the standard `Producer`.
@@ -520,10 +586,12 @@ See [Reconnection Strategy](#reconnection-strategy)

#### Trace sent and received messages

Reliable Producer keeps each sent message in memory and removes it when the message is confirmed or times out.
`ConfirmationHandler` receives the messages back with their status.
`confirmation.Status` can have different values, but in general `ConfirmationStatus.Confirmed` means the message
is stored on the server. Other statuses mean that there was a problem with the message(s) under the given publishing id.

```csharp
ConfirmationHandler = confirmation =>
Expand Down Expand Up @@ -555,22 +623,24 @@ ConfirmationHandler = confirmation =>

#### Invalidate messages

If the client doesn't receive a confirmation within the configured timeout (3 seconds by default), Reliable Producer
removes the message from the internal message cache.
The user will receive `ConfirmationStatus.ClientTimeoutError` in the `ConfirmationHandler`.
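A minimal sketch of reacting to that status, following the `ConfirmationHandler` fragment style used above; the `PublishingId` property on the confirmation is an assumption here:

```csharp
ConfirmationHandler = confirmation =>
{
    if (confirmation.Status == ConfirmationStatus.ClientTimeoutError)
    {
        // No confirmation arrived in time: log it and decide whether to re-send.
        Console.WriteLine($"Timed out waiting for confirmation of {confirmation.PublishingId}");
    }

    return Task.CompletedTask;
};
```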

#### Send API

Reliable Producer implements two `Send(..)` overloads:

- `Send(Message message)` // standard
- `Send(List<Message> messages, CompressionType compressionType)` // sub-entry batching with compression
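A short usage sketch of both overloads, assuming an existing Reliable Producer instance named `producer`:

```csharp
// Standard send: one message at a time.
await producer.Send(new Message(Encoding.UTF8.GetBytes("hello")));

// Sub-entry batching: several messages in one compressed sub-entry.
var batch = new List<Message>
{
    new Message(Encoding.UTF8.GetBytes("hello")),
    new Message(Encoding.UTF8.GetBytes("world"))
};
await producer.Send(batch, CompressionType.Gzip);
```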


### Reliable Consumer

Reliable Consumer is a smart layer built on top of the standard `Consumer`.
The idea is to let the user decide which to use, the standard or the reliable Consumer.

The main features are:

- Auto-Reconnect in case of disconnection
- Auto restart consuming from the last offset
- [Handle the metadata Update](#reliable-handle-metadata-update)
@@ -595,6 +665,7 @@ If `WhenDisconnected` return is `true` Producer/Consumer will be reconnected els
`connectionInfo` provides information about the connection.

You can use it:

```csharp
var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
Expand All @@ -604,22 +675,27 @@ ReconnectStrategy = MyReconnectStrategy

### Reliable handle metadata update

If the stream topology changes (e.g. the stream is deleted or a follower is added/removed), the client receives a
`MetadataUpdate` event.
Reliable Producer detects the event and tries to reconnect the producer if the stream still exists, otherwise it closes
the producer/consumer.

## Build from source

Build:

```shell
make build
```

Test:

```shell
make test
```

Run test in docker:

```shell
make run-test-in-docker
```
@@ -638,5 +714,5 @@ The client is a work in progress. The API(s) could change prior to version `1.0.0`
* Ensure the build for the tag passes: [link](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/actions)
* Create the new release on GitHub, which triggers a build and publish to NuGet: [link](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/releases)
* Check for the new version on NuGet: [link](https://www.nuget.org/packages/RabbitMQ.Stream.Client)
* Best practice is to download the new package and inspect the contents using [NuGetPackageExplorer](https://github.com/NuGetPackageExplorer/NuGetPackageExplorer)
* Announce the new release on the mailing list: [link](https://groups.google.com/g/rabbitmq-users)