Skip to content

Commit 42141f3

Browse files
DavidHogueAlaina Hagan
authored and
Alaina Hagan
committed
Merged in feature/PG-251 (pull request #35)
PG-215 Check for old processing events and move them to the dead letters queue Approved-by: Rafael Ortiz Approved-by: Beth Bennett Approved-by: Dave Sanderling Approved-by: Thomas Mooney Approved-by: Alaina Hagan
2 parents 7d08413 + 77b8a97 commit 42141f3

File tree

7 files changed

+135
-15
lines changed

7 files changed

+135
-15
lines changed

src/Learning.EventStore/Learning.EventStore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<AssemblyTitle>Learning.EventStore</AssemblyTitle>
5-
<VersionPrefix>11.2.1</VersionPrefix>
5+
<VersionPrefix>11.3.0-rc1</VersionPrefix>
66
<TargetFrameworks>netstandard2.0</TargetFrameworks>
77
<AssemblyName>Learning.EventStore</AssemblyName>
88
<PackageId>Learning.EventStore</PackageId>

src/Learning.MessageQueue/Messages/RetryableAsyncRedisSubscription.cs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ protected RetryableAsyncRedisSubscription(IEventSubscriber subscriber, IMessageQ
3737

3838
public virtual async Task RetryAsync()
3939
{
40+
await MoveStaleProcessingEventsToDeadLetters();
41+
4042
var eventType = typeof(T).Name;
4143
var listLength = await _messageQueueRepository.GetDeadLetterListLength<T>().ConfigureAwait(false);
4244
var eventsProcessed = 0;
@@ -50,7 +52,7 @@ public virtual async Task RetryAsync()
5052
for (var i = 0; i < listLength; i++)
5153
{
5254
var indexToGet = i - eventsProcessed;
53-
var eventData = await _messageQueueRepository.GetUnprocessedMessage<T>(indexToGet).ConfigureAwait(false);
55+
var eventData = await _messageQueueRepository.GetDeadLetterMessage<T>(indexToGet).ConfigureAwait(false);
5456

5557
if (string.IsNullOrEmpty(eventData))
5658
{
@@ -120,5 +122,27 @@ protected virtual async Task<bool> ShouldRetry(IMessage @event, RedisValue event
120122

121123
return true;
122124
}
125+
126+
private async Task MoveStaleProcessingEventsToDeadLetters()
127+
{
128+
const int eventCount = 10;
129+
var waitTime = TimeSpan.FromMinutes(5);
130+
var staleTimeStamp = DateTimeOffset.UtcNow - waitTime;
131+
132+
var unprocessedEvents = await _messageQueueRepository.GetOldestProcessingEvents<T>(eventCount).ConfigureAwait(false);
133+
134+
for (var i = 0; i < unprocessedEvents.Length; i++)
135+
{
136+
var unprocessedEvent = unprocessedEvents[i];
137+
var @event = JsonConvert.DeserializeObject<T>(unprocessedEvent);
138+
139+
if (@event.TimeStamp < staleTimeStamp)
140+
{
141+
_logger.Info($"Moving {typeof(T).Name} {@event.Id} with timestamp @{@event.TimeStamp} to dead letter queue.");
142+
143+
await _messageQueueRepository.MoveProcessingEventToDeadLetterQueue<T>(unprocessedEvent, @event).ConfigureAwait(false);
144+
}
145+
}
146+
}
123147
}
124148
}

src/Learning.MessageQueue/Messages/RetryableRedisSubscription.cs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ protected RetryableRedisSubscription(IEventSubscriber subscriber, IMessageQueueR
3737

3838
public virtual async Task RetryAsync()
3939
{
40+
await MoveStaleProcessingEventsToDeadLetters();
41+
4042
var eventType = typeof(T).Name;
4143
var listLength = await _messageQueueRepository.GetDeadLetterListLength<T>().ConfigureAwait(false);
4244
var eventsProcessed = 0;
@@ -50,7 +52,7 @@ public virtual async Task RetryAsync()
5052
for (var i = 0; i < listLength; i++)
5153
{
5254
var indexToGet = i - eventsProcessed;
53-
var eventData = await _messageQueueRepository.GetUnprocessedMessage<T>(indexToGet).ConfigureAwait(false);
55+
var eventData = await _messageQueueRepository.GetDeadLetterMessage<T>(indexToGet).ConfigureAwait(false);
5456

5557
if (string.IsNullOrEmpty(eventData))
5658
{
@@ -123,5 +125,27 @@ protected virtual async Task<bool> ShouldRetry(IMessage @event, RedisValue event
123125

124126
return true;
125127
}
128+
129+
private async Task MoveStaleProcessingEventsToDeadLetters()
130+
{
131+
const int eventCount = 10;
132+
var waitTime = TimeSpan.FromMinutes(5);
133+
var staleTimeStamp = DateTimeOffset.UtcNow - waitTime;
134+
135+
var unprocessedEvents = await _messageQueueRepository.GetOldestProcessingEvents<T>(eventCount);
136+
137+
for (var i = 0; i < unprocessedEvents.Length; i++)
138+
{
139+
var unprocessedEvent = unprocessedEvents[i];
140+
var @event = JsonConvert.DeserializeObject<T>(unprocessedEvent);
141+
142+
if (@event.TimeStamp < staleTimeStamp)
143+
{
144+
_logger.Info($"Moving {typeof(T).Name} {@event.Id} with timestamp @{@event.TimeStamp} to dead letter queue.");
145+
146+
await _messageQueueRepository.MoveProcessingEventToDeadLetterQueue<T>(unprocessedEvent, @event);
147+
}
148+
}
149+
}
126150
}
127151
}

src/Learning.MessageQueue/Repository/IMessageQueueRepository.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public interface IMessageQueueRepository
1010
{
1111
Task<long> GetDeadLetterListLength<T>() where T : IMessage;
1212

13-
Task<RedisValue> GetUnprocessedMessage<T>(int index) where T : IMessage;
13+
Task<RedisValue> GetDeadLetterMessage<T>(int index) where T : IMessage;
1414

1515
void AddToDeadLetterQueue<T>(RedisValue eventData, IMessage @event, Exception exception) where T : IMessage;
1616

@@ -19,5 +19,9 @@ public interface IMessageQueueRepository
1919
Task UpdateRetryData(IMessage @event, string exceptionMessage);
2020

2121
Task<RetryData> GetRetryData(IMessage @event);
22+
23+
Task<RedisValue[]> GetOldestProcessingEvents<T>(int count) where T : IMessage;
24+
25+
Task MoveProcessingEventToDeadLetterQueue<T>(RedisValue eventData, IMessage @event) where T : IMessage;
2226
}
2327
}

src/Learning.MessageQueue/Repository/MessageQueueRepository.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Linq;
3+
using System.Reflection;
34
using System.Threading.Tasks;
45
using Learning.EventStore.Common.Redis;
6+
using Learning.EventStore.Common.Sql;
57
using Learning.MessageQueue.Messages;
68
using StackExchange.Redis;
79

@@ -33,7 +35,7 @@ public async Task<long> GetDeadLetterListLength<T>() where T : IMessage
3335
return listLength;
3436
}
3537

36-
public async Task<RedisValue> GetUnprocessedMessage<T>(int index) where T : IMessage
38+
public async Task<RedisValue> GetDeadLetterMessage<T>(int index) where T : IMessage
3739
{
3840
var deadLetterListKey = GetDeadLetterListKey<T>();
3941

@@ -95,6 +97,26 @@ public async Task<RetryData> GetRetryData(IMessage @event)
9597
return data;
9698
}
9799

100+
public async Task<RedisValue[]> GetOldestProcessingEvents<T>(int count) where T : IMessage
101+
{
102+
var processingEventsListKey = GetProcessingEventsListKey<T>();
103+
var unprocessedEvents = await _redisClient.ListRangeAsync(processingEventsListKey, count * -1, -1).ConfigureAwait(false);
104+
return unprocessedEvents;
105+
}
106+
107+
public async Task MoveProcessingEventToDeadLetterQueue<T>(RedisValue eventData, IMessage @event) where T : IMessage
108+
{
109+
var deadLetterListKey = GetDeadLetterListKey<T>();
110+
var processingEventsListKey = GetProcessingEventsListKey<T>();
111+
var tran = _redisClient.CreateTransaction();
112+
113+
var pushTask = tran.ListLeftPushAsync(deadLetterListKey, eventData);
114+
var removeTask = tran.ListRemoveAsync(processingEventsListKey, eventData, -1);
115+
116+
await ExcecuteTransaction(tran, @event.Id).ConfigureAwait(false);
117+
await Task.WhenAll(pushTask, removeTask).ConfigureAwait(false);
118+
}
119+
98120
private string GetDeadLetterListKey<T>() where T : IMessage
99121
{
100122
var eventType = typeof(T).Name;
@@ -104,6 +126,15 @@ private string GetDeadLetterListKey<T>() where T : IMessage
104126
return processingListKey;
105127
}
106128

129+
private string GetProcessingEventsListKey<T>() where T : IMessage
130+
{
131+
var eventType = typeof(T).Name;
132+
var eventKey = $"{_environment}:{eventType}";
133+
var processingListKey = $"{_applicationName}:{{{eventKey}}}:ProcessingEvents";
134+
135+
return processingListKey;
136+
}
137+
107138
private string GetRetryDataHashKey(IMessage @event)
108139
{
109140
var eventType = @event.GetType().Name;
@@ -122,5 +153,6 @@ private async Task ExcecuteTransaction(ITransaction tran, string aggregateId)
122153
throw new Exception($"Redis transaction failed for AggregateId {aggregateId}");
123154
}
124155
}
156+
125157
}
126158
}

