Skip to content

Commit e51e445

Browse files
PAN-3799 Add ConditionWaiter to the messagequeue so that a max queue length can be assigned to put back pressure on the publisher
1 parent 8e7b0eb commit e51e445

File tree

8 files changed

+77
-13
lines changed

8 files changed

+77
-13
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using System.Threading;
4+
5+
namespace Learning.MessageQueue
6+
{
7+
public class AsyncConditionWaiter
8+
{
9+
private readonly Func<Task<bool>> _conditionCheck;
10+
private readonly int _pollingInterval;
11+
private TaskCompletionSource<object> _tcs;
12+
13+
public AsyncConditionWaiter(Func<Task<bool>> conditionCheck, int pollingInterval = 100)
14+
{
15+
_conditionCheck = conditionCheck;
16+
_pollingInterval = pollingInterval;
17+
_tcs = new TaskCompletionSource<object>();
18+
}
19+
20+
public async Task WaitForConditionAsync(CancellationToken cancellationToken = default)
21+
{
22+
while (!await _conditionCheck())
23+
{
24+
await Task.Delay(_pollingInterval, cancellationToken); // Wait before checking again to avoid busy-waiting
25+
}
26+
27+
// Condition is met, signal completion
28+
_tcs.SetResult(null);
29+
}
30+
31+
public Task WaitForCompletionAsync()
32+
{
33+
return _tcs.Task;
34+
}
35+
}
36+
}

src/Learning.MessageQueue/IMessageQueue.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ public interface IMessageQueue
77
{
88
Task PublishAsync(IMessage message);
99

10-
Task PublishAsync(string serializedMessage, string messageId, string messageType);
10+
Task PublishAsync(IMessage message, int? capacity = null);
11+
12+
Task PublishAsync(string serializedMessage, string messageId, string messageType, int? capacity = null);
1113
}
1214
}

