Skip to content

Commit a3750af

Browse files
committed
PG-215 Check for old processing events and move them to the dead letters queue
1 parent 7d08413 commit a3750af

File tree

5 files changed

+104
-1
lines changed

5 files changed

+104
-1
lines changed

src/Learning.EventStore/Learning.EventStore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<AssemblyTitle>Learning.EventStore</AssemblyTitle>
5-
<VersionPrefix>11.2.1</VersionPrefix>
5+
<VersionPrefix>11.3.0-beta.10</VersionPrefix>
66
<TargetFrameworks>netstandard2.0</TargetFrameworks>
77
<AssemblyName>Learning.EventStore</AssemblyName>
88
<PackageId>Learning.EventStore</PackageId>

src/Learning.MessageQueue/Messages/RetryableAsyncRedisSubscription.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Threading.Tasks;
3+
using Learning.EventStore.Common.Sql;
34
using Learning.MessageQueue.Logging;
45
using Learning.MessageQueue.Repository;
6+
using Microsoft.Extensions.Logging;
57
using Newtonsoft.Json;
68
using StackExchange.Redis;
79

@@ -37,6 +39,8 @@ protected RetryableAsyncRedisSubscription(IEventSubscriber subscriber, IMessageQ
3739

3840
public virtual async Task RetryAsync()
3941
{
42+
await MoveStaleProcessingEventsToDeadLetters();
43+
4044
var eventType = typeof(T).Name;
4145
var listLength = await _messageQueueRepository.GetDeadLetterListLength<T>().ConfigureAwait(false);
4246
var eventsProcessed = 0;
@@ -120,5 +124,27 @@ protected virtual async Task<bool> ShouldRetry(IMessage @event, RedisValue event
120124

121125
return true;
122126
}
127+
128+
private async Task MoveStaleProcessingEventsToDeadLetters()
129+
{
130+
const int eventCount = 10;
131+
var waitTime = TimeSpan.FromMinutes(5);
132+
var staleTimeStamp = DateTimeOffset.UtcNow - waitTime;
133+
134+
var unprocessedEvents = await _messageQueueRepository.GetOldestProcessingEvents<T>(eventCount);
135+
136+
for (var i = 0; i < unprocessedEvents.Length; i++)
137+
{
138+
var unprocessedEvent = unprocessedEvents[i];
139+
var @event = JsonConvert.DeserializeObject<T>(unprocessedEvent);
140+
141+
if (@event.TimeStamp < staleTimeStamp)
142+
{
143+
_logger.Info($"Moving {typeof(T).Name} {@event.Id} with timestamp @{@event.TimeStamp} to dead letter queue.");
144+
145+
await _messageQueueRepository.MoveProcessingEventToDeadLetterQueue<T>(unprocessedEvent, @event);
146+
}
147+
}
148+
}
123149
}
124150
}

src/Learning.MessageQueue/Messages/RetryableRedisSubscription.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ protected RetryableRedisSubscription(IEventSubscriber subscriber, IMessageQueueR
3737

3838
public virtual async Task RetryAsync()
3939
{
40+
await MoveStaleProcessingEventsToDeadLetters();
41+
4042
var eventType = typeof(T).Name;
4143
var listLength = await _messageQueueRepository.GetDeadLetterListLength<T>().ConfigureAwait(false);
4244
var eventsProcessed = 0;
@@ -123,5 +125,27 @@ protected virtual async Task<bool> ShouldRetry(IMessage @event, RedisValue event
123125

124126
return true;
125127
}
128+
129+
private async Task MoveStaleProcessingEventsToDeadLetters()
130+
{
131+
const int eventCount = 10;
132+
var waitTime = TimeSpan.FromMinutes(5);
133+
var staleTimeStamp = DateTimeOffset.UtcNow - waitTime;
134+
135+
var unprocessedEvents = await _messageQueueRepository.GetOldestProcessingEvents<T>(eventCount);
136+
137+
for (var i = 0; i < unprocessedEvents.Length; i++)
138+
{
139+
var unprocessedEvent = unprocessedEvents[i];
140+
var @event = JsonConvert.DeserializeObject<T>(unprocessedEvent);
141+
142+
if (@event.TimeStamp < staleTimeStamp)
143+
{
144+
_logger.Info($"Moving {typeof(T).Name} {@event.Id} with timestamp @{@event.TimeStamp} to dead letter queue.");
145+
146+
await _messageQueueRepository.MoveProcessingEventToDeadLetterQueue<T>(unprocessedEvent, @event);
147+
}
148+
}
149+
}
126150
}
127151
}

