Skip to content

Commit 50c432c

Browse files
feat: enhancement azure service - supports topic/subscriptions
1 parent 11e6955 commit 50c432c

File tree

7 files changed

+162
-39
lines changed

7 files changed

+162
-39
lines changed

README.md

Lines changed: 62 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ yarn add @nestjstools/messaging @nestjstools/messaging-azure-service-bus-extensi
2929

3030
---
3131

32+
### Basic Example (Queue Mode)
33+
3234
```typescript
3335
import { Module } from '@nestjs/common';
3436
import { MessagingModule } from '@nestjstools/messaging';
@@ -45,15 +47,52 @@ import { MessagingAzureServiceBusExtensionModule, AzureServiceBusChannelConfig }
4547
channels: ['azure-channel'],
4648
},
4749
],
48-
channels: [
49-
new AzureServiceBusChannelConfig({
50-
name: 'azure-channel',
51-
enableConsumer: true,
52-
autoCreate: false, // You need to have admin account to create a queue
53-
fullyQualifiedNamespace: 'Endpoint=...',
54-
queue: 'azure-queue',
55-
}),
50+
channels: [
51+
new AzureServiceBusChannelConfig({
52+
name: 'azure-channel',
53+
autoCreate: false, // Requires admin access in Azure to auto-create resources
54+
enableConsumer: true, // Needed for `autoCreate` and message consumption
55+
connectionString: 'Endpoint=...SharedAccessKey=...',
56+
queue: 'azure-queue',
57+
// mode: Mode.QUEUE, // Optional: default is 'QUEUE'
58+
}),
59+
],
60+
debug: true, // Optional: Enable debugging for Messaging operations
61+
}),
62+
],
63+
})
64+
export class AppModule {}
65+
```
66+
67+
### Topic/Subscription Mode (Pub/Sub)
68+
69+
```typescript
70+
import { Module } from '@nestjs/common';
71+
import { MessagingModule } from '@nestjstools/messaging';
72+
import { SendMessageHandler } from './handlers/send-message.handler';
73+
import { MessagingAzureServiceBusExtensionModule, AzureServiceBusChannelConfig } from '@nestjstools/messaging-azure-service-bus-extension';
74+
75+
@Module({
76+
imports: [
77+
MessagingAzureServiceBusExtensionModule,
78+
MessagingModule.forRoot({
79+
buses: [
80+
{
81+
name: 'azure.bus',
82+
channels: ['azure-channel'],
83+
},
5684
],
85+
channels: [
86+
new AzureServiceBusChannelConfig({
87+
name: 'azure-pubsub-channel',
88+
autoCreate: true, // Automatically create topic and subscription (if they don’t exist)
89+
enableConsumer: true, // Needed for `autoCreate` and message consumption
90+
connectionString: 'Endpoint=...SharedAccessKey=...',
91+
topic: 'azure-topic',
92+
subscription: 'azure-subscription',
93+
mode: Mode.TOPIC,
94+
}),
95+
],
5796
debug: true, // Optional: Enable debugging for Messaging operations
5897
}),
5998
],
@@ -133,15 +172,19 @@ export class CreateUserHandler implements IMessageHandler<CreateUser>{
133172

134173
#### **AzureServiceBusChannelConfig**
135174

136-
| **Property** | **Description** | **Default Value** |
137-
|-------------------------------|-----------------------------------------------------------------------------------------------|-------------------|
138-
| **`name`** | The name of the messaging channel within your app (used for internal routing). | |
139-
| **`fullyQualifiedNamespace`** | Azure service bus credentials (e.g., `Endpoint=sb:...`). | |
140-
| **`enableConsumer`** | Whether to enable message consumption (i.e., subscribing and processing messages from queue). | `true` |
141-
| **`autoCreate`** | Automatically create the queue, but admin permission is required from IAM | `true` |
142-
| **`queue`** | The name of the queue to publish messages and consume. | |
143-
144-
---
145-
146-
## Real world working example with RabbitMQ & Redis - but might be helpful to understand how it works
175+
| **Property** | **Description** | **Default Value** |
176+
|----------------------------------------|------------------------------------------------------------------------------------------------------------|-------------------|
177+
| **`name`** | The name of the messaging channel within your app (used for internal routing). | |
178+
| **`connectionString`** | Full Azure Service Bus connection string (`Endpoint=sb://...;SharedAccessKeyName=...;...`). | |
179+
| **`mode`** | Messaging mode: `'queue'` (point-to-point) or `'topic'` (publish-subscribe). | `Mode.QUEUE` |
180+
| **`queue`** | The queue name (used in `Mode.QUEUE`). | |
181+
| **`topic`** | The topic name (used in `Mode.TOPIC`). | |
182+
| **`subscription`** | The subscription name under the topic (required when `mode` is `TOPIC`). | |
183+
| **`enableConsumer`** | Whether to enable message consumption (subscribe and process messages). | `true` |
184+
| **`autoCreate`** | Automatically create the queue/topic/subscription if not found. Requires Service Bus **admin privileges**. | `false` |
185+
| **`middlewares`** | Optional array of middleware functions for pre-processing incoming messages. | `[]` |
186+
| **`avoidErrorsForNotExistedHandlers`** | Ignore errors when a routing key doesn’t match any registered handler. | `false` |
187+
188+
189+
### Real world working example with RabbitMQ & Redis - but might be helpful to understand how it works
147190
https://github.com/nestjstools/messaging-rabbitmq-example

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nestjstools/messaging-azure-service-bus-extension",
3-
"version": "1.0.0",
3+
"version": "1.1.0",
44
"description": "Extension to handle messages and dispatch them over Azure service bus",
55
"author": "Sebastian Iwanczyszyn",
66
"private": false,