src/Learning.MessageQueue/RedisMessageQueue.cs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,28 @@ public RedisMessageQueue(IRedisClient redis, RedisEventStoreSettings settings, s
3535

3636
public async Task PublishAsync(IMessage message)
3737
{
38-
if(message.TimeStamp == default(DateTimeOffset))
38+
await PublishAsync(message, null).ConfigureAwait(false);
39+
}
40+
41+
public async Task PublishAsync(IMessage message, int? capacity)
42+
{
43+
if (message.TimeStamp == default(DateTimeOffset))
3944
{
4045
message.TimeStamp = DateTimeOffset.UtcNow;
4146
}
4247

4348
var serializedEvent = JsonConvert.SerializeObject(message, JsonSerializerSettings);
4449
var messageType = message.GetType().Name;
4550

46-
await PublishAsync(serializedEvent, message.Id, messageType).ConfigureAwait(false);
51+
await PublishAsync(serializedEvent, message.Id, messageType, capacity).ConfigureAwait(false);
4752
}
4853

49-
public async Task PublishAsync(string serializedMessage, string messageId, string messageType)
54+
public async Task PublishAsync(string serializedMessage, string messageId, string messageType, int? capacity = null)
5055
{
5156
//Publish the event
5257
for (var i = 0; i < _settings.TransactionRetryCount; i++)
5358
{
54-
var publishTran = await GeneratePublishTransaction(serializedMessage, messageType).ConfigureAwait(false);
59+
var publishTran = await GeneratePublishTransaction(serializedMessage, messageType, capacity).ConfigureAwait(false);
5560
if (await publishTran.ExecuteAsync().ConfigureAwait(false))
5661
{
5762
return;
@@ -63,7 +68,8 @@ public async Task PublishAsync(string serializedMessage, string messageId, strin
6368
throw new MessagePublishFailedException(messageId, _settings.TransactionRetryCount);
6469
}
6570

66-
private async Task<ITransaction> GeneratePublishTransaction(string serializedEvent, string messageType)
71+
72+
private async Task<ITransaction> GeneratePublishTransaction(string serializedEvent, string messageType, int? capacity)
6773
{
6874
var eventKey = $"{_environment}:{messageType}";
6975

@@ -82,6 +88,13 @@ private async Task<ITransaction> GeneratePublishTransaction(string serializedEve
8288
foreach (var subscriber in subscribers)
8389
{
8490
var listKey = $"{subscriber}:{{{eventKey}}}:PublishedEvents";
91+
92+
if (capacity.HasValue)
93+
{
94+
var waiter = new AsyncConditionWaiter(async () => await _redis.Database.ListLengthAsync(listKey) < capacity);
95+
await waiter.WaitForConditionAsync().ConfigureAwait(false);
96+
}
97+
8598
tran.ListLeftPushAsync(listKey, serializedEvent).ConfigureAwait(false);
8699
}
87100

test/Learning.Cqrs.Test/Learning.Cqrs.Test.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
</ItemGroup>
1616

1717
<ItemGroup>
18-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
18+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
1919
<PackageReference Include="MSTest.TestAdapter" Version="2.0.0" />
2020
<PackageReference Include="MSTest.TestFramework" Version="2.0.0" />
2121
<PackageReference Include="System.Diagnostics.TraceSource" Version="4.3.0" />

test/Learning.EventStore.Test/Learning.EventStore.Test.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
</ItemGroup>
1818

1919
<ItemGroup>
20-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
20+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
2121
<PackageReference Include="FakeItEasy" Version="5.5.0" />
2222
<PackageReference Include="MSTest.TestAdapter" Version="2.0.0" />
2323
<PackageReference Include="MSTest.TestFramework" Version="2.0.0" />

test/Learning.EventStore.Test/RedisEventStore/SaveTest.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public async Task CallsMessageQueuePublish()
8585
A.CallTo(() => _trans.ExecuteAsync(CommandFlags.None)).Returns(Task.Run(() => true));
8686
await _redisEventStore.SaveAsync(_eventList);
8787

88-
A.CallTo(() => _messageQueue.PublishAsync(A<string>._, "12345", A<string>._ ))
88+
A.CallTo(() => _messageQueue.PublishAsync(A<string>._, "12345", A<string>._, A<int?>._))
8989
.MustHaveHappened();
9090
}
9191

@@ -112,7 +112,7 @@ public async Task ThrowsExceptionIfSaveTransactionFails()
112112
public async Task DeletesFromEventStoreHashAndCommitListIfPublishThrowsException()
113113
{
114114
A.CallTo(() => _trans.ExecuteAsync(CommandFlags.None)).Returns(Task.Run(() => true));
115-
A.CallTo(() => _messageQueue.PublishAsync(A<string>._, A<string>._, A<string>._)).Throws(new MessagePublishFailedException("12345", 10));
115+
A.CallTo(() => _messageQueue.PublishAsync(A<string>._, A<string>._, A<string>._, A<int?>._)).Throws(new MessagePublishFailedException("12345", 10));
116116

117117
try
118118
{

test/Learning.EventStore.Test/RedisMessageQueue/PublishTest.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,5 +108,18 @@ public async Task ThrowsExeptionIfPublishTransactionFails()
108108
A.CallTo(() => _trans.PublishAsync("test:TestEvent", true, CommandFlags.None)).MustHaveHappened(Repeated.Exactly.Times(10));
109109
}
110110
}
111+
112+
[TestMethod]
113+
public async Task PublishesEventsWithCapacity()
114+
{
115+
A.CallTo(() => _trans.ExecuteAsync(CommandFlags.None)).Returns(Task.Run(() => true));
116+
A.CallTo(() => _redis.Database.ListLengthAsync("Subscriber1:{test:TestEvent}:PublishedEvents", CommandFlags.None)).ReturnsNextFromSequence(new[] { (long)100, (long)99 });
117+
118+
await _messageQueue.PublishAsync(_event, 100);
119+
120+
121+
A.CallTo(() => _trans.PublishAsync("test:TestEvent", true, CommandFlags.None)).MustHaveHappened(Repeated.Exactly.Once);
122+
A.CallTo(() => _redis.Database.ListLengthAsync("Subscriber1:{test:TestEvent}:PublishedEvents", CommandFlags.None)).MustHaveHappened(Repeated.Exactly.Twice);
123+
}
111124
}
112125
}

test/Learning.EventStore.Test/SqlServerEventStore/SaveTest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ public async Task CallsMessageQueuePublish()
5050
await _sqlEventStore.SaveAsync(_eventList);
5151

5252
A.CallTo(() => _dapper.ExecuteAsync(_writeDbConnection, A<string>._, A<EventDto>._, CommandType.StoredProcedure, _transaction)).MustHaveHappened();
53-
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._ ))
53+
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._, A<int?>._))
5454
.MustHaveHappened();
5555
A.CallTo(() => _transaction.Commit()).MustHaveHappened();
5656
}
5757

5858
[TestMethod]
5959
public async Task RollsBackTransactionIfPublishFails()
6060
{
61-
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._)).Throws(new Exception("Publish Failed"));
61+
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._, A<int?>._)).Throws(new Exception("Publish Failed"));
6262

6363
try
6464
{
@@ -67,7 +67,7 @@ public async Task RollsBackTransactionIfPublishFails()
6767
catch (Exception)
6868
{
6969
A.CallTo(() => _dapper.ExecuteAsync(_writeDbConnection, A<string>._, A<EventDto>._, CommandType.StoredProcedure, _transaction)).MustHaveHappened();
70-
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._))
70+
A.CallTo(() => _messageQueue.PublishAsync(_serializedEvent, "12345", A<string>._, A<int?>._))
7171
.MustHaveHappened();
7272
A.CallTo(() => _transaction.Commit()).MustNotHaveHappened();
7373
A.CallTo(() => _transaction.Rollback()).MustHaveHappened();

0 commit comments

Comments
 (0)