Implement persistent RabbitMQ reconnection with exponential backoff #1898
Conversation
…on logic and heartbeat configuration
Reviewer's Guide
Refactors the RabbitMQ integration to maintain persistent connections with exponential backoff, advanced state tracking, configurable heartbeats and delays, comprehensive error handling, graceful shutdown, and enriched logging.
Hey there - I've reviewed your changes - here's some feedback:
- Verify that the custom reconnection parameters passed to amqp.connect (e.g., reconnect, reconnectBackoffStrategy) are actually supported by your AMQP library or remove them to avoid confusion since reconnection is handled manually.
- Ensure the isReconnecting flag is always reset on every failure path (including channel errors during createChannel) so it doesn’t block subsequent reconnection attempts.
- Consider extracting the exponential backoff scheduling logic into a utility function or class to improve readability and make it easier to test (a sketch of one possible factoring follows below).
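A minimal sketch of how the backoff scheduling could be factored into a standalone helper, assuming the values from the diff (1 s base delay, 5 min cap); the class and method names here are illustrative, not part of the existing controller:

```typescript
// Hypothetical helper; names are illustrative, not part of the existing controller.
export class ExponentialBackoff {
  private attempt = 0;

  constructor(
    private readonly baseDelayMs: number = 1000, // 1 s initial delay (matches baseReconnectDelay)
    private readonly maxDelayMs: number = 300000, // 5 min cap (matches maxReconnectDelay)
  ) {}

  /** Delay for the next attempt: base * 2^attempt, capped at maxDelayMs. */
  nextDelay(): number {
    const delay = Math.min(this.baseDelayMs * 2 ** this.attempt, this.maxDelayMs);
    this.attempt++;
    return delay;
  }

  /** Call after a successful connection so later failures start from the base delay again. */
  reset(): void {
    this.attempt = 0;
  }
}

// Possible use inside scheduleReconnect():
//   const delay = this.backoff.nextDelay();
//   this.reconnectTimer = setTimeout(() => this.connect(), delay);
```

Keeping the delay math in one place also makes it trivial to unit-test the 1s → 2s → 4s → … progression without opening any AMQP connections.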
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Verify that the custom reconnection parameters passed to amqp.connect (e.g., reconnect, reconnectBackoffStrategy) are actually supported by your AMQP library or remove them to avoid confusion since reconnection is handled manually.
- Ensure the isReconnecting flag is always reset on every failure path (including channel errors during createChannel) so it doesn’t block subsequent reconnection attempts.
- Consider extracting the exponential backoff scheduling logic into a utility function or class to improve readability and make it easier to test.
## Individual Comments
### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:14` </location>
<code_context>
private amqpConnection: amqp.Connection | null = null;
+ private reconnectTimer: NodeJS.Timeout | null = null;
+ private reconnectAttempt: number = 0;
+ private maxReconnectDelay: number = 300000; // 5 minutes maximum
+ private baseReconnectDelay: number = 1000; // 1 second initial
+ private isReconnecting: boolean = false;
+ private isShuttingDown: boolean = false;
</code_context>
<issue_to_address>
Consider making reconnect delay parameters configurable via environment variables.
Configuring these delay values via environment variables or the config service will make it easier to adjust them for different deployment needs without modifying the code.
Suggested implementation:
```typescript
private maxReconnectDelay: number;
private baseReconnectDelay: number;
```
```typescript
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
const rabbitmqConfig = configService.get<Rabbitmq>('RABBITMQ');
this.maxReconnectDelay = rabbitmqConfig?.MAX_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_MAX_RECONNECT_DELAY) ?? 300000;
this.baseReconnectDelay = rabbitmqConfig?.BASE_RECONNECT_DELAY ?? Number(process.env.RABBITMQ_BASE_RECONNECT_DELAY) ?? 1000;
return;
}
```
- Ensure that your config service and Rabbitmq config type support `MAX_RECONNECT_DELAY` and `BASE_RECONNECT_DELAY` properties.
- You may need to add these properties to your configuration files or environment variable documentation.
</issue_to_address>
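One caveat with the suggested fallback chain: `Number(process.env.X) ?? default` never falls back when the variable is unset, because `Number(undefined)` is `NaN`, which `??` does not treat as nullish. A hedged sketch of a safer parse (the `RABBITMQ_*` variable names follow the suggestion above and are otherwise assumptions):

```typescript
// Illustrative helper: parse an optional numeric env var, falling back when unset or not a number.
function envNumber(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  return Number.isFinite(value) ? value : fallback;
}

// In the constructor (adjust to the real config keys):
// this.maxReconnectDelay = rabbitmqConfig?.MAX_RECONNECT_DELAY ?? envNumber('RABBITMQ_MAX_RECONNECT_DELAY', 300000);
// this.baseReconnectDelay = rabbitmqConfig?.BASE_RECONNECT_DELAY ?? envNumber('RABBITMQ_BASE_RECONNECT_DELAY', 1000);
```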
### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:33` </location>
<code_context>
private async connect(): Promise<void> {
- return new Promise<void>((resolve, reject) => {
+ if (this.isReconnecting) {
+ return;
+ }
+
</code_context>
<issue_to_address>
Potential race condition if connect is called concurrently.
The current use of isReconnecting may not fully prevent concurrent connection attempts. Implementing a mutex or promise-based lock would provide stronger guarantees against race conditions.
</issue_to_address>
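One way to get a stronger guarantee is to memoize the in-flight connection attempt as a promise, so concurrent callers await the same attempt instead of racing. A minimal sketch under that assumption (class and method names beyond those in the diff are hypothetical):

```typescript
// Hypothetical sketch: all concurrent connect() callers share a single in-flight attempt.
class RabbitmqConnector {
  private connectPromise: Promise<void> | null = null;

  async connect(): Promise<void> {
    if (this.connectPromise) {
      return this.connectPromise; // an attempt is already in flight; await the same one
    }
    this.connectPromise = this.doConnect().finally(() => {
      this.connectPromise = null; // release the "lock" whether the attempt succeeded or failed
    });
    return this.connectPromise;
  }

  private async doConnect(): Promise<void> {
    // ... the existing amqp.connect() / createChannel() logic would live here ...
  }
}
```

Because the promise is cleared in `finally`, a failed attempt does not permanently block future reconnects, which also addresses the `isReconnecting` reset concern above.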
### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:80` </location>
<code_context>
- local: 'RabbitmqController.connectionError',
- message: 'RabbitMQ connection error',
- error: err.message || err,
+ connection.on('close', () => {
+ this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
+ this.amqpConnection = null;
+ this.amqpChannel = null;
+ this.scheduleReconnect();
});
- this.handleConnectionLoss();
</code_context>
<issue_to_address>
Consider distinguishing between intentional and unintentional connection closes.
Currently, all connection closes trigger a reconnection. To avoid unnecessary reconnects during shutdown, check isShuttingDown before scheduling a reconnect.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
connection.on('close', () => {
this.logger.warn('RabbitMQ connection closed, attempting reconnection...');
this.amqpConnection = null;
this.amqpChannel = null;
this.scheduleReconnect();
});
=======
connection.on('close', () => {
this.logger.warn('RabbitMQ connection closed.');
this.amqpConnection = null;
this.amqpChannel = null;
if (!this.isShuttingDown) {
this.logger.warn('Attempting RabbitMQ reconnection...');
this.scheduleReconnect();
} else {
this.logger.info('RabbitMQ connection closed intentionally during shutdown.');
}
});
>>>>>>> REPLACE
</suggested_fix>
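For the intentional-close path, the graceful shutdown routine is what would set `isShuttingDown` before closing. A hedged sketch of such a method on the controller, assuming amqplib's promise-based `close()` calls and the fields introduced in this diff (the method name `shutdown()` is illustrative):

```typescript
// Illustrative shutdown path (method name is hypothetical; fields match the diff).
public async shutdown(): Promise<void> {
  this.isShuttingDown = true;

  // Cancel any pending reconnection attempt.
  if (this.reconnectTimer) {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
  }

  // Close channel and connection; the 'close' handler above sees isShuttingDown and skips reconnect.
  try {
    await this.amqpChannel?.close();
    await this.amqpConnection?.close();
  } catch (error) {
    this.logger.warn(`Error while closing RabbitMQ connection: ${error}`);
  } finally {
    this.amqpChannel = null;
    this.amqpConnection = null;
  }
}
```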
### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:192` </location>
<code_context>
}
- private async ensureConnection(): Promise<boolean> {
- if (!this.amqpChannel) {
- this.logger.warn('AMQP channel is not available, attempting to reconnect...');
- if (!this.isReconnecting) {
</code_context>
<issue_to_address>
Skipping message when channel is unavailable may lead to silent data loss.
Currently, messages are dropped if the channel is unavailable. Implementing a retry mechanism or buffer would help prevent data loss.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
🐰 Heartbeat Timeout Fix and Persistent Reconnection Implementation
🐛 Problems Fixed
- `Error: Heartbeat timeout` causing connection failures
- Limited number of reconnection attempts
- Connection drops during network instability
- Loss of channels without automatic recovery
🚀 Features Added
- Advanced state tracking
- Flexible configuration
- RABBITMQ_HEARTBEAT: configures the heartbeat interval (see the sketch below)
- Improved monitoring
🔧 Modified Files
- src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
- src/config/env.config.ts
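For the RABBITMQ_HEARTBEAT setting, a hedged sketch of how the value might be read and handed to amqplib, which accepts the heartbeat (in seconds) as a URI query parameter; the `RABBITMQ_URI` name and the 30 s default are assumptions, and the real wiring in env.config.ts may differ:

```typescript
import * as amqp from 'amqplib';

// Illustrative only: append the configured heartbeat (in seconds) to the AMQP URI.
async function connectWithHeartbeat(): Promise<amqp.Connection> {
  const heartbeatSeconds = Number(process.env.RABBITMQ_HEARTBEAT) || 30; // 30 s is an assumed default
  const uri = process.env.RABBITMQ_URI ?? 'amqp://localhost';            // RABBITMQ_URI is an assumed name
  const separator = uri.includes('?') ? '&' : '?';
  return amqp.connect(`${uri}${separator}heartbeat=${heartbeatSeconds}`);
}
```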
📋 System Behavior
- Reconnection delays grow exponentially (1s, 2s, 4s, … 256s), capped at 5 minutes
- Reconnection attempts continue indefinitely (stopping only on an explicit shutdown)
- The backoff counter resets after a successful connection
🧪 Tested
🎯 Result: RabbitMQ maintains a stable connection with automatic, intelligent recovery, always prioritizing keeping connectivity alive.
Summary by Sourcery
Enhance the RabbitMQ controller to maintain a stable connection by configuring heartbeat timeouts, introducing an unlimited persistent reconnection strategy with exponential backoff, and improving lifecycle management with automatic channel recovery, detailed logging, and a graceful shutdown procedure.
New Features:
Enhancements: