4242using System . Collections . Concurrent ;
4343using System . Collections . Generic ;
4444using System . Linq ;
45- using System . Threading ;
45+ using System . Threading . Channels ;
4646using System . Threading . Tasks ;
4747
4848using RabbitMQ . Client . Events ;
4949using RabbitMQ . Client . Exceptions ;
5050using RabbitMQ . Client . Impl ;
51-
52- using RabbitMQ . Util ;
5351using RabbitMQ . Client . Logging ;
5452
5553namespace RabbitMQ . Client . Framing . Impl
@@ -664,7 +662,7 @@ private void Init(IFrameHandler fh)
664662 {
665663 if ( ShouldTriggerConnectionRecovery ( args ) )
666664 {
667- _recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . BeginAutomaticRecovery ) ;
665+ _recoveryLoopCommandQueue . Writer . TryWrite ( RecoveryCommand . BeginAutomaticRecovery ) ;
668666 }
669667 } ;
670668 lock ( _eventLock )
@@ -1243,9 +1241,7 @@ private enum RecoveryConnectionState
12431241 private Task _recoveryTask ;
12441242 private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState . Connected ;
12451243
1246- private readonly AsyncConcurrentQueue < RecoveryCommand > _recoveryLoopCommandQueue = new AsyncConcurrentQueue < RecoveryCommand > ( ) ;
1247- private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource ( ) ;
1248- private readonly TaskCompletionSource < int > _recoveryLoopComplete = new TaskCompletionSource < int > ( ) ;
1244+ private readonly Channel < RecoveryCommand > _recoveryLoopCommandQueue = Channel . CreateUnbounded < RecoveryCommand > ( new UnboundedChannelOptions { AllowSynchronousContinuations = false , SingleReader = true , SingleWriter = false } ) ;
12491245
12501246 /// <summary>
12511247 /// This is the main loop for the auto-recovery thread.
@@ -1254,21 +1250,22 @@ private async Task MainRecoveryLoop()
12541250 {
12551251 try
12561252 {
1257- while ( ! _recoveryCancellationToken . IsCancellationRequested )
1253+ while ( await _recoveryLoopCommandQueue . Reader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
12581254 {
1259- var command = await _recoveryLoopCommandQueue . DequeueAsync ( _recoveryCancellationToken . Token ) . ConfigureAwait ( false ) ;
1260-
1261- switch ( _recoveryLoopState )
1255+ while ( _recoveryLoopCommandQueue . Reader . TryRead ( out RecoveryCommand command ) )
12621256 {
1263- case RecoveryConnectionState . Connected :
1264- RecoveryLoopConnectedHandler ( command ) ;
1265- break ;
1266- case RecoveryConnectionState . Recovering :
1267- RecoveryLoopRecoveringHandler ( command ) ;
1268- break ;
1269- default :
1270- ESLog . Warn ( "RecoveryLoop state is out of range." ) ;
1271- break ;
1257+ switch ( _recoveryLoopState )
1258+ {
1259+ case RecoveryConnectionState . Connected :
1260+ RecoveryLoopConnectedHandler ( command ) ;
1261+ break ;
1262+ case RecoveryConnectionState . Recovering :
1263+ RecoveryLoopRecoveringHandler ( command ) ;
1264+ break ;
1265+ default :
1266+ ESLog . Warn ( "RecoveryLoop state is out of range." ) ;
1267+ break ;
1268+ }
12721269 }
12731270 }
12741271 }
@@ -1280,8 +1277,6 @@ private async Task MainRecoveryLoop()
12801277 {
12811278 ESLog . Error ( "Main recovery loop threw unexpected exception." , e ) ;
12821279 }
1283-
1284- _recoveryLoopComplete . SetResult ( 0 ) ;
12851280 }
12861281
12871282 /// <summary>
@@ -1290,8 +1285,10 @@ private async Task MainRecoveryLoop()
12901285 /// </summary>
12911286 private void StopRecoveryLoop ( )
12921287 {
1293- _recoveryCancellationToken . Cancel ( ) ;
1294- if ( ! _recoveryLoopComplete . Task . Wait ( _factory . RequestedConnectionTimeout ) )
1288+ _recoveryLoopCommandQueue . Writer . Complete ( ) ;
1289+ Task timeout = Task . Delay ( _factory . RequestedConnectionTimeout ) ;
1290+
1291+ if ( Task . WhenAny ( _recoveryTask , timeout ) . Result == timeout )
12951292 {
12961293 ESLog . Warn ( "Timeout while trying to stop background AutorecoveringConnection recovery loop." ) ;
12971294 }
@@ -1351,11 +1348,9 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
13511348 /// </summary>
13521349 private void ScheduleRecoveryRetry ( )
13531350 {
1354- Task . Delay ( _factory . NetworkRecoveryInterval )
1355- . ContinueWith ( t =>
1356- {
1357- _recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . PerformAutomaticRecovery ) ;
1358- } ) ;
1351+ _ = Task
1352+ . Delay ( _factory . NetworkRecoveryInterval )
1353+ . ContinueWith ( t => _recoveryLoopCommandQueue . Writer . TryWrite ( RecoveryCommand . PerformAutomaticRecovery ) ) ;
13591354 }
13601355 }
13611356}
0 commit comments