src/channel/azure-service-bus-channel-factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AzureServiceBusChannel } from './azure-service-bus.channel';
2-
import {Injectable} from "@nestjs/common";
2+
import { Injectable } from "@nestjs/common";
33
import { ChannelFactory, IChannelFactory } from '@nestjstools/messaging';
44
import { AzureServiceBusChannelConfig } from './azure-service-bus-channel.config';
55

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
import { ChannelConfig } from '@nestjstools/messaging';
22

33
export class AzureServiceBusChannelConfig extends ChannelConfig {
4-
public readonly queue: string;
5-
public readonly fullyQualifiedNamespace: string;
4+
public readonly mode?: Mode;
5+
public readonly queue?: string;
6+
public readonly topic?: string;
7+
public readonly subscription?: string;
8+
public readonly connectionString: string;
69
public readonly autoCreate?: boolean;
710

811
constructor({
912
name,
1013
queue,
11-
fullyQualifiedNamespace,
14+
mode,
15+
topic,
16+
subscription,
17+
connectionString,
1218
autoCreate,
1319
enableConsumer,
1420
avoidErrorsForNotExistedHandlers,
@@ -17,7 +23,16 @@ export class AzureServiceBusChannelConfig extends ChannelConfig {
1723
}: AzureServiceBusChannelConfig) {
1824
super(name, avoidErrorsForNotExistedHandlers, middlewares, enableConsumer, normalizer)
1925
this.queue = queue;
20-
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
21-
this.autoCreate = autoCreate ?? true;
26+
this.connectionString = connectionString;
27+
this.queue = queue ?? Mode.QUEUE;
28+
this.mode = mode;
29+
this.topic = topic;
30+
this.subscription = subscription;
31+
this.autoCreate = autoCreate ?? false;
2232
}
2333
}
34+
35+
export enum Mode {
36+
QUEUE = 'queue',
37+
TOPIC = 'topic',
38+
}
Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Channel } from '@nestjstools/messaging';
2-
import { AzureServiceBusChannelConfig } from './azure-service-bus-channel.config';
2+
import { AzureServiceBusChannelConfig, Mode } from './azure-service-bus-channel.config';
33
import { ServiceBusAdministrationClient, ServiceBusClient } from '@azure/service-bus';
44

55
export class AzureServiceBusChannel extends Channel<AzureServiceBusChannelConfig> {
@@ -8,10 +8,47 @@ export class AzureServiceBusChannel extends Channel<AzureServiceBusChannelConfig
88

99
constructor(config: AzureServiceBusChannelConfig) {
1010
super(config);
11-
this.client = new ServiceBusClient(config.fullyQualifiedNamespace);
11+
this.client = new ServiceBusClient(config.connectionString);
1212

1313
if (config.autoCreate) {
14-
this.adminClient = new ServiceBusAdministrationClient(config.fullyQualifiedNamespace);
14+
this.adminClient = new ServiceBusAdministrationClient(config.connectionString);
1515
}
16+
17+
if (Mode.QUEUE === config.mode && !config.queue) {
18+
throw new Error('For [Mode.QUEUE] property `queue` in config must be defined');
19+
}
20+
21+
if (Mode.TOPIC === config.mode && (!config.topic || !config.subscription)) {
22+
throw new Error('For [Mode.TOPIC] properties `topic` and `subscription` in config must be defined');
23+
}
24+
}
25+
26+
isQueueMode(): boolean {
27+
return Mode.QUEUE === this.config.mode;
28+
}
29+
30+
isTopicMode(): boolean {
31+
return Mode.TOPIC === this.config.mode;
32+
}
33+
34+
getQueue(): string {
35+
if (!this.config.queue) {
36+
throw new Error('[queue] in config is not defined');
37+
}
38+
return this.config.queue;
39+
}
40+
41+
getTopic(): string {
42+
if (!this.config.topic) {
43+
throw new Error('[topic] in config is not defined');
44+
}
45+
return this.config.topic;
46+
}
47+
48+
getSubscription(): string {
49+
if (!this.config.subscription) {
50+
throw new Error('[subscription] in config is not defined');
51+
}
52+
return this.config.subscription;
1653
}
1754
}

