Skip to content

Commit f6e67e5

Browse files
feat: rabbitmq client
1 parent da76951 commit f6e67e5

File tree

5 files changed

+87
-132
lines changed

5 files changed

+87
-132
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nestjstools/messaging-rabbitmq-extension",
3-
"version": "3.1.1",
3+
"version": "3.2.0",
44
"description": "Extension to handle messages and dispatch them over AMQP protocol",
55
"author": "Sebastian Iwanczyszyn",
66
"private": false,
@@ -51,7 +51,7 @@
5151
"test:e2e": "node_modules/.bin/jest --config ./test/jest-e2e.json"
5252
},
5353
"dependencies": {
54-
"amqplib": "^0.10.9"
54+
"rabbitmq-client": "^5.0.5"
5555
},
5656
"peerDependencies": {
5757
"@nestjs/common": "^10.x||^11.x",

src/channel/amqp.channel.ts

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,20 @@
11
import { Channel } from '@nestjstools/messaging';
2-
import { RmqChannelConfig as ExtensionAmqpChannelConfig } from './rmq-channel.config';
3-
import * as amqp from 'amqplib';
2+
import { RmqChannelConfig } from './rmq-channel.config';
3+
import { Connection } from 'rabbitmq-client';
44

5-
export class AmqpChannel extends Channel<ExtensionAmqpChannelConfig> {
6-
public connection?: any;
7-
public readonly config: ExtensionAmqpChannelConfig;
5+
export class AmqpChannel extends Channel<
6+
RmqChannelConfig
7+
> {
8+
public readonly connection: Connection;
9+
public readonly config: RmqChannelConfig;
810

9-
constructor(config: ExtensionAmqpChannelConfig) {
11+
constructor(config: RmqChannelConfig) {
1012
super(config);
11-
this.config = config;
12-
}
13-
14-
async init(): Promise<void> {
15-
if (this.connection) {
16-
return Promise.resolve();
17-
}
18-
19-
this.connection = undefined;
20-
21-
this.connection = await amqp.connect(this.config.connectionUri);
22-
this.connection.on('close', (err: any) => {
23-
if (err) {
24-
console.error('AMQP Connection error:', err);
25-
}
26-
process.exit(0);
27-
});
13+
this.connection = new Connection(config.connectionUri);
2814
}
2915

3016
async onChannelDestroy(): Promise<void> {
31-
if (this.connection) {
32-
await this.connection.close();
33-
this.connection = undefined;
34-
}
17+
await this.connection.close();
18+
return Promise.resolve();
3519
}
3620
}

src/consumer/rabbitmq-messaging.consumer.ts

Lines changed: 30 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
33
import { IMessagingConsumer } from '@nestjstools/messaging';
44
import { ConsumerMessageDispatcher } from '@nestjstools/messaging';
55
import { ConsumerMessage } from '@nestjstools/messaging';
6-
import { Injectable, OnModuleDestroy } from '@nestjs/common';
6+
import { Injectable } from '@nestjs/common';
77
import { MessageConsumer } from '@nestjstools/messaging';
88
import { ConsumerDispatchedMessageError } from '@nestjstools/messaging';
99
import { RabbitmqMigrator } from '../migrator/rabbitmq.migrator';
@@ -12,82 +12,63 @@ import { Buffer } from 'buffer';
1212
@Injectable()
1313
@MessageConsumer(AmqpChannel)
1414
export class RabbitmqMessagingConsumer
15-
implements IMessagingConsumer<AmqpChannel>, OnModuleDestroy
15+
implements IMessagingConsumer<AmqpChannel>
1616
{
1717
private channel?: AmqpChannel = undefined;
18-
private amqpChannel: any;
1918

2019
constructor(private readonly rabbitMqMigrator: RabbitmqMigrator) {}
2120

2221
async consume(
2322
dispatcher: ConsumerMessageDispatcher,
2423
channel: AmqpChannel,
2524
): Promise<void> {
26-
await channel.init();
27-
this.channel = channel;
2825
await this.rabbitMqMigrator.run(channel);
26+
this.channel = channel;
2927

30-
const amqpChannel = await this.channel.connection.createChannel();
31-
this.amqpChannel = amqpChannel;
32-
33-
if (!amqpChannel) {
34-
throw new Error('AMQP channel not initialized');
35-
}
36-
37-
await amqpChannel.prefetch(1);
38-
await amqpChannel.consume(
39-
channel.config.queue,
40-
async (msg) => {
41-
if (!msg) return;
28+
channel.connection.createConsumer(
29+
{
30+
queue: channel.config.queue,
31+
queueOptions: { durable: true },
32+
requeue: false,
33+
},
34+
async (msg): Promise<void> => {
35+
const rabbitMqMessage = msg as RabbitMQMessage;
4236

43-
let message: any = msg.content;
37+
let message = rabbitMqMessage.body;
4438
if (Buffer.isBuffer(message)) {
45-
message = JSON.parse(message.toString());
39+
const messageContent = message.toString();
40+
message = JSON.parse(messageContent);
4641
}
4742

4843
const routingKey =
49-
msg.properties.headers?.[RABBITMQ_HEADER_ROUTING_KEY] ??
50-
msg.fields.routingKey;
44+
rabbitMqMessage.headers?.[RABBITMQ_HEADER_ROUTING_KEY] ??
45+
rabbitMqMessage.routingKey;
5146

52-
if (dispatcher.isReady()) {
53-
await dispatcher.dispatch(new ConsumerMessage(message, routingKey));
54-
amqpChannel.ack(msg);
55-
return;
56-
}
57-
58-
amqpChannel.nack(msg, false, true);
47+
dispatcher.dispatch(new ConsumerMessage(message, routingKey));
5948
},
60-
{ noAck: false },
6149
);
50+
51+
return Promise.resolve();
6252
}
6353

6454
async onError(
6555
errored: ConsumerDispatchedMessageError,
6656
channel: AmqpChannel,
6757
): Promise<void> {
68-
if (channel.config.deadLetterQueueFeature && this.amqpChannel) {
69-
const exchange = 'dead_letter.exchange';
70-
const routingKey = `${channel.config.queue}_dead_letter`;
71-
72-
this.amqpChannel.publish(
73-
exchange,
74-
routingKey,
75-
Buffer.from(JSON.stringify(errored.dispatchedConsumerMessage.message)),
76-
{
77-
headers: {
78-
'messaging-routing-key':
79-
errored.dispatchedConsumerMessage.routingKey,
80-
},
58+
if (channel.config.deadLetterQueueFeature) {
59+
const publisher = channel.connection.createPublisher();
60+
const envelope = {
61+
headers: {
62+
'messaging-routing-key': errored.dispatchedConsumerMessage.routingKey,
8163
},
82-
);
64+
exchange: 'dead_letter.exchange',
65+
routingKey: `${channel.config.queue}_dead_letter`,
66+
};
67+
await publisher.send(envelope, errored.dispatchedConsumerMessage.message);
68+
await publisher.close();
8369
}
84-
}
8570

86-
async onModuleDestroy(): Promise<void> {
87-
if (this.channel?.connection) {
88-
await this.channel.connection.close();
89-
}
90-
this.channel = undefined;
71+
return Promise.resolve();
9172
}
9273
}
9374

src/message-bus/amqp-message.bus.ts

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
import { RoutingMessage } from '@nestjstools/messaging';
22
import { IMessageBus } from '@nestjstools/messaging';
33
import { Injectable } from '@nestjs/common';
4-
import { AmqpChannel } from '../channel/amqp.channel';
4+
import { Connection } from 'rabbitmq-client';
55
import { AmqpMessageOptions } from '../message/amqp-message-options';
6+
import { AmqpChannel } from '../channel/amqp.channel';
67
import { AmqpMessageBuilder } from './amqp-message.builder';
78
import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
89
import { ExchangeType } from '../channel/rmq-channel.config';
910

1011
@Injectable()
1112
export class AmqpMessageBus implements IMessageBus {
12-
public publisherChannel?: any;
13+
private readonly connection: Connection;
1314

14-
constructor(private readonly amqpChannel: AmqpChannel) {}
15+
constructor(private readonly amqpChanel: AmqpChannel) {
16+
this.connection = amqpChanel.connection;
17+
}
1518

1619
async dispatch(message: RoutingMessage): Promise<object | void> {
17-
await this.amqpChannel.init();
18-
await this.initPublisherChannel();
19-
2020
if (
2121
message.messageOptions !== undefined &&
2222
!(message.messageOptions instanceof AmqpMessageOptions)
@@ -37,21 +37,9 @@ export class AmqpMessageBus implements IMessageBus {
3737
);
3838

3939
const amqpMessage = messageBuilder.buildMessage();
40-
41-
await this.publisherChannel.publish(
42-
amqpMessage.envelope.exchange,
43-
amqpMessage.envelope.routingKey,
44-
Buffer.from(JSON.stringify(amqpMessage.message)),
45-
{
46-
headers: amqpMessage.envelope.headers,
47-
},
48-
);
49-
}
50-
51-
async initPublisherChannel() {
52-
if (!this.publisherChannel && this.amqpChannel.connection) {
53-
this.publisherChannel = await this.amqpChannel.connection.createChannel();
54-
}
40+
const publisher = await this.connection.createPublisher();
41+
await publisher.send(amqpMessage.envelope, amqpMessage.message);
42+
await publisher.close();
5543
}
5644

5745
private createMessageBuilderWhenUndefined(
@@ -61,13 +49,13 @@ export class AmqpMessageBus implements IMessageBus {
6149

6250
messageBuilder
6351
.withMessage(message.message)
64-
.withExchangeName(this.amqpChannel.config.exchangeName);
52+
.withExchangeName(this.amqpChanel.config.exchangeName);
6553

66-
if (this.amqpChannel.config.exchangeType === ExchangeType.DIRECT) {
54+
if (this.amqpChanel.config.exchangeType === ExchangeType.DIRECT) {
6755
messageBuilder.withRoutingKey(this.getRoutingKey(message));
6856
}
6957

70-
if (this.amqpChannel.config.exchangeType === ExchangeType.TOPIC) {
58+
if (this.amqpChanel.config.exchangeType === ExchangeType.TOPIC) {
7159
messageBuilder.withRoutingKey(message.messageRoutingKey);
7260
}
7361

@@ -79,11 +67,10 @@ export class AmqpMessageBus implements IMessageBus {
7967
): AmqpMessageBuilder {
8068
const options = message.messageOptions as AmqpMessageOptions;
8169
const messageBuilder = AmqpMessageBuilder.create();
82-
8370
messageBuilder
8471
.withMessage(message.message)
8572
.withExchangeName(
86-
options.exchangeName ?? this.amqpChannel.config.exchangeName,
73+
options.exchangeName ?? this.amqpChanel.config.exchangeName,
8774
)
8875
.withRoutingKey(options.routingKey ?? this.getRoutingKey(message))
8976
.withHeaders(options.headers);
@@ -92,9 +79,9 @@ export class AmqpMessageBus implements IMessageBus {
9279
}
9380

9481
private getRoutingKey(message: RoutingMessage): string {
95-
return this.amqpChannel.config.bindingKeys !== undefined
96-
? this.amqpChannel.config.bindingKeys.length > 0
97-
? this.amqpChannel.config.bindingKeys[0]
82+
return this.amqpChanel.config.bindingKeys !== undefined
83+
? this.amqpChanel.config.bindingKeys.length > 0
84+
? this.amqpChanel.config.bindingKeys[0]
9885
: message.messageRoutingKey
9986
: message.messageRoutingKey;
10087
}

src/migrator/rabbitmq.migrator.ts

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,48 @@
11
import { AmqpChannel } from '../channel/amqp.channel';
22
import { Injectable } from '@nestjs/common';
3+
import { ExchangeType } from '../channel/rmq-channel.config';
34

45
@Injectable()
56
export class RabbitmqMigrator {
6-
async run(channel: AmqpChannel): Promise<void> {
7-
if (channel.config.autoCreate === false) {
7+
async run(channel: AmqpChannel): Promise<any> {
8+
if (false === channel.config.autoCreate) {
89
return;
910
}
1011

11-
const amqpChannel = await channel.connection.createChannel();
12-
13-
await amqpChannel.assertExchange(
14-
channel.config.exchangeName,
15-
channel.config.exchangeType,
16-
{ durable: true },
17-
);
12+
await channel.connection.exchangeDeclare({
13+
durable: true,
14+
exchange: channel.config.exchangeName,
15+
type: channel.config.exchangeType,
16+
});
1817

19-
await amqpChannel.assertQueue(channel.config.queue, {
18+
await channel.connection.queueDeclare({
2019
durable: true,
20+
queue: channel.config.queue,
2121
});
2222

2323
if (channel.config.deadLetterQueueFeature === true) {
24-
const dlxExchange = 'dead_letter.exchange';
25-
const dlq = `${channel.config.queue}_dead_letter`;
26-
27-
await amqpChannel.assertExchange(dlxExchange, 'direct', {
24+
await channel.connection.exchangeDeclare({
2825
durable: true,
26+
exchange: 'dead_letter.exchange',
27+
type: ExchangeType.DIRECT,
28+
});
29+
await channel.connection.queueDeclare({
30+
durable: true,
31+
queue: `${channel.config.queue}_dead_letter`,
32+
});
33+
await channel.connection.queueBind({
34+
queue: `${channel.config.queue}_dead_letter`,
35+
exchange: 'dead_letter.exchange',
36+
routingKey: `${channel.config.queue}_dead_letter`,
2937
});
30-
31-
await amqpChannel.assertQueue(dlq, { durable: true });
32-
33-
await amqpChannel.bindQueue(dlq, dlxExchange, dlq);
3438
}
3539

36-
// Bindings
37-
for (const bindingKey of channel.config.bindingKeys ?? []) {
38-
await amqpChannel.bindQueue(
39-
channel.config.queue,
40-
channel.config.exchangeName,
41-
bindingKey,
42-
);
40+
for (const bindingKey of channel.config.bindingKeys) {
41+
await channel.connection.queueBind({
42+
queue: channel.config.queue,
43+
exchange: channel.config.exchangeName,
44+
routingKey: bindingKey,
45+
});
4346
}
4447
}
4548
}

0 commit comments

Comments
 (0)