Skip to content

Commit 2f9ff47

Browse files
committed
fix: change sqs
1 parent 5544178 commit 2f9ff47

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

app/src/infrastructure/queue/sqs/impl/sqs.consumer.queue.provider.impl.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@ export class SqsConsumerQueueProviderImpl implements OnModuleInit, OnModuleDestr
3636
}
3737

3838
async onModuleInit() {
39-
this.logger.log('SqsConsumerQueueProviderImpl initialized.');
39+
this.logger.log('SQS Consumer initialized.');
4040
}
4141

4242
async onModuleDestroy() {
4343
this.isShuttingDown = true;
44-
this.logger.log('Shutting down SqsConsumerQueueProviderImpl...');
44+
this.logger.log('Shutting down sqs consumer...');
4545
this.stopAllPolling();
46+
47+
await Promise.all([...this.pollingIntervals.values()].map(interval => clearTimeout(interval)));
48+
this.logger.log('All polling operations stopped.');
4649
}
4750

4851
registerQueueHandler(queueConfig: QueueConfig, handler: SqsHandler) {
@@ -87,14 +90,14 @@ export class SqsConsumerQueueProviderImpl implements OnModuleInit, OnModuleDestr
8790
await handler(message);
8891
await this.deleteMessage(url, message.ReceiptHandle!);
8992
} catch (error) {
90-
this.logger.error(`[${name}] Error processing message: ${error.message}`);
93+
this.logger.error(`[${name}] Error processing message: ${error.message}`, error.stack);
9194
await this.extendVisibilityTimeout(url, message.ReceiptHandle!, queueConfig.visibilityTimeout || 30);
9295
}
9396
}
9497
backoffTime = 1000;
9598
}
9699
} catch (error) {
97-
this.logger.error(`[${name}] Error receiving messages: ${error.message}`);
100+
this.logger.error(`[${name}] Unhandled error: ${error.message}`, error.stack);
98101
} finally {
99102
if (!this.isShuttingDown) {
100103
backoffTime = Math.min(backoffTime * 2, maxBackoffTime);
@@ -110,6 +113,11 @@ export class SqsConsumerQueueProviderImpl implements OnModuleInit, OnModuleDestr
110113
}
111114

112115
private async deleteMessage(queueUrl: string, receiptHandle: string) {
116+
if (!receiptHandle) {
117+
this.logger.warn(`Invalid receipt handle for queue: ${queueUrl}`);
118+
return;
119+
}
120+
113121
try {
114122
const command = new DeleteMessageCommand({
115123
QueueUrl: queueUrl,
@@ -123,6 +131,11 @@ export class SqsConsumerQueueProviderImpl implements OnModuleInit, OnModuleDestr
123131
}
124132

125133
private async extendVisibilityTimeout(queueUrl: string, receiptHandle: string, visibilityTimeout: number) {
134+
if (!receiptHandle) {
135+
this.logger.warn(`Invalid receipt handle for queue: ${queueUrl}`);
136+
return;
137+
}
138+
126139
try {
127140
const command = new ChangeMessageVisibilityCommand({
128141
QueueUrl: queueUrl,
@@ -132,7 +145,7 @@ export class SqsConsumerQueueProviderImpl implements OnModuleInit, OnModuleDestr
132145
await this.sqsClient.send(command);
133146
this.logger.debug(`Extended visibility timeout for message in queue: ${queueUrl}`);
134147
} catch (error) {
135-
this.logger.error(`Error extending visibility timeout for queue: ${queueUrl}, Error: ${error.message}`);
148+
this.logger.error(`Error extending visibility timeout for queue: ${queueUrl}, Error: ${error.message}`, error.stack);
136149
}
137150
}
138151

app/src/modules/consumers/notification/notification.order.consumer.module.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Inject, Module, OnModuleInit } from '@nestjs/common';
1+
import { Inject, Logger, Module, OnModuleInit } from '@nestjs/common';
22
import { SqsConsumerQueueProviderImpl } from '@infrastructure/queue/sqs/impl';
33
import { configEnv } from '@src/shared/config';
44
import { Message } from '@aws-sdk/client-sqs';
@@ -13,6 +13,9 @@ import { WebhookIntegrationClientModule } from 'src/infrastructure/integrations/
1313
providers: [SqsConsumerQueueProviderImpl, UsecaseProviderConfig(NotificationSendWebhookUsecaseImpl, [WebhookIntegrationClientProviderImpl])],
1414
})
1515
export class NotificationOrderConsumerModule implements OnModuleInit {
16+
17+
private readonly logger = new Logger(SqsConsumerQueueProviderImpl.name);
18+
1619
@Inject(NotificationSendWebhookUsecaseImpl.name)
1720
private readonly notificationSendWebhookUsecase: NotificationSendWebhookUsecase;
1821

@@ -28,6 +31,10 @@ export class NotificationOrderConsumerModule implements OnModuleInit {
2831
visibilityTimeout: 60,
2932
},
3033
async (message: Message) => {
34+
if (!message || !message.Body) {
35+
this.logger.warn(`Empty message body received in queue: ${configEnv.aws.sqs.queues.queueNotificationOrder}`);
36+
return;
37+
}
3138
await this.notificationSendWebhookUsecase.execute(message.Body);
3239
},
3340
);

0 commit comments

Comments
 (0)