src/consumer/azure-service-bus-messaging-consumer.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { Injectable, OnApplicationShutdown } from '@nestjs/common';
55
import { MessageConsumer } from '@nestjstools/messaging';
66
import { ConsumerDispatchedMessageError } from '@nestjstools/messaging';
77
import { ROUTING_KEY_ATTRIBUTE_NAME } from '../const';
8-
import { MessagingException } from '@nestjstools/messaging/lib/exception/messaging.exception';
8+
import { ServiceBusReceiver } from '@azure/service-bus';
99

1010
@Injectable()
1111
@MessageConsumer(AzureServiceBusChannel)
@@ -14,14 +14,24 @@ export class AzureServiceBusMessagingConsumer implements IMessagingConsumer<Azur
1414

1515
async consume(dispatcher: ConsumerMessageDispatcher, channel: AzureServiceBusChannel): Promise<void> {
1616
this.channel = channel;
17-
const receiver = this.channel.client.createReceiver(this.channel.config.queue);
17+
await this.autoCreate();
18+
let receiver: ServiceBusReceiver;
19+
20+
if (this.channel.isQueueMode()) {
21+
receiver = this.channel.client.createReceiver(this.channel.getQueue());
22+
} else {
23+
receiver = this.channel.client.createReceiver(
24+
this.channel.getTopic(),
25+
this.channel.getSubscription(),
26+
);
27+
}
1828

1929
receiver.subscribe({
2030
processMessage: async (message) => {
2131
const routingKey = message.applicationProperties?.[ROUTING_KEY_ATTRIBUTE_NAME];
2232

2333
if (!routingKey) {
24-
throw new MessagingException(`Routing header [${ROUTING_KEY_ATTRIBUTE_NAME}] not found in message attribute`);
34+
throw new Error(`Routing header [${ROUTING_KEY_ATTRIBUTE_NAME}] not found in message attribute`);
2535
}
2636

2737
await dispatcher.dispatch(
@@ -30,15 +40,36 @@ export class AzureServiceBusMessagingConsumer implements IMessagingConsumer<Azur
3040
},
3141
processError: async (args) => {
3242
console.error(
33-
`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
34-
args.error
43+
`Error occurred with Azure service bus: ${args.error.message}`,
44+
args.error,
3545
);
3646
},
3747
});
3848

3949
return Promise.resolve();
4050
}
4151

52+
async autoCreate(): Promise<void> {
53+
if (this.channel.config.autoCreate && this.channel.adminClient && this.channel.isQueueMode()) {
54+
const isExists = await this.channel.adminClient.queueExists(this.channel.getQueue());
55+
if (!isExists) {
56+
await this.channel.adminClient.createQueue(this.channel.getQueue());
57+
}
58+
}
59+
60+
if (this.channel.config.autoCreate && this.channel.adminClient && this.channel.isTopicMode()) {
61+
const topicExists = await this.channel.adminClient.topicExists(this.channel.getTopic());
62+
if (!topicExists) {
63+
await this.channel.adminClient.createTopic(this.channel.getTopic());
64+
}
65+
66+
const subscriptionExists = await this.channel.adminClient.subscriptionExists(this.channel.getTopic(), this.channel.getSubscription());
67+
if (!subscriptionExists) {
68+
await this.channel.adminClient.createSubscription(this.channel.getTopic(), this.channel.getSubscription());
69+
}
70+
}
71+
}
72+
4273
async onError(errored: ConsumerDispatchedMessageError, channel: AzureServiceBusChannel): Promise<void> {
4374
return Promise.resolve();
4475
}

src/message-bus/azure-service-bus-message-bus.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,15 @@ export class AzureServiceBusMessageBus implements IMessageBus {
1313
}
1414

1515
async dispatch(message: RoutingMessage): Promise<object | void> {
16-
if (this.channel.config.autoCreate && this.channel.adminClient) {
17-
await this.channel.adminClient.createQueue(this.channel.config.queue);
18-
}
19-
2016
const serviceBusMessage: ServiceBusMessage = {
2117
body: message.message,
2218
applicationProperties: {
2319
[ROUTING_KEY_ATTRIBUTE_NAME]: message.messageRoutingKey,
2420
}
2521
};
2622

27-
const sender = this.channel.client.createSender(this.channel.config.queue);
23+
const senderName = this.channel.isQueueMode() ? this.channel.getQueue() : this.channel.getTopic();
24+
const sender = this.channel.client.createSender(senderName);
2825

2926
try {
3027
await sender.sendMessages(serviceBusMessage);

0 commit comments

Comments
 (0)