Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SERVER_NAME=evolution
SERVER_TYPE=http
SERVER_PORT=8080
# Server URL - Set your application url
Expand Down Expand Up @@ -96,6 +97,35 @@ SQS_SECRET_ACCESS_KEY=
SQS_ACCOUNT_ID=
SQS_REGION=

SQS_GLOBAL_ENABLED=false
SQS_GLOBAL_APPLICATION_STARTUP=false
SQS_GLOBAL_CALL=false
SQS_GLOBAL_CHATS_DELETE=false
SQS_GLOBAL_CHATS_SET=false
SQS_GLOBAL_CHATS_UPDATE=false
SQS_GLOBAL_CHATS_UPSERT=false
SQS_GLOBAL_CONNECTION_UPDATE=false
SQS_GLOBAL_CONTACTS_SET=false
SQS_GLOBAL_CONTACTS_UPDATE=false
SQS_GLOBAL_CONTACTS_UPSERT=false
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
SQS_GLOBAL_GROUP_UPDATE=false
SQS_GLOBAL_GROUPS_UPSERT=false
SQS_GLOBAL_LABELS_ASSOCIATION=false
SQS_GLOBAL_LABELS_EDIT=false
SQS_GLOBAL_LOGOUT_INSTANCE=false
SQS_GLOBAL_MESSAGES_DELETE=false
SQS_GLOBAL_MESSAGES_EDITED=false
SQS_GLOBAL_MESSAGES_SET=false
SQS_GLOBAL_MESSAGES_UPDATE=false
SQS_GLOBAL_MESSAGES_UPSERT=false
SQS_GLOBAL_PRESENCE_UPDATE=false
SQS_GLOBAL_QRCODE_UPDATED=false
SQS_GLOBAL_REMOVE_INSTANCE=false
SQS_GLOBAL_SEND_MESSAGE=false
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
SQS_GLOBAL_TYPEBOT_START=false

# Websocket - Environment variables
WEBSOCKET_ENABLED=false
WEBSOCKET_GLOBAL_EVENTS=false
Expand Down
206 changes: 127 additions & 79 deletions src/api/integrations/event/sqs/sqs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
import { configService, Log, Sqs } from '@config/env.config';
import { configService, HttpServer, Log, S3, Sqs } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { join } from 'path';

import { EmitData, EventController, EventControllerInterface } from '../event.controller';
import { EventDto } from '../event.dto';
Expand All @@ -15,27 +17,29 @@ export class SqsController extends EventController implements EventControllerInt
super(prismaRepository, waMonitor, configService.get<Sqs>('SQS')?.ENABLED, 'sqs');
}

public init(): void {
public async init(): Promise<void> {
if (!this.status) {
return;
}

new Promise<void>((resolve) => {
const awsConfig = configService.get<Sqs>('SQS');
const awsConfig = configService.get<Sqs>('SQS');

this.sqs = new SQS({
credentials: {
accessKeyId: awsConfig.ACCESS_KEY_ID,
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
},
this.sqs = new SQS({
credentials: {
accessKeyId: awsConfig.ACCESS_KEY_ID,
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
},

region: awsConfig.REGION,
});
region: awsConfig.REGION,
});

this.logger.info('SQS initialized');
this.logger.info('SQS initialized');

resolve();
});
const sqsConfig = configService.get<Sqs>('SQS');
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
}
}

private set channel(sqs: SQS) {
Expand All @@ -47,7 +51,7 @@ export class SqsController extends EventController implements EventControllerInt
}

override async set(instanceName: string, data: EventDto): Promise<any> {
if (!this.status) {
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
return;
}

Expand Down Expand Up @@ -75,6 +79,7 @@ export class SqsController extends EventController implements EventControllerInt
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
};

console.log('*** payload: ', payload);
return this.prisma[this.name].upsert(payload);
}
Expand All @@ -98,66 +103,105 @@ export class SqsController extends EventController implements EventControllerInt
return;
}

const instanceSqs = await this.get(instanceName);
const sqsLocal = instanceSqs?.events;
const we = event.replace(/[.-]/gm, '_').toUpperCase();

if (instanceSqs?.enabled) {
if (this.sqs) {
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${instanceName}_${eventFormatted}.fifo`;
const sqsConfig = configService.get<Sqs>('SQS');
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
QueueUrl: sqsUrl,
};

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
}
if (this.sqs) {
const sqsConfig = configService.get<Sqs>('SQS');

const we = event.replace(/[.-]/gm, '_').toUpperCase();

let sqsEvents = [];
if (sqsConfig.GLOBAL_ENABLED) {
sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
} else {
const instanceSqs = await this.get(instanceName);
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
sqsEvents = instanceSqs?.events;
}
}

if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
const queueName = `${prefixName}_${eventFormatted}.fifo`;

const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
dataType: 'json',
data,
server: configService.get<HttpServer>('SERVER').NAME,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const jsonStr = JSON.stringify(message);
const size = Buffer.byteLength(jsonStr, 'utf8');
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
if (!configService.get<S3>('S3').ENABLE) {
this.logger.error(
`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`,
);
return;
}

const buffer = Buffer.from(jsonStr, 'utf8');
const fileName = `${instanceName}_${eventFormatted}_${Date.now()}.json`;
const fullName = join('messages', fileName);

await s3Service.uploadFile(fullName, buffer, size, {
'Content-Type': 'application/json',
'Cache-Control': 'no-store',
});

const fileUrl = await s3Service.getObjectUrl(fullName);

message.data = { fileUrl };
message.dataType = 's3';
}

const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
QueueUrl: sqsUrl,
...(!isGlobalEnabled && {
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
}),
};

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
params: JSON.stringify(message),
sqsUrl: sqsUrl,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
});
}
}
}

private async saveQueues(instanceName: string, events: string[], enable: boolean) {
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
if (enable) {
const eventsFinded = await this.listQueuesByInstance(instanceName);
const eventsFinded = await this.listQueues(prefixName);
console.log('eventsFinded', eventsFinded);

for (const event of events) {
Expand All @@ -168,15 +212,17 @@ export class SqsController extends EventController implements EventControllerInt
continue;
}

const queueName = `${instanceName}_${normalizedEvent}.fifo`;

const queueName = `${prefixName}_${normalizedEvent}.fifo`;
try {
const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
const createCommand = new CreateQueueCommand({
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }),
},
});

const data = await this.sqs.send(createCommand);
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
} catch (err: any) {
Expand All @@ -186,12 +232,14 @@ export class SqsController extends EventController implements EventControllerInt
}
}

private async listQueuesByInstance(instanceName: string) {
private async listQueues(prefixName: string) {
let existingQueues: string[] = [];

try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
QueueNamePrefix: `${prefixName}_`,
});

const listData = await this.sqs.send(listCommand);
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
// Extrai o nome da fila a partir da URL
Expand All @@ -201,32 +249,32 @@ export class SqsController extends EventController implements EventControllerInt
});
}
} catch (error: any) {
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
return;
}

// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
return existingQueues
.map((queueName) => {
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
}
return '';
})
.filter((event) => event !== '');
}

// Para uma futura feature de exclusão forçada das queues
private async removeQueuesByInstance(instanceName: string) {
private async removeQueuesByInstance(prefixName: string) {
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
QueueNamePrefix: `${prefixName}_`,
});
const listData = await this.sqs.send(listCommand);

if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
this.logger.info(`No queues found for instance ${instanceName}`);
this.logger.info(`No queues found for ${prefixName}`);
return;
}

Expand All @@ -240,7 +288,7 @@ export class SqsController extends EventController implements EventControllerInt
}
}
} catch (err: any) {
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
}
}
}
Loading