Skip to content

Commit f09473f

Browse files
committed
PAN-2194 Add an async Task subscription
1 parent 6f8be0d commit f09473f

File tree

4 files changed

+312
-3
lines changed

4 files changed

+312
-3
lines changed

src/Learning.MessageQueue/IEventSubscriber.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,9 @@ public interface IEventSubscriber
99
Task SubscribeAsync<T>(Action<T> callBack) where T : IMessage;
1010
Task SubscribeAsync<T>(Action<T> callBack, bool enableLock) where T : IMessage;
1111
Task SubscribeAsync<T>(Action<T> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage;
12+
13+
Task SubscribeAsync<T>(Func<T, Task> callBack) where T : IMessage;
14+
Task SubscribeAsync<T>(Func<T, Task> callBack, bool enableLock) where T : IMessage;
15+
Task SubscribeAsync<T>(Func<T, Task> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage;
1216
}
1317
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Learning.MessageQueue.Logging;
4+
5+
namespace Learning.MessageQueue.Messages
6+
{
7+
public abstract class AsyncRedisSubscription<T> : ISubscription where T : IMessage
8+
{
9+
private readonly IEventSubscriber _subscriber;
10+
private readonly bool _useLock;
11+
private readonly ILog _logger;
12+
private readonly bool _sequentialProcessing;
13+
14+
protected AsyncRedisSubscription(IEventSubscriber subscriber)
15+
: this(subscriber, false, false)
16+
{
17+
}
18+
19+
protected AsyncRedisSubscription(IEventSubscriber subscriber, bool useLock)
20+
: this(subscriber, useLock, false)
21+
{
22+
}
23+
24+
protected AsyncRedisSubscription(IEventSubscriber subscriber, bool useLock, bool sequentialProcessing)
25+
{
26+
_subscriber = subscriber;
27+
_useLock = useLock;
28+
_logger = LogProvider.GetCurrentClassLogger();
29+
_sequentialProcessing = sequentialProcessing;
30+
}
31+
32+
public virtual async Task SubscribeAsync()
33+
{
34+
await _subscriber.SubscribeAsync((Func<T, Task>)CallBack, _useLock, _sequentialProcessing).ConfigureAwait(false);
35+
var messageName = typeof(T).Name;
36+
_logger.Info($"{messageName} subscription created");
37+
}
38+
39+
protected abstract Task CallBack(T message);
40+
}
41+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Learning.MessageQueue.Logging;
4+
using Learning.MessageQueue.Repository;
5+
using Newtonsoft.Json;
6+
using StackExchange.Redis;
7+
8+
namespace Learning.MessageQueue.Messages
9+
{
10+
public abstract class RetryableAsyncRedisSubscription<T> : AsyncRedisSubscription<T>, IRetryableSubscription where T: IMessage
11+
{
12+
private readonly IMessageQueueRepository _messageQueueRepository;
13+
private readonly ILog _logger;
14+
15+
protected RetryableAsyncRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository)
16+
: this(subscriber, messageQueueRepository, false, false)
17+
{
18+
}
19+
20+
protected RetryableAsyncRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository, bool useLock)
21+
: this(subscriber, messageQueueRepository, useLock, false)
22+
{
23+
}
24+
25+
protected RetryableAsyncRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository, bool useLock, bool sequentialProcessing)
26+
: base(subscriber, useLock, sequentialProcessing)
27+
{
28+
_logger = LogProvider.GetCurrentClassLogger();
29+
_messageQueueRepository = messageQueueRepository;
30+
}
31+
32+
public virtual int TimeToLiveHours { get; set; } = 168;
33+
34+
public virtual int RetryIntervalMinutes { get; set; } = 5;
35+
36+
public virtual int RetryIntervalMaxMinutes { get; set; } = 60;
37+
38+
public virtual async Task RetryAsync()
39+
{
40+
var eventType = typeof(T).Name;
41+
var listLength = await _messageQueueRepository.GetDeadLetterListLength<T>().ConfigureAwait(false);
42+
var eventsProcessed = 0;
43+
var errors = 0;
44+
45+
if (listLength > 0)
46+
{
47+
_logger.Info($"Beginning retry of {listLength} {typeof(T).Name} events");
48+
}
49+
50+
for (var i = 0; i < listLength; i++)
51+
{
52+
var indexToGet = i - eventsProcessed;
53+
var eventData = await _messageQueueRepository.GetUnprocessedMessage<T>(indexToGet).ConfigureAwait(false);
54+
55+
if (string.IsNullOrEmpty(eventData))
56+
{
57+
continue;
58+
}
59+
60+
var @event = JsonConvert.DeserializeObject<T>(eventData);
61+
62+
try
63+
{
64+
if (await ShouldRetry(@event, eventData).ConfigureAwait(false))
65+
{
66+
_logger.Info($"Beginning retry of processing for {eventType} event for Aggregate: {@event.Id}");
67+
68+
await RetryCallBackAsync(@event).ConfigureAwait(false);
69+
await _messageQueueRepository.DeleteFromDeadLetterQueue<T>(eventData, @event).ConfigureAwait(false);
70+
eventsProcessed++;
71+
72+
_logger.Info($"Completed retry of processing for {eventType} event for Aggregate: {@event.Id}");
73+
}
74+
}
75+
catch (Exception e)
76+
{
77+
var message = $"{e.Message}{Environment.NewLine}{e.StackTrace}";
78+
await _messageQueueRepository.UpdateRetryData(@event, message).ConfigureAwait(false);
79+
_logger.WarnException($"Event processing retry failed for {eventType} with message: {e.Message}{Environment.NewLine}{e.StackTrace}", e);
80+
81+
errors++;
82+
}
83+
}
84+
85+
_logger.Info($"Retry complete for {eventType}. Processed {eventsProcessed} events with {errors} errors.");
86+
}
87+
88+
protected virtual async Task RetryCallBackAsync(T message)
89+
{
90+
await RetryCallBack(message).ConfigureAwait(false);
91+
}
92+
93+
protected virtual async Task RetryCallBack(T message)
94+
{
95+
await CallBack(message).ConfigureAwait(false);
96+
}
97+
98+
protected virtual async Task<bool> ShouldRetry(IMessage @event, RedisValue eventData)
99+
{
100+
var retryData = await _messageQueueRepository.GetRetryData(@event).ConfigureAwait(false);
101+
102+
// exponential backoff
103+
var mpow = Math.Pow(2, retryData.RetryCount);
104+
var interval = Math.Min(mpow * RetryIntervalMinutes, RetryIntervalMaxMinutes);
105+
var intervalPassed = DateTimeOffset.UtcNow > retryData.LastRetryTime?.ToUniversalTime().AddMinutes(interval);
106+
107+
if (!intervalPassed && retryData.RetryCount > 0)
108+
{
109+
_logger.Debug($"Skipping retry for event with Aggregate Id {@event.Id}; Retry interval has not elapsed.");
110+
return false;
111+
}
112+
113+
if (TimeToLiveHours != default(int) &&
114+
DateTimeOffset.UtcNow > @event.TimeStamp.ToUniversalTime().AddHours(TimeToLiveHours))
115+
{
116+
_logger.Debug($"Time to live of {TimeToLiveHours} hours exceeded for event with Aggregate Id {@event.Id}; Deleting from the dead letter queue.");
117+
await _messageQueueRepository.DeleteFromDeadLetterQueue<T>(eventData, @event).ConfigureAwait(false);
118+
return false;
119+
}
120+
121+
return true;
122+
}
123+
}
124+
}

