Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 57 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
[![codecov](https://codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/branch/main/graph/badge.svg?token=OIA04ZQD79)](https://codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client)
</div>

#@ Table of Contents

# Table of Contents

---

Expand Down Expand Up @@ -116,7 +115,7 @@ await system.DeleteStream(stream);
await system.Close();
```

### Usage
## Usage

---

Expand Down Expand Up @@ -238,7 +237,7 @@ Consider a Producer instance like a long-lived object, do not create one to send

Producer with a reference name stores the sequence id on the server.
It is possible to retrieve the id using `producer.GetLastPublishingId()`
or more generic `system.QuerySequence("reference", "my_stream")`
or more generic `system.QuerySequence("reference", "my_stream")`.

### Publish Messages

Expand Down Expand Up @@ -294,15 +293,19 @@ var producer = await system.CreateProducer(
Stream = "my_stream",
});
```

then:

```csharp
var publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes($"my deduplicate message {i}"));
await producer.Send(publishingId, message);
```

### Consume Messages

Define a consumer:

```csharp
var consumer = await system.CreateConsumer(
new ConsumerConfig
Expand All @@ -317,8 +320,11 @@ var consumer = await system.CreateConsumer(
}
});
```

### Offset Types

There are five types of Offset and they can be set by the `ConsumerConfig.OffsetSpec` property that must be passed to the Consumer constructor, in the example we use `OffsetTypeFirst`:

```csharp
var consumerOffsetTypeFirst = await system.CreateConsumer(
new ConsumerConfig
Expand All @@ -333,29 +339,40 @@ var consumerOffsetTypeFirst = await system.CreateConsumer(
}
});
```

The five types are:
- First: it takes messages from the first message of the stream.
- First: it takes messages from the first message of the stream.

```csharp
var offsetTypeFirst = new OffsetTypeFirst();
```

- Last: it takes messages from the last chunk of the stream, i.e. it doesn’t start from the last message, but the last “group” of messages.

```csharp
var offsetTypeLast = new OffsetTypeLast();
```

- Next: it takes messages published after the consumer connection.

```csharp
var offsetTypeNext = new OffsetTypeNext()
```

- Offset: it takes messages starting from the message with id equal to the passed value. If the value is less than the first message of the stream, it starts from the first (i.e. if you pass 0, but the stream starts from 10, it starts from 10). If the message with the id hasn’t yet been published it waits until this publishingId is reached.

```csharp
ulong iWantToStartFromPubId = 10;
var offsetTypeOffset = new OffsetTypeOffset(iWantToStartFromPubId);
```

- Timestamp: it takes messages starting from the first message with timestamp bigger than the one passed

```csharp
var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;
var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo);
```

### Track Offset

The server can store the current delivered offset given a consumer with `StoreOffset` in this way:
Expand All @@ -374,8 +391,7 @@ var consumer = await system.CreateConsumer(
}
```

Note: </b>
**Avoid** to store the offset for each single message, it can reduce the performances.
Note: **Avoid** storing the offset for every single message, it can reduce performance.

It is possible to retrieve the offset with `QueryOffset`:

Expand All @@ -389,9 +405,10 @@ var consumer = await system.CreateConsumer(
OffsetSpec = new OffsetTypeOffset(trackedOffset),
```

OBS. if don't have stored an offset for the consumer's reference on the stream you get an OffsetNotFoundException exception.
Note: if you try to store an offset that doesn't exist yet for the consumer's reference on the stream you get will get an `OffsetNotFoundException` exception.

### Handle Close

Producers/Consumers raise and event when the client is disconnected:

```csharp
Expand All @@ -405,8 +422,10 @@ Producers/Consumers raise and event when the client is disconnected:
```

### Handle Metadata Update

Stream metadata update is raised when the stream topology changes or the stream is deleted.
You can use `MetadataHandler` to handle it:

```csharp
new ProducerConfig/ConsumerConfig
{
Expand All @@ -420,49 +439,60 @@ You can use `MetadataHandler` to handle it:
### Heartbeat

It is possible to configure the heartbeat using:

```csharp
var config = new StreamSystemConfig()
{
Heartbeat = TimeSpan.FromSeconds(30),
}
```

- `60` (`TimeSpan.FromSeconds(60)`) seconds is the default value
- `0` (`TimeSpan.FromSeconds(0)`) will advise server to disable heartbeat

Heartbeat value shouldn't be too low.

### Reliable
- Reliable Producer
- Reliable Consumer </p>
See the directory [Examples/Reliable](./Examples/Reliable)
- Reliable Consumer

See the directory [Examples/Reliable](./Examples/Reliable) for code examples.


### Reliable Producer
Reliable Producer is a smart layer built up of the standard `Producer`. </b>
The idea is to leave the user decides what to use, the standard or reliable producer. </b>

Reliable Producer is a smart layer built up of the standard `Producer`.

The idea is to give the user ability to choose between the standard or reliable producer.

The main features are:

- Provide publishingID automatically
- Auto-Reconnect in case of disconnection
- Trace sent and received messages
- Invalidate messages
- [Handle the metadata Update](#reliable-handle-metadata-update)

#### Provide publishingID automatically
Reliable Producer retrieves the last publishingID given the producer name. </b>
Zero(0) is the default value in case there is no a publishingID.

Reliable Producer retrieves the last publishingID given the producer name.

Zero(0) is the default value in case there is no publishingID for given producer reference.

#### Auto-Reconnect

Reliable Producer restores the TCP connection in case the Producer is disconnected for some reason.
During the reconnection it continues to store the messages in a local-list.
The user will receive back the confirmed or un-confirmed messages.
See [Reconnection Strategy](#reconnection-strategy)

#### Trace sent and received messages
Reliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.

Reliable Producer keeps in memory each sent message and removes it from the memory when the message is confirmed or times out.
`ConfirmationHandler` receives the messages back with the status.
`confirmation.Status` can have different values, but in general `ConfirmationStatus.Confirmed` means the messages
is stored on the server other status means that there was a problem with the message/messages.
is stored on the server. Other statuses mean that there was a problem with the message/messages under given publishing id.

```csharp
ConfirmationHandler = confirmation =>
{
Expand All @@ -477,17 +507,21 @@ ConfirmationHandler = confirmation =>
}
}
```

#### Invalidate messages
If the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
The user will receive `ConfirmationStatus.TimeoutError` in the `ConfirmationHandler`.

If the client doesn't receive a confirmation within configured timeout (3 seconds by default), Reliable Producer removes the message from the internal messages cache.
The user will receive `ConfirmationStatus.ClientTimeoutError` in the `ConfirmationHandler`.

#### Send API
Reliable Producer implements two `send(..)`

- `Send(Message message)` // standard
- `Send(List<Message> messages, CompressionType compressionType)` //sub-batching with compression


### Reliable Consumer

Reliable Consumer is a smart layer built up of the standard `Consumer`. </b>
The idea is to leave the user decides what to use, the standard or reliable Consumer. </b>

Expand All @@ -497,17 +531,21 @@ The main features are:
- [Handle the metadata Update](#reliable-handle-metadata-update)

#### Auto-Reconnect

Reliable Consumer restores the TCP connection in case the Producer is disconnected for some reason.
Reliable Consumer will restart consuming from the last offset stored.
See [Reconnection Strategy](#reconnection-strategy)

### Reconnection Strategy

By default Reliable Producer/Consumer uses an `BackOffReconnectStrategy` to reconnect the client.
You can customize the behaviour implementing the `IReconnectStrategy` interface:

```csharp
bool WhenDisconnected(string connectionInfo);
void WhenConnected(string connectionInfo);
```

If `WhenDisconnected` return is `true` Producer/Consumer will be reconnected else closed.
`connectionInfo` add information about the connection.

Expand All @@ -519,8 +557,8 @@ var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig
ReconnectStrategy = MyReconnectStrategy
```


### Reliable handle metadata update

If the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an `MetadataUpdate` event.
Reliable Producer detects the event and tries to reconnect the producer if the stream still exist else closes the producer/consumer.

Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,18 @@ RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Span<byte> span) -> i
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.AddUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Message message) -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.AddUnConfirmedMessage(ulong publishingId, System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages) -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.ConfirmationPipe(System.Func<RabbitMQ.Stream.Client.Reliable.MessagesConfirmation, System.Threading.Tasks.Task> confirmHandler) -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.ConfirmationPipe(System.Func<RabbitMQ.Stream.Client.Reliable.MessagesConfirmation, System.Threading.Tasks.Task> confirmHandler, System.TimeSpan messageTimeout) -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.RemoveUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Reliable.ConfirmationStatus confirmationStatus) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.Start() -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.Stop() -> void
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.AccessRefused = 16 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.ClientTimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.Confirmed = 1 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.InternalError = 15 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.PreconditionFailed = 17 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.PublisherDoesNotExist = 18 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.TimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
Expand Down Expand Up @@ -674,6 +674,8 @@ RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.get -> string
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.init -> void
RabbitMQ.Stream.Client.ResponseCode
RabbitMQ.Stream.Client.ResponseCode.AccessRefused = 16 -> RabbitMQ.Stream.Client.ResponseCode
RabbitMQ.Stream.Client.ResponseCode.AuthenticationFailure = 8 -> RabbitMQ.Stream.Client.ResponseCode
Expand Down
34 changes: 24 additions & 10 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ namespace RabbitMQ.Stream.Client.Reliable;
public enum ConfirmationStatus : ushort
{
WaitForConfirmation = 0,
/// <summary>
/// Message was confirmed to be received and stored by server.
/// </summary>
Confirmed = 1,
TimeoutError = 2,
/// <summary>
/// Client gave up on waiting for this publishing id.
/// </summary>
ClientTimeoutError = 2,
/// <summary>
/// Stream is not available anymore (it was deleted).
/// </summary>
StreamNotAvailable = 6,
InternalError = 15,
/// <summary>
/// Signals either bad credentials, or insufficient permissions.
/// </summary>
AccessRefused = 16,
PreconditionFailed = 17,
PublisherDoesNotExist = 18,
Expand Down Expand Up @@ -53,10 +65,12 @@ public class ConfirmationPipe
private readonly ConcurrentDictionary<ulong, MessagesConfirmation> _waitForConfirmation = new();
private readonly Timer _invalidateTimer = new();
private Func<MessagesConfirmation, Task> ConfirmHandler { get; }
private readonly TimeSpan _messageTimeout;

public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler)
public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler, TimeSpan messageTimeout)
{
ConfirmHandler = confirmHandler;
_messageTimeout = messageTimeout;
}

