@@ -50,7 +50,7 @@ public async Task SubscribeAsync<T>(Action<T> callBack) where T : IMessage
5050
5151 public async Task SubscribeAsync < T > ( Action < T > callBack , bool enableLock ) where T : IMessage
5252 {
53- await SubscribeAsync ( callBack , enableLock , false ) ;
53+ await SubscribeAsync ( callBack , enableLock , false ) . ConfigureAwait ( false ) ;
5454 }
5555
5656 public async Task SubscribeAsync < T > ( Action < T > callBack , bool enableLock , bool sequentialProcessing ) where T : IMessage
@@ -145,8 +145,7 @@ private void ExecuteCallbackWithLock<T>(Action<T> callBack, string eventKey) whe
145145 _logger . ErrorException ( $ "{ e . Message } \n { e . StackTrace } ", e ) ;
146146 throw ;
147147 }
148- }
149-
148+ }
150149
151150 private void ExecuteCallback < T > ( Action < T > callBack ) where T : IMessage
152151 {
@@ -186,5 +185,146 @@ Pop the event out of the queue and atomicaly push it into another 'processing' l
186185 _redisClient . ListRemove ( processingListKey , eventData ) ;
187186 }
188187 }
188+
189+ public async Task SubscribeAsync < T > ( Func < T , Task > callBack ) where T : IMessage
190+ {
191+ await SubscribeAsync ( callBack , false ) . ConfigureAwait ( false ) ;
192+ }
193+
194+ public async Task SubscribeAsync < T > ( Func < T , Task > callBack , bool enableLock ) where T : IMessage
195+ {
196+ await SubscribeAsync ( callBack , enableLock , false ) . ConfigureAwait ( false ) ;
197+ }
198+
199+ public async Task SubscribeAsync < T > ( Func < T , Task > callBack , bool enableLock , bool sequentialProcessing ) where T : IMessage
200+ {
201+ if ( enableLock && _distributedLockFactory == null )
202+ {
203+ throw new ArgumentNullException ( nameof ( _distributedLockFactory ) , "IDistributedLockFactory must be set in constructor if lock is enabled" ) ;
204+ }
205+
206+ //Register subscriber
207+ var eventType = typeof ( T ) . Name ;
208+ var eventKey = $ "{ _environment } :{ eventType } ";
209+ var subscriberSetKey = $ "Subscribers:{{{eventKey}}}";
210+ var publishedListKey = $ "{ _applicationName } :{{{eventKey}}}:PublishedEvents";
211+ await _redisClient . SetAddAsync ( subscriberSetKey , _applicationName ) . ConfigureAwait ( false ) ;
212+
213+ // Create async Task subscription callback
214+ async Task TaskRedisCallback ( )
215+ {
216+ if ( enableLock )
217+ {
218+ await ExecuteCallbackWithLock ( callBack , eventKey ) . ConfigureAwait ( false ) ;
219+ }
220+ else
221+ {
222+ await ExecuteCallback ( callBack ) . ConfigureAwait ( false ) ;
223+ }
224+ }
225+
226+ // Create async void subscription callback
227+ async void VoidRedisCallback ( )
228+ {
229+ if ( enableLock )
230+ {
231+ await ExecuteCallbackWithLock ( callBack , eventKey ) . ConfigureAwait ( false ) ;
232+ }
233+ else
234+ {
235+ await ExecuteCallback ( callBack ) . ConfigureAwait ( false ) ;
236+ }
237+ }
238+
239+ if ( sequentialProcessing )
240+ {
241+ _redisClient . Subscribe ( eventKey , message => TaskRedisCallback ( ) ) ;
242+ }
243+ else
244+ {
245+ await _redisClient . SubscribeAsync ( eventKey , ( channel , data ) => VoidRedisCallback ( ) ) . ConfigureAwait ( false ) ;
246+ }
247+
248+ //Grab any unprocessed events and process them
249+ //Ensures that events that were fired before the application was started will be picked up
250+ var awaitingEvents = await _redisClient . ListLengthAsync ( publishedListKey ) . ConfigureAwait ( false ) ;
251+ for ( var i = 0 ; i < awaitingEvents ; i ++ )
252+ {
253+ try
254+ {
255+ await Task . Run ( async ( ) => await TaskRedisCallback ( ) . ConfigureAwait ( false ) ) . ConfigureAwait ( false ) ;
256+ }
257+ catch ( Exception e )
258+ {
259+ _logger . ErrorException ( e . Message , e ) ;
260+ }
261+ }
262+ }
263+
264+ private async Task ExecuteCallbackWithLock < T > ( Func < T , Task > callBack , string eventKey ) where T : IMessage
265+ {
266+ try
267+ {
268+ _logger . Debug ( $ "Distributed lock enabled. Attempting to get lock for message with ID { eventKey } ...") ;
269+ using ( var distributedLock = await _distributedLockFactory . CreateLockAsync (
270+ eventKey ,
271+ TimeSpan . FromSeconds ( _lockSettings . ExpirySeconds ) ,
272+ TimeSpan . FromSeconds ( _lockSettings . WaitSeconds ) ,
273+ TimeSpan . FromMilliseconds ( _lockSettings . RetryMilliseconds ) )
274+ . ConfigureAwait ( false ) )
275+ {
276+ if ( distributedLock . IsAcquired )
277+ {
278+ _logger . Debug ( $ "Distributed lock acquired for message with ID { eventKey } ; LockId: { distributedLock . LockId } ;") ;
279+ await ExecuteCallback ( callBack ) . ConfigureAwait ( false ) ;
280+ }
281+ else
282+ {
283+ throw new DistributedLockException ( distributedLock , _lockSettings ) ;
284+ }
285+ }
286+ }
287+ catch ( Exception e )
288+ {
289+ _logger . ErrorException ( $ "{ e . Message } \n { e . StackTrace } ", e ) ;
290+ }
291+ }
292+
293+ private async Task ExecuteCallback < T > ( Func < T , Task > callBack ) where T : IMessage
294+ {
295+ var eventType = typeof ( T ) . Name ;
296+ var eventKey = $ "{ _environment } :{ eventType } ";
297+ var publishedListKey = $ "{ _applicationName } :{{{eventKey}}}:PublishedEvents";
298+ var processingListKey = $ "{ _applicationName } :{{{eventKey}}}:ProcessingEvents";
299+
300+ /*
301+ Pop the event out of the queue and atomicaly push it into another 'processing' list.
302+ Creates a reliable queue where events can be retried if processing fails, see https://redis.io/commands/rpoplpush.
303+ */
304+ var eventData = _redisClient . ListRightPopLeftPush ( publishedListKey , processingListKey ) ;
305+
306+ // if the eventData is null, then the event has already been processed by another instance, skip further execution
307+ if ( ! eventData . HasValue )
308+ {
309+ return ;
310+ }
311+
312+ //Deserialize the event data and invoke the handler
313+ var message = JsonConvert . DeserializeObject < T > ( eventData ) ;
314+ try
315+ {
316+ await callBack . Invoke ( message ) . ConfigureAwait ( false ) ;
317+ }
318+ catch ( Exception e )
319+ {
320+ _messageQueueRepository . AddToDeadLetterQueue < T > ( eventData , message , e ) ;
321+ _logger . ErrorException ( $ "{ e . Message } \n { e . StackTrace } ", e ) ;
322+ }
323+ finally
324+ {
325+ //Remove the event from the 'processing' list.
326+ await _redisClient . ListRemoveAsync ( processingListKey , eventData ) . ConfigureAwait ( false ) ;
327+ }
328+ }
189329 }
190330}
0 commit comments