test/Learning.EventStore.Test/RedisMessageQueue/RetryableAsyncSubscriptionTest.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task CallsCallbackAndDeletesFromDeadLetterQueue()
2323
var message1 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "0" });
2424
var message2 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "1" });
2525
var message3 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "2" });
26-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestAsyncMessage>(0)).ReturnsNextFromSequence(message1, message2, message3);
26+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestAsyncMessage>(0)).ReturnsNextFromSequence(message1, message2, message3);
2727
var retryData = new RetryData
2828
{
2929
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1)
@@ -47,7 +47,7 @@ public async Task IncrementsExecutionCounterOnCallbackException()
4747
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
4848
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(1);
4949
var message = new TestAsyncMessage { Id = "0" };
50-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
50+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
5151
var retryData = new RetryData
5252
{
5353
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1)
@@ -68,7 +68,7 @@ public async Task CallsCallbackIfTimeToLiveHasNotBeenExceeded()
6868
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
6969
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(1);
7070
var message = new TestAsyncMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-1) };
71-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
71+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
7272
var retryData = new RetryData
7373
{
7474
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -90,7 +90,7 @@ public async Task DoesNotCallCallbackAndDeletesFromDeadLetterQueueIfTimeToLiveIs
9090
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
9191
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(1);
9292
var message = new TestAsyncMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-169) };
93-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
93+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
9494
var retryData = new RetryData
9595
{
9696
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -112,7 +112,7 @@ public async Task DoesNotFailWithLargeNumberOfRetries()
112112
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
113113
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(1);
114114
var message = new TestAsyncMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-1) };
115-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
115+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestAsyncMessage>(0)).Returns(JsonConvert.SerializeObject(message));
116116
var retryData = new RetryData
117117
{
118118
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -126,6 +126,24 @@ public async Task DoesNotFailWithLargeNumberOfRetries()
126126

127127
Assert.IsTrue(retryClass.CallBack1Called);
128128
}
129+
130+
[TestMethod]
131+
public async Task ChecksForStaleProcessingEvents()
132+
{
133+
var logger = A.Fake<ILogger>();
134+
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
135+
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(3);
136+
var message1 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(0) });
137+
var message2 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "1", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(4) });
138+
var message3 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "2", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(5) });
139+
A.CallTo(() => eventStoreRepository.GetOldestProcessingEvents<TestAsyncMessage>(A<int>._)).Returns(new RedisValue[] { message1, message2, message3 });
140+
var subscriber = A.Fake<IEventSubscriber>();
141+
var retryClass = new TestAsyncRetryClass(subscriber, eventStoreRepository);
142+
143+
await retryClass.RetryAsync().ConfigureAwait(false);
144+
A.CallTo(() => eventStoreRepository.MoveProcessingEventToDeadLetterQueue<TestAsyncMessage>(A<RedisValue>._, A<TestAsyncMessage>._))
145+
.MustHaveHappened(Repeated.Exactly.Once);
146+
}
129147
}
130148