public void Start()
Expand All @@ -82,7 +96,7 @@ public void Start()
});

_invalidateTimer.Elapsed += OnTimedEvent;
_invalidateTimer.Interval = 2000;
_invalidateTimer.Interval = _messageTimeout.TotalMilliseconds;
_invalidateTimer.Enabled = true;
}

Expand All @@ -94,24 +108,24 @@ public void Stop()

private async void OnTimedEvent(object sender, ElapsedEventArgs e)
{
var timedOutMessages = _waitForConfirmation.Where(pair =>
(DateTime.Now - pair.Value.InsertDateTime).TotalSeconds > _messageTimeout.TotalSeconds);

foreach (var pair in timedOutMessages)
{
foreach (var pair in _waitForConfirmation.Where(pair =>
(DateTime.Now - pair.Value.InsertDateTime).Seconds > 2))
{
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError);
}
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.ClientTimeoutError);
}
}

public void AddUnConfirmedMessage(ulong publishingId, Message message)
{
AddUnConfirmedMessage(publishingId, new List<Message>() { message });
AddUnConfirmedMessage(publishingId, new List<Message> { message });
}

public void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
{
_waitForConfirmation.TryAdd(publishingId,
new MessagesConfirmation()
new MessagesConfirmation
{
Messages = messages,
PublishingId = publishingId,
Expand Down
9 changes: 6 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public record ReliableProducerConfig
public Func<MessagesConfirmation, Task> ConfirmationHandler { get; init; }
public string ClientProvidedName { get; set; } = "dotnet-stream-rproducer";
public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy();
public TimeSpan TimeoutMessageAfter { get; init; } = TimeSpan.FromSeconds(3);
}

/// <summary>
Expand All @@ -40,7 +41,10 @@ public class ReliableProducer : ReliableBase
private ReliableProducer(ReliableProducerConfig reliableProducerConfig)
{
_reliableProducerConfig = reliableProducerConfig;
_confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler);
_confirmationPipe = new ConfirmationPipe(
reliableProducerConfig.ConfirmationHandler,
reliableProducerConfig.TimeoutMessageAfter
);
_confirmationPipe.Start();
}

Expand Down Expand Up @@ -192,7 +196,6 @@ public async ValueTask Send(List<Message> messages, CompressionType compressionT

public override string ToString()
{
return $"Producer reference: {_reliableProducerConfig.Reference}," +
$"stream: {_reliableProducerConfig.Stream} ";
return $"Producer reference: {_reliableProducerConfig.Reference}, stream: {_reliableProducerConfig.Stream} ";
}
}
Loading