Skip to content

Commit 75de64c

Browse files
committed
PAN-2194 Add an optional ISubscriber and if present use it to call async callbacks
1 parent f09473f commit 75de64c

File tree

1 file changed

+34
-5
lines changed

1 file changed

+34
-5
lines changed

src/Learning.MessageQueue/RedisEventSubscriber.cs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class RedisEventSubscriber : IEventSubscriber
1616
{
1717
private readonly IMessageQueueRepository _messageQueueRepository;
1818
private readonly IRedisClient _redisClient;
19+
private readonly ISubscriber _sub;
1920
private readonly string _applicationName;
2021
private readonly string _environment;
2122
private readonly IDistributedLockFactory _distributedLockFactory;
@@ -33,8 +34,20 @@ public RedisEventSubscriber(
3334
string environment,
3435
IDistributedLockFactory distributedLockFactory,
3536
DistributedLockSettings lockSettings = null)
37+
: this(redisClient, null, applicationName, environment, distributedLockFactory, lockSettings)
38+
{
39+
}
40+
41+
public RedisEventSubscriber(
42+
IRedisClient redisClient,
43+
ISubscriber sub,
44+
string applicationName,
45+
string environment,
46+
IDistributedLockFactory distributedLockFactory,
47+
DistributedLockSettings lockSettings = null)
3648
{
3749
_redisClient = redisClient;
50+
_sub = sub;
3851
_messageQueueRepository = new MessageQueueRepository(_redisClient, environment, applicationName);
3952
_applicationName = applicationName;
4053
_environment = environment;
@@ -211,7 +224,7 @@ public async Task SubscribeAsync<T>(Func<T, Task> callBack, bool enableLock, boo
211224
await _redisClient.SetAddAsync(subscriberSetKey, _applicationName).ConfigureAwait(false);
212225

213226
//Create concurrent subscription callback
214-
async void ConcurrentRedisCallback(RedisChannel channel, RedisValue data)
227+
async Task ConcurrentRedisCallback()
215228
{
216229
if (enableLock)
217230
{
@@ -223,8 +236,9 @@ async void ConcurrentRedisCallback(RedisChannel channel, RedisValue data)
223236
}
224237
}
225238

239+
// TODO: These are identical. Does sequential/concurrent make sense an ISubscriber?
226240
//Create sequential subscription callback
227-
async void SequentialRedisCallback(ChannelMessage message)
241+
async Task SequentialRedisCallback()
228242
{
229243
if (enableLock)
230244
{
@@ -238,11 +252,26 @@ async void SequentialRedisCallback(ChannelMessage message)
238252

239253
if (sequentialProcessing)
240254
{
241-
_redisClient.Subscribe(eventKey, SequentialRedisCallback);
255+
if (_sub != null)
256+
{
257+
_sub.Subscribe(eventKey).OnMessage(async message => await SequentialRedisCallback().ConfigureAwait(false));
258+
}
259+
else
260+
{
261+
_redisClient.Subscribe(eventKey, async message => await SequentialRedisCallback().ConfigureAwait(false));
262+
}
242263
}
243264
else
244265
{
245-
await _redisClient.SubscribeAsync(eventKey, ConcurrentRedisCallback).ConfigureAwait(false);
266+
if (_sub != null)
267+
{
268+
_sub.Subscribe(eventKey).OnMessage(async message => await ConcurrentRedisCallback().ConfigureAwait(false));
269+
}
270+
else
271+
{
272+
await _redisClient.SubscribeAsync(eventKey, async (channel, data) => await ConcurrentRedisCallback().ConfigureAwait(false))
273+
.ConfigureAwait(false);
274+
}
246275
}
247276

248277
//Grab any unprocessed events and process them
@@ -252,7 +281,7 @@ async void SequentialRedisCallback(ChannelMessage message)
252281
{
253282
try
254283
{
255-
await Task.Run(() => ConcurrentRedisCallback(eventKey, true)).ConfigureAwait(false);
284+
await Task.Run(async () => await ConcurrentRedisCallback().ConfigureAwait(false)).ConfigureAwait(false);
256285
}
257286
catch (Exception e)
258287
{

0 commit comments

Comments
 (0)