131149
public class TestAsyncRetryClass : RetryableAsyncRedisSubscription<TestAsyncMessage>

test/Learning.EventStore.Test/RedisMessageQueue/RetryableSubscriptionTest.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task CallsCallbackAndDeletesFromDeadLetterQueue()
2323
var message1 = JsonConvert.SerializeObject(new TestMessage { Id = "0" });
2424
var message2 = JsonConvert.SerializeObject(new TestMessage { Id = "1" });
2525
var message3 = JsonConvert.SerializeObject(new TestMessage { Id = "2" });
26-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestMessage>(0)).ReturnsNextFromSequence(message1, message2, message3);
26+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestMessage>(0)).ReturnsNextFromSequence(message1, message2, message3);
2727
var retryData = new RetryData
2828
{
2929
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1)
@@ -47,7 +47,7 @@ public async Task IncrementsExecutionCounterOnCallbackException()
4747
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
4848
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestMessage>()).Returns(1);
4949
var message = new TestMessage { Id = "0" };
50-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
50+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
5151
var retryData = new RetryData
5252
{
5353
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1)
@@ -68,7 +68,7 @@ public async Task CallsCallbackIfTimeToLiveHasNotBeenExceeded()
6868
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
6969
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestMessage>()).Returns(1);
7070
var message = new TestMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-1) };
71-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
71+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
7272
var retryData = new RetryData
7373
{
7474
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -90,7 +90,7 @@ public async Task DoesNotCallCallbackAndDeletesFromDeadLetterQueueIfTimeToLiveIs
9090
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
9191
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestMessage>()).Returns(1);
9292
var message = new TestMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-169) };
93-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
93+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
9494
var retryData = new RetryData
9595
{
9696
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -112,7 +112,7 @@ public async Task DoesNotFailWithLargeNumberOfRetries()
112112
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
113113
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestMessage>()).Returns(1);
114114
var message = new TestMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow.AddHours(-1) };
115-
A.CallTo(() => eventStoreRepository.GetUnprocessedMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
115+
A.CallTo(() => eventStoreRepository.GetDeadLetterMessage<TestMessage>(0)).Returns(JsonConvert.SerializeObject(message));
116116
var retryData = new RetryData
117117
{
118118
LastRetryTime = DateTimeOffset.UtcNow.AddHours(-1),
@@ -126,6 +126,24 @@ public async Task DoesNotFailWithLargeNumberOfRetries()
126126

127127
Assert.IsTrue(retryClass.CallBack1Called);
128128
}
129+
130+
[TestMethod]
131+
public async Task ChecksForStaleProcessingEvents()
132+
{
133+
var logger = A.Fake<ILogger>();
134+
var eventStoreRepository = A.Fake<IMessageQueueRepository>();
135+
A.CallTo(() => eventStoreRepository.GetDeadLetterListLength<TestAsyncMessage>()).Returns(3);
136+
var message1 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "0", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(0) });
137+
var message2 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "1", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(4) });
138+
var message3 = JsonConvert.SerializeObject(new TestAsyncMessage { Id = "2", TimeStamp = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(5) });
139+
A.CallTo(() => eventStoreRepository.GetOldestProcessingEvents<TestAsyncMessage>(A<int>._)).Returns(new RedisValue[] { message1, message2, message3 });
140+
var subscriber = A.Fake<IEventSubscriber>();
141+
var retryClass = new TestAsyncRetryClass(subscriber, eventStoreRepository);
142+
143+
await retryClass.RetryAsync().ConfigureAwait(false);
144+
A.CallTo(() => eventStoreRepository.MoveProcessingEventToDeadLetterQueue<TestAsyncMessage>(A<RedisValue>._, A<TestAsyncMessage>._))
145+
.MustHaveHappened(Repeated.Exactly.Once);
146+
}
129147
}
130148

131149
public class TestRetryClass : RetryableRedisSubscription<TestMessage>

0 commit comments

Comments
 (0)