Skip to content

Commit 36d1a4b

Browse files
authored
feat: update RedisChannelConfig to use ConnectionOptions (#2)
* refactor: update RedisChannelConfig to use ConnectionOptions * refactor: rename connection property to connectionOptions and enhance ConnectionOptions interface in RedisChannelConfig * refactor: remove unused comments * refactor: update RedisChannelConfig to use a simplified Connection interface and adjust related components
1 parent 884968c commit 36d1a4b

File tree

6 files changed

+62
-26
lines changed

6 files changed

+62
-26
lines changed

src/channel/redis.channel-config.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
import { ChannelConfig } from '@nestjstools/messaging';
2-
32
export class RedisChannelConfig extends ChannelConfig {
43
public readonly connection: Connection;
54
public readonly queue: string;
65

76
constructor({
8-
name,
9-
connection,
10-
queue,
11-
enableConsumer,
12-
avoidErrorsForNotExistedHandlers,
13-
middlewares,
14-
normalizer,
15-
}: RedisChannelConfig) {
16-
super(name, avoidErrorsForNotExistedHandlers, middlewares, enableConsumer, normalizer)
7+
name,
8+
connection,
9+
queue,
10+
enableConsumer,
11+
avoidErrorsForNotExistedHandlers,
12+
middlewares,
13+
normalizer,
14+
}: RedisChannelConfig) {
15+
super(
16+
name,
17+
avoidErrorsForNotExistedHandlers,
18+
middlewares,
19+
enableConsumer,
20+
normalizer,
21+
);
1722
this.connection = connection;
1823
this.queue = queue;
1924
}
@@ -22,4 +27,11 @@ export class RedisChannelConfig extends ChannelConfig {
2227
interface Connection {
2328
host: string;
2429
port: number;
30+
password?: string;
31+
db?: number;
32+
/**
33+
* This prefix is not used as RedisOptions keyPrefix, it is used as prefix for BullMQ
34+
* Read more: https://github.com/taskforcesh/bullmq/issues/1219#issuecomment-1113903785
35+
*/
36+
keyPrefix?: string;
2537
}

src/channel/redis.channel-factory.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { RedisChannel } from './redis.channel';
2-
import {Injectable} from "@nestjs/common";
2+
import { Injectable } from '@nestjs/common';
33
import { ChannelFactory, IChannelFactory } from '@nestjstools/messaging';
44
import { RedisChannelConfig } from './redis.channel-config';
55

66
@Injectable()
77
@ChannelFactory(RedisChannelConfig)
8-
export class RedisChannelFactory implements IChannelFactory<RedisChannelConfig> {
8+
export class RedisChannelFactory
9+
implements IChannelFactory<RedisChannelConfig>
10+
{
911
create(channelConfig: RedisChannelConfig): RedisChannel {
1012
return new RedisChannel(channelConfig);
1113
}

src/channel/redis.channel.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ export class RedisChannel extends Channel<RedisChannelConfig> {
88

99
constructor(config: RedisChannelConfig) {
1010
super(config);
11-
this.queue = new Queue(config.queue, { connection: config.connection });
11+
this.queue = new Queue(config.queue, {
12+
connection: {
13+
host: config.connection.host,
14+
port: config.connection.port,
15+
password: config.connection.password,
16+
db: config.connection.db,
17+
},
18+
prefix: config.connection.keyPrefix,
19+
});
1220
}
1321
}

src/consumer/redis-messaging.consumer.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,41 @@ import { Worker } from 'bullmq';
88

99
@Injectable()
1010
@MessageConsumer(RedisChannel)
11-
export class RedisMessagingConsumer implements IMessagingConsumer<RedisChannel>, OnApplicationShutdown {
11+
export class RedisMessagingConsumer
12+
implements IMessagingConsumer<RedisChannel>, OnApplicationShutdown
13+
{
1214
private channel?: RedisChannel = undefined;
1315
private worker?: Worker = undefined;
1416

15-
async consume(dispatcher: ConsumerMessageDispatcher, channel: RedisChannel): Promise<void> {
17+
async consume(
18+
dispatcher: ConsumerMessageDispatcher,
19+
channel: RedisChannel,
20+
): Promise<void> {
1621
this.channel = channel;
1722

1823
this.worker = new Worker(
1924
channel.config.queue,
2025
async (job) => {
2126
dispatcher.dispatch(new ConsumerMessage(job.data, job.name));
2227
},
23-
{ connection: channel.config.connection }
28+
{
29+
connection: {
30+
host: channel.config.connection.host,
31+
port: channel.config.connection.port,
32+
password: channel.config.connection.password,
33+
db: channel.config.connection.db,
34+
},
35+
prefix: channel.config.connection.keyPrefix,
36+
},
2437
);
2538

2639
return Promise.resolve();
2740
}
2841

29-
onError(errored: ConsumerDispatchedMessageError, channel: RedisChannel): Promise<void> {
42+
onError(
43+
errored: ConsumerDispatchedMessageError,
44+
channel: RedisChannel,
45+
): Promise<void> {
3046
return Promise.resolve();
3147
}
3248

src/message-bus/redis-message-bus-factory.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { Injectable } from '@nestjs/common';
22
import { RedisMessageBus } from './redis-message.bus';
33
import { RedisChannel } from '../channel/redis.channel';
4-
import {IMessageBusFactory} from "@nestjstools/messaging";
5-
import {MessageBusFactory} from "@nestjstools/messaging";
6-
import {IMessageBus} from "@nestjstools/messaging";
4+
import { IMessageBusFactory } from '@nestjstools/messaging';
5+
import { MessageBusFactory } from '@nestjstools/messaging';
6+
import { IMessageBus } from '@nestjstools/messaging';
77

88
@Injectable()
99
@MessageBusFactory(RedisChannel)
10-
export class RedisMessageBusFactory implements IMessageBusFactory<RedisChannel> {
11-
10+
export class RedisMessageBusFactory
11+
implements IMessageBusFactory<RedisChannel>
12+
{
1213
create(channel: RedisChannel): IMessageBus {
1314
return new RedisMessageBus(channel);
1415
}

src/message-bus/redis-message.bus.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import { RedisChannel } from '../channel/redis.channel';
55

66
@Injectable()
77
export class RedisMessageBus implements IMessageBus {
8-
constructor(
9-
private readonly redisChannel: RedisChannel,
10-
) {
11-
}
8+
constructor(private readonly redisChannel: RedisChannel) {}
129

1310
async dispatch(message: RoutingMessage): Promise<object | void> {
1411
this.redisChannel.queue.add(message.messageRoutingKey, message.message);

0 commit comments

Comments
 (0)