src/Learning.MessageQueue/RedisEventSubscriber.cs

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public async Task SubscribeAsync<T>(Action<T> callBack) where T : IMessage
5050

5151
public async Task SubscribeAsync<T>(Action<T> callBack, bool enableLock) where T : IMessage
5252
{
53-
await SubscribeAsync(callBack, enableLock, false);
53+
await SubscribeAsync(callBack, enableLock, false).ConfigureAwait(false);
5454
}
5555

5656
public async Task SubscribeAsync<T>(Action<T> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage
@@ -145,8 +145,7 @@ private void ExecuteCallbackWithLock<T>(Action<T> callBack, string eventKey) whe
145145
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
146146
throw;
147147
}
148-
}
149-
148+
}
150149

151150
private void ExecuteCallback<T>(Action<T> callBack) where T : IMessage
152151
{
@@ -186,5 +185,146 @@ Pop the event out of the queue and atomicaly push it into another 'processing' l
186185
_redisClient.ListRemove(processingListKey, eventData);
187186
}
188187
}
188+
189+
public async Task SubscribeAsync<T>(Func<T, Task> callBack) where T : IMessage
190+
{
191+
await SubscribeAsync(callBack, false).ConfigureAwait(false);
192+
}
193+
194+
public async Task SubscribeAsync<T>(Func<T, Task> callBack, bool enableLock) where T : IMessage
195+
{
196+
await SubscribeAsync(callBack, enableLock, false).ConfigureAwait(false);
197+
}
198+
199+
public async Task SubscribeAsync<T>(Func<T, Task> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage
200+
{
201+
if (enableLock && _distributedLockFactory == null)
202+
{
203+
throw new ArgumentNullException(nameof(_distributedLockFactory), "IDistributedLockFactory must be set in constructor if lock is enabled");
204+
}
205+
206+
//Register subscriber
207+
var eventType = typeof(T).Name;
208+
var eventKey = $"{_environment}:{eventType}";
209+
var subscriberSetKey = $"Subscribers:{{{eventKey}}}";
210+
var publishedListKey = $"{_applicationName}:{{{eventKey}}}:PublishedEvents";
211+
await _redisClient.SetAddAsync(subscriberSetKey, _applicationName).ConfigureAwait(false);
212+
213+
//Create concurrent subscription callback
214+
async void ConcurrentRedisCallback(RedisChannel channel, RedisValue data)
215+
{
216+
if (enableLock)
217+
{
218+
await ExecuteCallbackWithLock(callBack, eventKey).ConfigureAwait(false);
219+
}
220+
else
221+
{
222+
await ExecuteCallback(callBack).ConfigureAwait(false);
223+
}
224+
}
225+
226+
//Create sequential subscription callback
227+
async void SequentialRedisCallback(ChannelMessage message)
228+
{
229+
if (enableLock)
230+
{
231+
await ExecuteCallbackWithLock(callBack, eventKey).ConfigureAwait(false);
232+
}
233+
else
234+
{
235+
await ExecuteCallback(callBack).ConfigureAwait(false);
236+
}
237+
}
238+
239+
if (sequentialProcessing)
240+
{
241+
_redisClient.Subscribe(eventKey, SequentialRedisCallback);
242+
}
243+
else
244+
{
245+
await _redisClient.SubscribeAsync(eventKey, ConcurrentRedisCallback).ConfigureAwait(false);
246+
}
247+
248+
//Grab any unprocessed events and process them
249+
//Ensures that events that were fired before the application was started will be picked up
250+
var awaitingEvents = await _redisClient.ListLengthAsync(publishedListKey).ConfigureAwait(false);
251+
for (var i = 0; i < awaitingEvents; i++)
252+
{
253+
try
254+
{
255+
await Task.Run(() => ConcurrentRedisCallback(eventKey, true)).ConfigureAwait(false);
256+
}
257+
catch (Exception e)
258+
{
259+
_logger.ErrorException(e.Message, e);
260+
}
261+
}
262+
}
263+
264+
private async Task ExecuteCallbackWithLock<T>(Func<T, Task> callBack, string eventKey) where T : IMessage
265+
{
266+
try
267+
{
268+
_logger.Debug($"Distributed lock enabled. Attempting to get lock for message with ID {eventKey}...");
269+
using (var distributedLock = await _distributedLockFactory.CreateLockAsync(
270+
eventKey,
271+
TimeSpan.FromSeconds(_lockSettings.ExpirySeconds),
272+
TimeSpan.FromSeconds(_lockSettings.WaitSeconds),
273+
TimeSpan.FromMilliseconds(_lockSettings.RetryMilliseconds))
274+
.ConfigureAwait(false))
275+
{
276+
if (distributedLock.IsAcquired)
277+
{
278+
_logger.Debug($"Distributed lock acquired for message with ID {eventKey}; LockId: {distributedLock.LockId};");
279+
await ExecuteCallback(callBack).ConfigureAwait(false);
280+
}
281+
else
282+
{
283+
throw new DistributedLockException(distributedLock, _lockSettings);
284+
}
285+
}
286+
}
287+
catch (Exception e)
288+
{
289+
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
290+
}
291+
}
292+
293+
private async Task ExecuteCallback<T>(Func<T, Task> callBack) where T : IMessage
294+
{
295+
var eventType = typeof(T).Name;
296+
var eventKey = $"{_environment}:{eventType}";
297+
var publishedListKey = $"{_applicationName}:{{{eventKey}}}:PublishedEvents";
298+
var processingListKey = $"{_applicationName}:{{{eventKey}}}:ProcessingEvents";
299+
300+
/*
301+
Pop the event out of the queue and atomicaly push it into another 'processing' list.
302+
Creates a reliable queue where events can be retried if processing fails, see https://redis.io/commands/rpoplpush.
303+
*/
304+
var eventData = _redisClient.ListRightPopLeftPush(publishedListKey, processingListKey);
305+
306+
// if the eventData is null, then the event has already been processed by another instance, skip further execution
307+
if (!eventData.HasValue)
308+
{
309+
return;
310+
}
311+
312+
//Deserialize the event data and invoke the handler
313+
var message = JsonConvert.DeserializeObject<T>(eventData);
314+
try
315+
{
316+
await callBack.Invoke(message).ConfigureAwait(false);
317+
}
318+
catch (Exception e)
319+
{
320+
_messageQueueRepository.AddToDeadLetterQueue<T>(eventData, message, e);
321+
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
322+
}
323+
finally
324+
{
325+
//Remove the event from the 'processing' list.
326+
await _redisClient.ListRemoveAsync(processingListKey, eventData).ConfigureAwait(false);
327+
}
328+
}
189329
}
190330
}

0 commit comments

Comments
 (0)