src/Learning.MessageQueue/Repository/IMessageQueueRepository.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public interface IMessageQueueRepository
1010
{
1111
Task<long> GetDeadLetterListLength<T>() where T : IMessage;
1212

13+
// TODO: Rename to GetDeadLetterMessage
1314
Task<RedisValue> GetUnprocessedMessage<T>(int index) where T : IMessage;
1415

1516
void AddToDeadLetterQueue<T>(RedisValue eventData, IMessage @event, Exception exception) where T : IMessage;
@@ -19,5 +20,9 @@ public interface IMessageQueueRepository
1920
Task UpdateRetryData(IMessage @event, string exceptionMessage);
2021

2122
Task<RetryData> GetRetryData(IMessage @event);
23+
24+
Task<RedisValue[]> GetOldestProcessingEvents<T>(int count) where T : IMessage;
25+
26+
Task MoveProcessingEventToDeadLetterQueue<T>(RedisValue eventData, IMessage @event) where T : IMessage;
2227
}
2328
}

src/Learning.MessageQueue/Repository/MessageQueueRepository.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Linq;
3+
using System.Reflection;
34
using System.Threading.Tasks;
45
using Learning.EventStore.Common.Redis;
6+
using Learning.EventStore.Common.Sql;
57
using Learning.MessageQueue.Messages;
68
using StackExchange.Redis;
79

@@ -33,6 +35,7 @@ public async Task<long> GetDeadLetterListLength<T>() where T : IMessage
3335
return listLength;
3436
}
3537

38+
// TODO: Rename to GetDeadLetterMessage
3639
public async Task<RedisValue> GetUnprocessedMessage<T>(int index) where T : IMessage
3740
{
3841
var deadLetterListKey = GetDeadLetterListKey<T>();
@@ -95,6 +98,41 @@ public async Task<RetryData> GetRetryData(IMessage @event)
9598
return data;
9699
}
97100

101+
public async Task<RedisValue[]> GetOldestProcessingEvents<T>(int count) where T : IMessage
102+
{
103+
var processingEventsListKey = GetProcessingEventsListKey<T>();
104+
var listLength = await _redisClient.ListLengthAsync(processingEventsListKey).ConfigureAwait(false);
105+
long startIndex;
106+
107+
if (listLength == 0)
108+
{
109+
return new RedisValue[0];
110+
}
111+
if (listLength > count)
112+
{
113+
startIndex = listLength - count;
114+
}
115+
else
116+
{
117+
startIndex = 0;
118+
}
119+
120+
var unprocessedEvents = await _redisClient.ListRangeAsync(processingEventsListKey, startIndex, -1).ConfigureAwait(false);
121+
return unprocessedEvents;
122+
}
123+
124+
public async Task MoveProcessingEventToDeadLetterQueue<T>(RedisValue eventData, IMessage @event) where T : IMessage
125+
{
126+
var deadLetterListKey = GetDeadLetterListKey<T>();
127+
var processingEventsListKey = GetProcessingEventsListKey<T>();
128+
var tran = _redisClient.CreateTransaction();
129+
130+
tran.ListLeftPushAsync(deadLetterListKey, eventData);
131+
tran.ListRemoveAsync(processingEventsListKey, eventData, -1);
132+
133+
await ExcecuteTransaction(tran, @event.Id).ConfigureAwait(false);
134+
}
135+
98136
private string GetDeadLetterListKey<T>() where T : IMessage
99137
{
100138
var eventType = typeof(T).Name;
@@ -104,6 +142,15 @@ private string GetDeadLetterListKey<T>() where T : IMessage
104142
return processingListKey;
105143
}
106144

145+
private string GetProcessingEventsListKey<T>() where T : IMessage
146+
{
147+
var eventType = typeof(T).Name;
148+
var eventKey = $"{_environment}:{eventType}";
149+
var processingListKey = $"{_applicationName}:{{{eventKey}}}:ProcessingEvents";
150+
151+
return processingListKey;
152+
}
153+
107154
private string GetRetryDataHashKey(IMessage @event)
108155
{
109156
var eventType = @event.GetType().Name;
@@ -122,5 +169,6 @@ private async Task ExcecuteTransaction(ITransaction tran, string aggregateId)
122169
throw new Exception($"Redis transaction failed for AggregateId {aggregateId}");
123170
}
124171
}
172+
125173
}
126174
}

0 commit comments

Comments
 (0)