1- // Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+ // Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22//
33// This software, the RabbitMQ Java client library, is triple-licensed under the
44// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
1919import com .rabbitmq .client .*;
2020import com .rabbitmq .client .AMQP .Queue .DeclareOk ;
2121import com .rabbitmq .perf .PerfTest .EXIT_WHEN ;
22+ import com .rabbitmq .perf .TopologyRecording .RecordedQueue ;
2223import java .time .Duration ;
24+ import java .util .concurrent .Callable ;
25+ import java .util .concurrent .ScheduledExecutorService ;
26+ import java .util .concurrent .TimeUnit ;
2327import org .slf4j .Logger ;
2428import org .slf4j .LoggerFactory ;
2529
@@ -85,6 +89,8 @@ public class Consumer extends AgentBase implements Runnable {
8589
8690 private volatile long lastDeliveryTag , lastAckedDeliveryTag ;
8791
92+ private final ScheduledExecutorService topologyRecoveryScheduledExecutorService ;
93+
8894 public Consumer (ConsumerParameters parameters ) {
8995 this .channel = parameters .getChannel ();
9096 this .id = parameters .getId ();
@@ -101,6 +107,7 @@ public Consumer(ConsumerParameters parameters) {
101107 this .pollingInterval = parameters .getPollingInterval ();
102108 this .consumerArguments = parameters .getConsumerArguments ();
103109 this .exitWhen = parameters .getExitWhen ();
110+ this .topologyRecoveryScheduledExecutorService = parameters .getTopologyRecoveryScheduledExecutorService ();
104111
105112 this .queueNames .set (new ArrayList <>(parameters .getQueueNames ()));
106113 this .initialQueueNames = new ArrayList <>(parameters .getQueueNames ());
@@ -313,8 +320,9 @@ public void handleCancel(String consumerTag) throws IOException {
313320 System .out .printf ("Consumer cancelled by broker for tag: %s" , consumerTag );
314321 if (consumerTagBranchMap .containsKey (consumerTag )) {
315322 String qName = consumerTagBranchMap .get (consumerTag );
316- System .out .printf ("Re-consuming. Queue: %s for Tag: %s" , qName , consumerTag );
317- channel .basicConsume (qName , autoAck , consumerArguments , q );
323+ TopologyRecording topologyRecording = topologyRecording ();
324+ RecordedQueue queueRecord = topologyRecording .queue (qName );
325+ consumeOrScheduleConsume (queueRecord , topologyRecording , consumerTag , qName );
318326 } else {
319327 System .out .printf ("Could not find queue for consumer tag: %s" , consumerTag );
320328 }
@@ -348,10 +356,12 @@ public void recover(TopologyRecording topologyRecording) {
348356 } else {
349357 for (Map .Entry <String , String > entry : consumerTagBranchMap .entrySet ()) {
350358 String queueName = queueName (topologyRecording , entry .getValue ());
359+ String consumerTag = entry .getKey ();
351360 LOGGER .debug ("Recovering consumer, starting consuming on {}" , queueName );
352361 try {
353- channel .basicConsume (queueName , autoAck , entry .getKey (), false , false , this .consumerArguments , q );
354- } catch (IOException e ) {
362+ TopologyRecording .RecordedQueue queueRecord = topologyRecording .queue (entry .getValue ());
363+ consumeOrScheduleConsume (queueRecord , topologyRecording , consumerTag , queueName );
364+ } catch (Exception e ) {
355365 LOGGER .warn (
356366 "Error while recovering consumer {} on queue {} on connection {}" ,
357367 entry .getKey (), queueName , channel .getConnection ().getClientProvidedName (), e
@@ -361,6 +371,55 @@ public void recover(TopologyRecording topologyRecording) {
361371 }
362372 }
363373
374+ private void consumeOrScheduleConsume (RecordedQueue queueRecord ,
375+ TopologyRecording topologyRecording ,
376+ String consumerTag ,
377+ String queueName ) throws IOException {
378+ if (queueMayBeDown (queueRecord , topologyRecording )) {
379+ // If the queue is on a cluster node that is down, basic.consume will fail with a 404.
380+ // This will close the channel and we can't afford it, so we check if the queue exists with a different channel,
381+ // and postpone the subscription if the queue does not exist
382+ LOGGER .debug ("Checking if queue {} exists before subscribing" , queueName );
383+ if (Utils .exists (channel .getConnection (), ch -> ch .queueDeclarePassive (queueName ))) {
384+ LOGGER .debug ("Queue {} does exist, subscribing" , queueName );
385+ channel .basicConsume (queueName , autoAck , consumerTag , false , false , this .consumerArguments , q );
386+ } else {
387+ LOGGER .debug ("Queue {} does not exist, it is likely unavailable, scheduling subscription." , queueName );
388+ Duration schedulingPeriod = Duration .ofSeconds (5 );
389+ int maxRetry = (int ) (Duration .ofMinutes (10 ).getSeconds () / schedulingPeriod .getSeconds ());
390+ AtomicInteger retryCount = new AtomicInteger (0 );
391+ AtomicReference <Callable <Void >> resubscriptionReference = new AtomicReference <>();
392+ Callable <Void > resubscription = () -> {
393+ LOGGER .debug ("Scheduled re-subscription for {}..." , queueName );
394+ if (Utils .exists (channel .getConnection (), ch -> ch .queueDeclarePassive (queueName ))) {
395+ LOGGER .debug ("Queue {} exists, re-subscribing" , queueName );
396+ channel .basicConsume (queueName , autoAck , consumerTag , false , false , consumerArguments , q );
397+ } else if (retryCount .incrementAndGet () <= maxRetry ){
398+ LOGGER .debug ("Queue {} does not exist, scheduling re-subscription" , queueName );
399+ this .topologyRecoveryScheduledExecutorService .schedule (
400+ resubscriptionReference .get (), schedulingPeriod .getSeconds (), TimeUnit .SECONDS
401+ );
402+ } else {
403+ LOGGER .debug ("Max subscription retry count reached {} for queue {}" ,
404+ retryCount .get (), queueName );
405+ }
406+ return null ;
407+ };
408+ resubscriptionReference .set (resubscription );
409+ this .topologyRecoveryScheduledExecutorService .schedule (
410+ resubscription , schedulingPeriod .getSeconds (), TimeUnit .SECONDS
411+ );
412+ }
413+ } else {
414+ channel .basicConsume (queueName , autoAck , consumerTag , false , false , this .consumerArguments , q );
415+ }
416+ }
417+
418+ private static boolean queueMayBeDown (RecordedQueue queueRecord , TopologyRecording topologyRecording ) {
419+ return queueRecord != null
420+ && queueRecord .isClassic () && queueRecord .isDurable () && topologyRecording .isCluster ();
421+ }
422+
364423 void maybeStopIfNoActivityOrQueueEmpty () {
365424 LOGGER .debug ("Checking consumer activity" );
366425 if (this .exitWhen == EXIT_WHEN .NEVER ) {
0 commit comments