Skip to content

Commit 5361f04

Browse files
Fixed forwarding of StatusChanged event (#432)
* Fixed forwarding of StatusChanged event * add tests Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent ea8193b commit 5361f04

File tree

2 files changed

+17
-25
lines changed

2 files changed

+17
-25
lines changed

RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,7 @@ public static async Task<DeduplicatingProducer> Create(DeduplicatingProducerConf
3939
var x = new DeduplicatingProducer()
4040
{
4141
_producer = await Producer
42-
.Create(
43-
new ProducerConfig(producerConfig.StreamSystem, producerConfig.Stream)
44-
{
45-
_reference = producerConfig.Reference,
46-
ConfirmationHandler = producerConfig.ConfirmationHandler,
47-
ReconnectStrategy = producerConfig.ReconnectStrategy,
48-
ClientProvidedName = producerConfig.ClientProvidedName,
49-
SuperStreamConfig = producerConfig.SuperStreamConfig,
50-
MaxInFlight = producerConfig.MaxInFlight,
51-
MessagesBufferSize = producerConfig.MessagesBufferSize,
52-
TimeoutMessageAfter = producerConfig.TimeoutMessageAfter,
53-
Filter = producerConfig.Filter,
54-
Identifier = producerConfig.Identifier,
55-
ResourceAvailableReconnectStrategy = producerConfig.ResourceAvailableReconnectStrategy,
56-
57-
}, logger)
42+
.Create(producerConfig, logger)
5843
.ConfigureAwait(false)
5944
};
6045
return x;

Tests/DeduplicationProducerTests.cs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -80,16 +81,22 @@ public async Task DeduplicationInActionSendingTheSameIdMessagesWontStore()
8081
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
8182
var testPassed = new TaskCompletionSource<ulong>();
8283
const ulong TotalMessages = 1000UL;
83-
var p = await DeduplicatingProducer.Create(
84-
new DeduplicatingProducerConfig(system, stream, "my_producer_reference")
84+
var dupConfig = new DeduplicatingProducerConfig(system, stream, "my_producer_reference")
85+
{
86+
ConfirmationHandler = async confirmation =>
8587
{
86-
ConfirmationHandler = async confirmation =>
87-
{
88-
if (confirmation.PublishingId == TotalMessages)
89-
testPassed.SetResult(TotalMessages);
90-
await Task.CompletedTask;
91-
},
92-
});
88+
if (confirmation.PublishingId == TotalMessages)
89+
testPassed.SetResult(TotalMessages);
90+
await Task.CompletedTask;
91+
},
92+
};
93+
var statusInfoReceived = new List<StatusInfo>();
94+
dupConfig.StatusChanged += (status) => { statusInfoReceived.Add(status); };
95+
96+
var p = await DeduplicatingProducer.Create(dupConfig);
97+
98+
Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
99+
Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
93100
// first send and the messages are stored
94101
for (ulong i = 1; i <= TotalMessages; i++)
95102
{

0 commit comments

Comments
 (0)