
Conversation

@LuisSantosJS (Contributor) commented Sep 2, 2025

🐰 Heartbeat Timeout Fix and Persistent Reconnection Implementation

🐛 Problems Fixed

  • Heartbeat timeout: Error: Heartbeat timeout errors were causing
    connection failures
  • Limited reconnection: the system gave up after only a few
    attempts
  • Message loss: messages were lost during network
    instability
  • Inadequate recovery: no automatic channel recovery

🚀 Features Added

  1. Advanced State Management

    • Prevents simultaneous reconnection attempts
    • Tracks reconnection attempts with a counter
    • Controlled shutdown state
  2. Flexible Configuration (see the sketch after this list)

    • RABBITMQ_HEARTBEAT: configures the heartbeat interval
    • Configurable delays (base: 1s, maximum: 5min)
  3. Improved Monitoring

    • Emoji-tagged logs for easy identification
    • Attempt counters
    • Detailed connection status
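
As a rough illustration only: the PR exposes RABBITMQ_HEARTBEAT in
src/config/env.config.ts, and an env-driven heartbeat setting could look like
the sketch below. The Rabbitmq type fields other than ENABLED, the variable
names besides RABBITMQ_HEARTBEAT, and the 30s default are assumptions, not the
PR's actual code.

```typescript
// Hypothetical sketch of an env-driven RabbitMQ config (not the PR's exact env.config.ts).
export type Rabbitmq = {
  ENABLED: boolean;
  URI: string;
  HEARTBEAT: number; // seconds between AMQP heartbeats
};

export function loadRabbitmqConfig(env: NodeJS.ProcessEnv = process.env): Rabbitmq {
  const heartbeat = Number.parseInt(env.RABBITMQ_HEARTBEAT ?? '', 10);
  return {
    ENABLED: env.RABBITMQ_ENABLED === 'true',
    URI: env.RABBITMQ_URI ?? 'amqp://localhost:5672',
    // Fall back to a 30s heartbeat when the variable is missing or not numeric.
    HEARTBEAT: Number.isNaN(heartbeat) ? 30 : heartbeat,
  };
}
```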

🔧 Modified Files

  • src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
  • src/config/env.config.ts

📋 System Behavior

  1. Disconnection → reconnection starts immediately
  2. Exponential Backoff → 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s,
    256s, up to 5min (see the sketch below)
  3. Total Persistence → never gives up (except on explicit
    shutdown)
  4. Automatic Recovery → the attempt counter resets on a
    successful connection
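
The schedule above amounts to a delay of base * 2^attempt capped at the
maximum. A minimal sketch of that loop, reusing the field names described in
this PR (reconnectTimer, reconnectAttempt, isShuttingDown); the actual method
in rabbitmq.controller.ts may differ in detail:

```typescript
// Minimal sketch of persistent reconnection with capped exponential backoff.
class ReconnectScheduler {
  private reconnectTimer: NodeJS.Timeout | null = null;
  private reconnectAttempt = 0;
  private isShuttingDown = false;
  private readonly baseReconnectDelay = 1_000; // 1s
  private readonly maxReconnectDelay = 300_000; // 5min

  constructor(private readonly connect: () => Promise<void>) {}

  scheduleReconnect(): void {
    if (this.isShuttingDown || this.reconnectTimer) return; // no retries during shutdown

    // 1s, 2s, 4s, ... capped at 5 minutes.
    const delay = Math.min(this.baseReconnectDelay * 2 ** this.reconnectAttempt, this.maxReconnectDelay);
    this.reconnectAttempt++;

    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = null;
      try {
        await this.connect();
        this.reconnectAttempt = 0; // reset the counter on a successful connection
      } catch {
        this.scheduleReconnect(); // keep trying indefinitely
      }
    }, delay);
  }
}
```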

🧪 Tested

  • ✅ Build and lint passing
  • ✅ Reconnection after a network failure
  • ✅ Graceful shutdown
  • ✅ Exponential backoff working as expected

🎯 Result: RabbitMQ keeps a stable connection with automatic, intelligent
recovery, always prioritizing connectivity.

Summary by Sourcery

Enhance the RabbitMQ controller to maintain a stable connection by configuring heartbeat timeouts, introducing an unlimited persistent reconnection strategy with exponential backoff, and improving lifecycle management with automatic channel recovery, detailed logging, and a graceful shutdown procedure.

New Features:

  • Implement persistent RabbitMQ reconnection with exponential backoff and unlimited retry attempts
  • Make RabbitMQ heartbeat interval configurable via RABBITMQ_HEARTBEAT environment variable
  • Add a graceful shutdown method to close the RabbitMQ channel and connection and cancel reconnection timers (a sketch follows the Enhancements list below)

Enhancements:

  • Prevent concurrent reconnection attempts and track reconnection state with attempt counters and timers
  • Automatically recreate AMQP channels on errors and handle connection/channel lifecycle events
  • Enhance logging with emojis and contextual status messages for easier monitoring
  • Refine event emission and global queue initialization to skip operations when the channel is unavailable
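
A minimal sketch of the graceful shutdown flow summarized above, assuming
amqplib and the fields mentioned in this PR (isShuttingDown, reconnectTimer,
amqpChannel, amqpConnection); this is not the controller's actual shutdown()
implementation:

```typescript
import type * as amqp from 'amqplib';

// Sketch: stop reconnecting first, then close the channel before the connection.
async function shutdownRabbitmq(state: {
  isShuttingDown: boolean;
  reconnectTimer: NodeJS.Timeout | null;
  amqpChannel: amqp.Channel | null;
  amqpConnection: amqp.Connection | null;
}): Promise<void> {
  state.isShuttingDown = true; // blocks any further scheduleReconnect calls

  if (state.reconnectTimer) {
    clearTimeout(state.reconnectTimer);
    state.reconnectTimer = null;
  }

  try {
    await state.amqpChannel?.close();
  } catch {
    // Channel may already be closed; ignore.
  }
  try {
    await state.amqpConnection?.close();
  } catch {
    // Connection may already be closed; ignore.
  }

  state.amqpChannel = null;
  state.amqpConnection = null;
}
```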

@sourcery-ai sourcery-ai bot (Contributor) commented Sep 2, 2025

Reviewer's Guide

Refactors RabbitMQ integration to maintain persistent connections with exponential backoff, advanced state tracking, configurable heartbeats and delays, comprehensive error handling, graceful shutdown and enriched logging.

File-Level Changes

Persistent reconnection with exponential backoff
  • Added reconnectTimer and reconnectAttempt counters
  • Defined base and max reconnect delays (1s to 5min)
  • scheduleReconnect never stops (except shutdown) and uses exponential backoff
  • Removed fixed maxAttempts limit and linear delay
  Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Enhanced connection and channel lifecycle management
  • Refactored connect() to manage isReconnecting flag and reset attempt on success
  • Introduced handleConnectionError and handleChannelError for event-driven recovery (see the sketch after this list)
  • Removed ensureConnection and cleanup methods
  • Added channel availability checks in emit() and initGlobalQueues()
  Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Graceful shutdown support
  • Added shutdown() to cancel timers and close channel/connection gracefully
  • Introduced isShuttingDown flag to halt reconnection during shutdown
  Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Configurable heartbeat and connection settings
  • Exposed RABBITMQ_HEARTBEAT in env.config.ts
  • Replaced hard-coded heartbeat and reconnect options with config values
  • Moved frameMax, heartbeat, reconnect options into connectionOptions
  Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts,
         src/config/env.config.ts

Improved logging and monitoring
  • Unified log messages with emojis and formatted strings
  • Added detailed logs for each reconnection attempt, errors, and shutdown steps
  • Replaced verbose object logs with concise info, warn, error calls
  Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
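
For the event-driven channel recovery mentioned above, a rough sketch of how a
channel error might trigger recreation; handleChannelError is named in the
guide, but the body below is an assumption, not the PR's code:

```typescript
import type * as amqp from 'amqplib';

// Sketch: when the channel errors, recreate it on the same connection and
// hand the fresh channel back to the caller via a callback.
async function attachChannelRecovery(
  connection: amqp.Connection,
  onChannel: (channel: amqp.Channel) => void,
): Promise<void> {
  const createAndWatch = async (): Promise<void> => {
    const channel = await connection.createChannel();
    channel.on('error', async (err: Error) => {
      console.error(`⚠️ AMQP channel error: ${err.message}, recreating channel...`);
      try {
        await createAndWatch(); // recreate on the still-open connection
      } catch {
        // If the connection itself is gone, the connection-level 'close'
        // handler is expected to schedule the full reconnect instead.
      }
    });
    onChannel(channel);
  };

  await createAndWatch();
}
```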

Possibly linked issues

  • Docker folder is deleted #1: PR implements persistent RabbitMQ reconnection with exponential backoff, fixing ECONNRESET errors and improving stability and error logging.


@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • Verify that the custom reconnection parameters passed to amqp.connect (e.g., reconnect, reconnectBackoffStrategy) are actually supported by your AMQP library or remove them to avoid confusion since reconnection is handled manually.
  • Ensure the isReconnecting flag is always reset on every failure path (including channel errors during createChannel) so it doesn’t block subsequent reconnection attempts.
  • Consider extracting the exponential backoff scheduling logic into a utility function or class to improve readability and make it easier to test.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Verify that the custom reconnection parameters passed to amqp.connect (e.g., reconnect, reconnectBackoffStrategy) are actually supported by your AMQP library or remove them to avoid confusion since reconnection is handled manually.
- Ensure the isReconnecting flag is always reset on every failure path (including channel errors during createChannel) so it doesn’t block subsequent reconnection attempts.
- Consider extracting the exponential backoff scheduling logic into a utility function or class to improve readability and make it easier to test.
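
One possible shape for the extraction suggested in the last bullet above: a
pure, easily testable delay helper. The name and signature are illustrative,
not taken from the PR.

```typescript
// Pure helper: capped exponential backoff delay for a given attempt number.
export function backoffDelay(attempt: number, baseMs = 1_000, maxMs = 300_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Example: backoffDelay(0) === 1000, backoffDelay(3) === 8000, backoffDelay(20) === 300000.
```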

## Individual Comments

### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:14` </location>
<code_context>
   private amqpConnection: amqp.Connection | null = null;
+  private reconnectTimer: NodeJS.Timeout | null = null;
+  private reconnectAttempt: number = 0;
+  private maxReconnectDelay: number = 300000; // 5 minutes maximum
+  private baseReconnectDelay: number = 1000; // 1 second initial
+  private isReconnecting: boolean = false;
+  private isShuttingDown: boolean = false;
</code_context>

<issue_to_address>
Consider making reconnect delay parameters configurable via environment variables.

Configuring these delay values via environment variables or the config service will make it easier to adjust them for different deployment needs without modifying the code.

Suggested implementation:

```typescript
  private maxReconnectDelay: number;
  private baseReconnectDelay: number;

```

```typescript
  constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
    super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
    const rabbitmqConfig = configService.get<Rabbitmq>('RABBITMQ');
    this.maxReconnectDelay = rabbitmqConfig?.MAX_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_MAX_RECONNECT_DELAY) ?? 300000;
    this.baseReconnectDelay = rabbitmqConfig?.BASE_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_BASE_RECONNECT_DELAY) ?? 1000;
    return;
  }

```

- Ensure that your config service and Rabbitmq config type support `MAX_RECONNECT_DELAY` and `BASE_RECONNECT_DELAY` properties.
- You may need to add these properties to your configuration files or environment variable documentation.
</issue_to_address>

### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:33` </location>
<code_context>

   private async connect(): Promise<void> {
-    return new Promise<void>((resolve, reject) => {
+    if (this.isReconnecting) {
+      return;
+    }
+
</code_context>

<issue_to_address>
Potential race condition if connect is called concurrently.

The current use of isReconnecting may not fully prevent concurrent connection attempts. Implementing a mutex or promise-based lock would provide stronger guarantees against race conditions.
</issue_to_address>
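
A sketch of the promise-based lock idea for this comment, so that concurrent
connect() callers share one in-flight attempt instead of racing; the class and
names are illustrative, not from the PR:

```typescript
// Illustrative promise-based lock: concurrent connect() calls await the same attempt.
class ConnectionGate {
  private pending: Promise<void> | null = null;

  constructor(private readonly doConnect: () => Promise<void>) {}

  connect(): Promise<void> {
    if (!this.pending) {
      this.pending = this.doConnect().finally(() => {
        this.pending = null; // release the lock on success and on failure
      });
    }
    return this.pending;
  }
}
```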

### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:80` </location>
<code_context>
-            local: 'RabbitmqController.connectionError',
-            message: 'RabbitMQ connection error',
-            error: err.message || err,
+          connection.on('close', () => {
+            this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
+            this.amqpConnection = null;
+            this.amqpChannel = null;
+            this.scheduleReconnect();
           });
-          this.handleConnectionLoss();
</code_context>

<issue_to_address>
Consider distinguishing between intentional and unintentional connection closes.

Currently, all connection closes trigger a reconnection. To avoid unnecessary reconnects during shutdown, check isShuttingDown before scheduling a reconnect.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
          connection.on('close', () => {
            this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
            this.amqpConnection = null;
            this.amqpChannel = null;
            this.scheduleReconnect();
          });
=======
          connection.on('close', () => {
            this.logger.warn('RabbitMQ connection closed.');
            this.amqpConnection = null;
            this.amqpChannel = null;
            if (!this.isShuttingDown) {
              this.logger.warn('Attempting RabbitMQ reconnection...');
              this.scheduleReconnect();
            } else {
              this.logger.info('RabbitMQ connection closed intentionally during shutdown.');
            }
          });
>>>>>>> REPLACE

</suggested_fix>

### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:192` </location>
<code_context>
   }

-  private async ensureConnection(): Promise<boolean> {
-    if (!this.amqpChannel) {
-      this.logger.warn('AMQP channel is not available, attempting to reconnect...');
-      if (!this.isReconnecting) {
</code_context>

<issue_to_address>
Skipping message when channel is unavailable may lead to silent data loss.

Currently, messages are dropped if the channel is unavailable. Implementing a retry mechanism or buffer would help prevent data loss.
</issue_to_address>
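
A sketch of the buffering idea from this comment: queue messages while the
channel is down and flush them after reconnection. The class, size limit, and
drop policy are illustrative, not part of the PR:

```typescript
import type * as amqp from 'amqplib';

// Illustrative outbound buffer that avoids silently dropping messages.
class OutboundBuffer {
  private readonly pending: { exchange: string; routingKey: string; content: Buffer }[] = [];

  constructor(private readonly maxSize = 1_000) {}

  publishOrBuffer(channel: amqp.Channel | null, exchange: string, routingKey: string, content: Buffer): void {
    if (channel) {
      channel.publish(exchange, routingKey, content);
      return;
    }
    if (this.pending.length >= this.maxSize) {
      this.pending.shift(); // drop the oldest message rather than grow unbounded
    }
    this.pending.push({ exchange, routingKey, content });
  }

  flush(channel: amqp.Channel): void {
    while (this.pending.length > 0) {
      const msg = this.pending.shift()!;
      channel.publish(msg.exchange, msg.routingKey, msg.content);
    }
  }
}
```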


Comment on lines +14 to +15
private maxReconnectDelay: number = 300000; // 5 minutes maximum
private baseReconnectDelay: number = 1000; // 1 second initial

suggestion: Consider making reconnect delay parameters configurable via environment variables.

Configuring these delay values via environment variables or the config service will make it easier to adjust them for different deployment needs without modifying the code.

Suggested implementation:

  private maxReconnectDelay: number;
  private baseReconnectDelay: number;
  constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
    super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
    const rabbitmqConfig = configService.get<Rabbitmq>('RABBITMQ');
    this.maxReconnectDelay = rabbitmqConfig?.MAX_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_MAX_RECONNECT_DELAY) ?? 300000;
    this.baseReconnectDelay = rabbitmqConfig?.BASE_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_BASE_RECONNECT_DELAY) ?? 1000;
    return;
  }
  • Ensure that your config service and Rabbitmq config type support MAX_RECONNECT_DELAY and BASE_RECONNECT_DELAY properties.
  • You may need to add these properties to your configuration files or environment variable documentation.


public async init(): Promise<void> {
  if (!this.status) {
    return;

issue (bug_risk): Potential race condition if connect is called concurrently.

The current use of isReconnecting may not fully prevent concurrent connection attempts. Implementing a mutex or promise-based lock would provide stronger guarantees against race conditions.

Comment on lines +80 to 85
connection.on('close', () => {
  this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
  this.amqpConnection = null;
  this.amqpChannel = null;
  this.scheduleReconnect();
});

suggestion: Consider distinguishing between intentional and unintentional connection closes.

Currently, all connection closes trigger a reconnection. To avoid unnecessary reconnects during shutdown, check isShuttingDown before scheduling a reconnect.

Suggested change

Before:
  connection.on('close', () => {
    this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
    this.amqpConnection = null;
    this.amqpChannel = null;
    this.scheduleReconnect();
  });

After:
  connection.on('close', () => {
    this.logger.warn('RabbitMQ connection closed.');
    this.amqpConnection = null;
    this.amqpChannel = null;
    if (!this.isShuttingDown) {
      this.logger.warn('Attempting RabbitMQ reconnection...');
      this.scheduleReconnect();
    } else {
      this.logger.info('RabbitMQ connection closed intentionally during shutdown.');
    }
  });

Comment on lines 301 to +305

while (retry < 3) {
  try {
    if (!this.amqpChannel) {
      this.logger.warn('RabbitMQ channel not available, skipping message');

issue (bug_risk): Skipping message when channel is unavailable may lead to silent data loss.

Currently, messages are dropped if the channel is unavailable. Implementing a retry mechanism or buffer would help prevent data loss.
