Skip to content

Commit cf2b768

Browse files
steelheaddigitalAlaina Hagan
authored and
Alaina Hagan
committed
Merged in feature/CLR-46 (pull request #31)
Feature/CLR-46 Approved-by: Brian Craw <bcraw@learning.com> Approved-by: Aaron Butler <abutler@learning.com>
2 parents 0313198 + f13b720 commit cf2b768

File tree

8 files changed

+89
-39
lines changed

8 files changed

+89
-39
lines changed

src/Learning.EventStore.Common/Redis/IRedisClient.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public interface IRedisClient
2020
long ListRemove(RedisKey key, RedisValue value);
2121
Task PublishAsync(RedisChannel channel, RedisValue value);
2222
Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler);
23+
void Subscribe(RedisChannel channel, Action<ChannelMessage> handler);
2324
Task<string> StringGetAsync(string key);
2425
Task<bool> StringSetAsync(string key, string value, TimeSpan? expiry = null);
2526
Task<bool> KeyExistsAsync(string key);

src/Learning.EventStore.Common/Redis/RedisClient.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ public async Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, Redi
135135
await RetryPolicyAsync.ExecuteAsync(() => _redisConnection.GetSubscriber().SubscribeAsync(channel, handler)).ConfigureAwait(false);
136136
}
137137

138+
public void Subscribe(RedisChannel channel, Action<ChannelMessage> handler)
139+
{
140+
RetryPolicy.Execute(() => _redisConnection.GetSubscriber().Subscribe(channel).OnMessage(handler));
141+
}
142+
138143
public async Task<string> StringGetAsync(string key)
139144
{
140145
var result = await RetryPolicyAsync.ExecuteAsync(() => Database.StringGetAsync(key)).ConfigureAwait(false);

src/Learning.EventStore/Learning.EventStore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<AssemblyTitle>Learning.EventStore</AssemblyTitle>
5-
<VersionPrefix>11.0.2</VersionPrefix>
5+
<VersionPrefix>11.1.1</VersionPrefix>
66
<TargetFrameworks>netstandard2.0</TargetFrameworks>
77
<AssemblyName>Learning.EventStore</AssemblyName>
88
<PackageId>Learning.EventStore</PackageId>

src/Learning.MessageQueue/IEventSubscriber.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ public interface IEventSubscriber
88
{
99
Task SubscribeAsync<T>(Action<T> callBack) where T : IMessage;
1010
Task SubscribeAsync<T>(Action<T> callBack, bool enableLock) where T : IMessage;
11+
Task SubscribeAsync<T>(Action<T> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage;
1112
}
1213
}

src/Learning.MessageQueue/Learning.MessageQueue.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<AssemblyTitle>Learning.MessageQueue</AssemblyTitle>
5-
<VersionPrefix>4.0.2</VersionPrefix>
5+
<VersionPrefix>4.1.1</VersionPrefix>
66
<TargetFrameworks>netstandard2.0</TargetFrameworks>
77
<AssemblyName>Learning.MessageQueue</AssemblyName>
88
<PackageId>Learning.MessageQueue</PackageId>

src/Learning.MessageQueue/Messages/RedisSubscription.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,29 @@ public abstract class RedisSubscription<T> : ISubscription where T : IMessage
99
private readonly IEventSubscriber _subscriber;
1010
private readonly bool _useLock;
1111
private readonly ILog _logger;
12+
private readonly bool _sequentialProcessing;
1213

1314
protected RedisSubscription(IEventSubscriber subscriber)
14-
: this(subscriber, false)
15+
: this(subscriber, false, false)
1516
{
1617
}
1718

1819
protected RedisSubscription(IEventSubscriber subscriber, bool useLock)
20+
: this(subscriber, useLock, false)
21+
{
22+
}
23+
24+
protected RedisSubscription(IEventSubscriber subscriber, bool useLock, bool sequentialProcessing)
1925
{
2026
_subscriber = subscriber;
2127
_useLock = useLock;
2228
_logger = LogProvider.GetCurrentClassLogger();
29+
_sequentialProcessing = sequentialProcessing;
2330
}
2431

2532
public virtual async Task SubscribeAsync()
2633
{
27-
await _subscriber.SubscribeAsync((Action<T>)CallBack, _useLock).ConfigureAwait(false);
34+
await _subscriber.SubscribeAsync((Action<T>)CallBack, _useLock, _sequentialProcessing).ConfigureAwait(false);
2835
var messageName = typeof(T).Name;
2936
_logger.Info($"{messageName} subscription created");
3037
}

src/Learning.MessageQueue/Messages/RetryableRedisSubscription.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@ public abstract class RetryableRedisSubscription<T> : RedisSubscription<T>, IRet
1313
private readonly ILog _logger;
1414

1515
protected RetryableRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository)
16-
: this(subscriber, messageQueueRepository, false)
16+
: this(subscriber, messageQueueRepository, false, false)
1717
{
1818
}
1919

2020
protected RetryableRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository, bool useLock)
21-
: base(subscriber, useLock)
21+
: this(subscriber, messageQueueRepository, useLock, false)
22+
{
23+
}
24+
25+
protected RetryableRedisSubscription(IEventSubscriber subscriber, IMessageQueueRepository messageQueueRepository, bool useLock, bool sequentialProcessing)
26+
: base(subscriber, useLock, sequentialProcessing)
2227
{
2328
_logger = LogProvider.GetCurrentClassLogger();
2429
_messageQueueRepository = messageQueueRepository;

src/Learning.MessageQueue/RedisEventSubscriber.cs

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public async Task SubscribeAsync<T>(Action<T> callBack) where T : IMessage
4949
}
5050

5151
public async Task SubscribeAsync<T>(Action<T> callBack, bool enableLock) where T : IMessage
52+
{
53+
await SubscribeAsync(callBack, enableLock, false);
54+
}
55+
56+
public async Task SubscribeAsync<T>(Action<T> callBack, bool enableLock, bool sequentialProcessing) where T : IMessage
5257
{
5358
if (enableLock && _distributedLockFactory == null)
5459
{
@@ -62,45 +67,40 @@ public async Task SubscribeAsync<T>(Action<T> callBack, bool enableLock) where T
6267
var publishedListKey = $"{_applicationName}:{{{eventKey}}}:PublishedEvents";
6368
await _redisClient.SetAddAsync(subscriberSetKey, _applicationName).ConfigureAwait(false);
6469

65-
//Create subscription callback
66-
void RedisCallback(RedisChannel channel, RedisValue data)
70+
//Create concurrent subscription callback
71+
void ConcurrentRedisCallback(RedisChannel channel, RedisValue data)
6772
{
68-
try
73+
if (enableLock)
6974
{
70-
if (enableLock)
71-
{
72-
_logger.Debug($"Distributed lock enabled. Attempting to get lock for message with ID {eventKey}...");
73-
using(var distributedLock = _distributedLockFactory.CreateLock(
74-
eventKey,
75-
TimeSpan.FromSeconds(_lockSettings.ExpirySeconds),
76-
TimeSpan.FromSeconds(_lockSettings.WaitSeconds),
77-
TimeSpan.FromMilliseconds(_lockSettings.RetryMilliseconds)))
78-
{
79-
if (distributedLock.IsAcquired)
80-
{
81-
_logger.Debug($"Distributed lock acquired for message with ID {eventKey}; LockId: {distributedLock.LockId};");
82-
ExecuteCallback(callBack);
83-
}
84-
else
85-
{
86-
throw new DistributedLockException(distributedLock, _lockSettings);
87-
}
88-
}
89-
}
90-
else
91-
{
92-
ExecuteCallback(callBack);
93-
}
75+
ExecuteCallbackWithLock(callBack, eventKey);
9476
}
95-
catch (Exception e)
77+
else
9678
{
97-
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
98-
throw;
79+
ExecuteCallback(callBack);
9980
}
10081
}
10182

102-
//Subscribe to the event
103-
await _redisClient.SubscribeAsync(eventKey, RedisCallback).ConfigureAwait(false);
83+
//Create sequential subscription callback
84+
void SequentialRedisCallback(ChannelMessage message)
85+
{
86+
if (enableLock)
87+
{
88+
ExecuteCallbackWithLock(callBack, eventKey);
89+
}
90+
else
91+
{
92+
ExecuteCallback(callBack);
93+
}
94+
}
95+
96+
if (sequentialProcessing)
97+
{
98+
_redisClient.Subscribe(eventKey, SequentialRedisCallback);
99+
}
100+
else
101+
{
102+
await _redisClient.SubscribeAsync(eventKey, ConcurrentRedisCallback).ConfigureAwait(false);
103+
}
104104

105105
//Grab any unprocessed events and process them
106106
//Ensures that events that were fired before the application was started will be picked up
@@ -109,7 +109,7 @@ void RedisCallback(RedisChannel channel, RedisValue data)
109109
{
110110
try
111111
{
112-
await Task.Run(() => RedisCallback(eventKey, true)).ConfigureAwait(false);
112+
await Task.Run(() => ConcurrentRedisCallback(eventKey, true)).ConfigureAwait(false);
113113
}
114114
catch (Exception e)
115115
{
@@ -118,6 +118,36 @@ void RedisCallback(RedisChannel channel, RedisValue data)
118118
}
119119
}
120120

121+
private void ExecuteCallbackWithLock<T>(Action<T> callBack, string eventKey) where T : IMessage
122+
{
123+
try
124+
{
125+
_logger.Debug($"Distributed lock enabled. Attempting to get lock for message with ID {eventKey}...");
126+
using(var distributedLock = _distributedLockFactory.CreateLock(
127+
eventKey,
128+
TimeSpan.FromSeconds(_lockSettings.ExpirySeconds),
129+
TimeSpan.FromSeconds(_lockSettings.WaitSeconds),
130+
TimeSpan.FromMilliseconds(_lockSettings.RetryMilliseconds)))
131+
{
132+
if (distributedLock.IsAcquired)
133+
{
134+
_logger.Debug($"Distributed lock acquired for message with ID {eventKey}; LockId: {distributedLock.LockId};");
135+
ExecuteCallback(callBack);
136+
}
137+
else
138+
{
139+
throw new DistributedLockException(distributedLock, _lockSettings);
140+
}
141+
}
142+
}
143+
catch (Exception e)
144+
{
145+
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
146+
throw;
147+
}
148+
}
149+
150+
121151
private void ExecuteCallback<T>(Action<T> callBack) where T : IMessage
122152
{
123153
var eventType = typeof(T).Name;
@@ -146,6 +176,7 @@ Pop the event out of the queue and atomicaly push it into another 'processing' l
146176
catch (Exception e)
147177
{
148178
_messageQueueRepository.AddToDeadLetterQueue<T>(eventData, message, e);
179+
_logger.ErrorException($"{e.Message}\n{e.StackTrace}", e);
149180

150181
throw;
151182
}

0 commit comments

Comments
 (0)