Skip to content

Commit c53427a

Browse files
feat: root async (#25)
* feat: root async in MessagingModule * feat: root async in MessagingModule
1 parent 86c8a26 commit c53427a

File tree

6 files changed

+276
-43
lines changed

6 files changed

+276
-43
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nestjstools/messaging",
3-
"version": "2.18.1",
3+
"version": "2.19.0",
44
"description": "Simplifies asynchronous and synchronous message handling with support for buses, handlers, channels, and consumers. Build scalable, decoupled applications with ease and reliability.",
55
"author": "Sebastian Iwanczyszyn",
66
"license": "MIT",

src/config.ts

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,57 @@
11
import { ObjectForwardMessageNormalizer } from './normalizer/object-forward-message.normalizer';
2+
import { Type } from '@nestjs/common';
3+
import { DynamicModule } from '@nestjs/common/interfaces/modules/dynamic-module.interface';
4+
import { ForwardReference } from '@nestjs/common/interfaces/modules/forward-reference.interface';
25

3-
type DefineChannels = ChannelConfig[];
6+
export type DefineChannels = ChannelConfig[];
47

5-
export interface MessagingModuleOptions {
6-
buses?: DefineBusOption[];
8+
export interface MessagingModuleOptions extends MandatoryMessagingModuleOptions {
79
channels?: DefineChannels;
8-
debug?: boolean;
9-
logging?: boolean;
10-
global?: boolean;
1110
}
1211

12+
/**
13+
* @description
14+
* Configuration for a messaging bus.
15+
* Each bus is identified by a unique name and is associated with one or more channels.
16+
* Define buses if you want to enable sending messages over specific channels.
17+
*/
1318
export interface DefineBusOption {
19+
/**
20+
* @description
21+
* Unique name of your bus
22+
*/
1423
name: string;
24+
/**
25+
* @description
26+
* List of channel names the message will be sent through.
27+
*/
1528
channels: string[];
1629
}
1730

1831
export class ChannelConfig {
32+
/**
33+
* @description
34+
* If true, suppresses errors when no handler is found for a message on this channel.
35+
* Useful for optional or loosely coupled message handling.
36+
*/
1937
public readonly avoidErrorsForNotExistedHandlers?: boolean;
38+
/**
39+
* @description
40+
* An array of middleware objects to be applied to messages passing through this channel.
41+
* Middleware can be used for logging, transformation, validation, etc.
42+
*/
2043
public readonly middlewares?: object[];
44+
/**
45+
* @description
46+
* Enables or disables the consumer (listener) for this channel.
47+
* If set to false, the channel will not actively consume messages.
48+
*/
2149
public readonly enableConsumer?: boolean;
50+
/**
51+
* @description
52+
* Optional message normalizer to process or transform messages before they are handled & before send.
53+
* Can be used to enforce consistent message structure.
54+
*/
2255
public readonly normalizer?: object;
2356

2457
constructor(
@@ -86,3 +119,27 @@ export enum ExchangeType {
86119
FANOUT = 'fanout',
87120
DIRECT = 'direct',
88121
}
122+
123+
/**
124+
* @description
125+
* Allows asynchronous configuration of messaging channels, similar to NestJS's `useFactory` pattern.
126+
* Use this to define your channels dynamically, possibly depending on other injected services.
127+
* Note: Buses and other options must be configured in sync way.
128+
*/
129+
export interface MessagingModuleAsyncOptions extends MandatoryMessagingModuleOptions {
130+
inject?: Array<Type | string | symbol>;
131+
useChannelFactory: (...args) => Promise<DefineChannels> | DefineChannels;
132+
imports?: Array<Type | DynamicModule | Promise<DynamicModule> | ForwardReference>;
133+
}
134+
135+
/**
136+
* @description
137+
* Base configuration options for the MessagingModule.
138+
* These options control core behaviors such as debugging, logging, buses and global registration.
139+
*/
140+
export interface MandatoryMessagingModuleOptions {
141+
buses?: DefineBusOption[];
142+
debug?: boolean;
143+
logging?: boolean;
144+
global?: boolean;
145+
}

src/dependency-injection/service.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@ export enum Service {
99
EXCEPTION_LISTENER_HANDLER = 'messenger_EXCEPTION_LISTENER_HANDLER',
1010
CHANNELS = 'messenger_CHANNELS',
1111
LOGGER = 'messenger_LOGGER',
12+
MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS = 'MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS',
13+
MESSAGING_BUSES = 'MESSAGING_BUSES',
1214
}

src/messaging.module.ts

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
1-
import { DynamicModule, Logger as NestCommonLogger, Module, OnApplicationBootstrap } from '@nestjs/common';
2-
import { ChannelConfig, InMemoryChannelConfig, MessagingModuleOptions } from './config';
1+
import {
2+
DynamicModule,
3+
FactoryProvider,
4+
Logger as NestCommonLogger,
5+
Module,
6+
OnApplicationBootstrap,
7+
Provider,
8+
} from '@nestjs/common';
9+
import {
10+
ChannelConfig,
11+
DefineChannels,
12+
InMemoryChannelConfig,
13+
MessagingModuleAsyncOptions,
14+
MessagingModuleOptions, MandatoryMessagingModuleOptions,
15+
} from './config';
316
import { Service } from './dependency-injection/service';
417
import { CompositeChannelFactory } from './channel/factory/composite-channel.factory';
518
import { ChannelRegistry } from './channel/channel.registry';
@@ -31,23 +44,50 @@ import { ExceptionListenerHandler } from './exception-listener/exception-listene
3144

3245
@Module({})
3346
export class MessagingModule implements OnApplicationBootstrap {
47+
3448
static forRoot(options: MessagingModuleOptions): DynamicModule {
35-
const buses = options.buses ?? [];
3649
const channels = options.channels ?? [];
3750

38-
const registerChannels = (): any => {
39-
return {
51+
const registerChannels: FactoryProvider = {
4052
provide: Service.CHANNELS,
4153
useFactory: (compositeChannelFactory: CompositeChannelFactory) => {
4254
return channels.map((channelConfig: ChannelConfig) =>
4355
compositeChannelFactory.create(channelConfig),
4456
);
4557
},
4658
inject: [CompositeChannelFactory],
47-
};
4859
};
4960

50-
const registerBuses = (): any[] => {
61+
return MessagingModule.createDynamicModule(options, [registerChannels]);
62+
}
63+
64+
static forRootAsync(options: MessagingModuleAsyncOptions): DynamicModule {
65+
const registerAsyncConfig: FactoryProvider = {
66+
provide: Service.MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS,
67+
useFactory: options.useChannelFactory,
68+
inject: options.inject ?? [],
69+
};
70+
71+
const registerChannels: FactoryProvider = {
72+
provide: Service.CHANNELS,
73+
useFactory: (
74+
compositeChannelFactory: CompositeChannelFactory,
75+
channels: DefineChannels,
76+
) => {
77+
return (channels ?? []).map((channelConfig: ChannelConfig) =>
78+
compositeChannelFactory.create(channelConfig),
79+
);
80+
},
81+
inject: [CompositeChannelFactory, Service.MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS],
82+
};
83+
84+
return MessagingModule.createDynamicModule(options, [registerAsyncConfig, registerChannels], options.imports ?? []);
85+
}
86+
87+
private static createDynamicModule(options: MandatoryMessagingModuleOptions, providers: Provider[] = [], imports: any = []): DynamicModule {
88+
const buses = options.buses ?? [];
89+
90+
const registerBuses = (): FactoryProvider[] => {
5191
return buses.map((bus) => ({
5292
provide: `${bus.name}`,
5393
useFactory: (
@@ -78,39 +118,43 @@ export class MessagingModule implements OnApplicationBootstrap {
78118
}));
79119
};
80120

121+
const defaultMessageBus = (): Provider => {
122+
return {
123+
provide: Service.DEFAULT_MESSAGE_BUS,
124+
useFactory: (
125+
messageHandlerRegistry: MessageHandlerRegistry,
126+
middlewareRegistry: MiddlewareRegistry,
127+
normalizerRegistry: NormalizerRegistry,
128+
) => {
129+
return new InMemoryMessageBus(
130+
messageHandlerRegistry,
131+
middlewareRegistry,
132+
new InMemoryChannel(
133+
new InMemoryChannelConfig({
134+
name: 'default.bus',
135+
middlewares: [],
136+
avoidErrorsForNotExistedHandlers: true,
137+
}),
138+
),
139+
normalizerRegistry,
140+
);
141+
},
142+
inject: [
143+
Service.MESSAGE_HANDLERS_REGISTRY,
144+
Service.MIDDLEWARE_REGISTRY,
145+
Service.MESSAGE_NORMALIZERS_REGISTRY,
146+
],
147+
};
148+
};
149+
81150
return {
82151
global: options.global ?? true,
83152
module: MessagingModule,
84-
imports: [DiscoveryModule],
153+
imports: [DiscoveryModule, ...imports ],
85154
providers: [
155+
...providers,
156+
defaultMessageBus(),
86157
...registerBuses(),
87-
registerChannels(),
88-
{
89-
provide: Service.DEFAULT_MESSAGE_BUS,
90-
useFactory: (
91-
messageHandlerRegistry: MessageHandlerRegistry,
92-
middlewareRegistry: MiddlewareRegistry,
93-
normalizerRegistry: NormalizerRegistry,
94-
) => {
95-
return new InMemoryMessageBus(
96-
messageHandlerRegistry,
97-
middlewareRegistry,
98-
new InMemoryChannel(
99-
new InMemoryChannelConfig({
100-
name: 'default.bus',
101-
middlewares: [],
102-
avoidErrorsForNotExistedHandlers: true,
103-
}),
104-
),
105-
normalizerRegistry,
106-
);
107-
},
108-
inject: [
109-
Service.MESSAGE_HANDLERS_REGISTRY,
110-
Service.MIDDLEWARE_REGISTRY,
111-
Service.MESSAGE_NORMALIZERS_REGISTRY,
112-
],
113-
},
114158
{
115159
provide: Service.MESSAGE_HANDLERS_REGISTRY,
116160
useClass: MessageHandlerRegistry,
@@ -156,7 +200,7 @@ export class MessagingModule implements OnApplicationBootstrap {
156200
],
157201
exports: [
158202
Service.DEFAULT_MESSAGE_BUS,
159-
...registerBuses(),
203+
...registerBuses().map(bus => bus.provide),
160204
DistributedConsumer,
161205
ObjectForwardMessageNormalizer,
162206
],
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { Test, TestingModule } from '@nestjs/testing';
2+
import { INestApplication, Logger } from '@nestjs/common';
3+
import { DefaultMessageOptions, IMessageBus, MessageResponse, RoutingMessage } from '../../src';
4+
import { TestMessage } from '../support/app/test.message';
5+
import { SpyDataService } from '../support/app/spy-data.service';
6+
import { Service } from '../../src/dependency-injection/service';
7+
import { ObjectForwardMessageNormalizer } from '../../src/normalizer/object-forward-message.normalizer';
8+
import { HandlersException } from '../../src/exception/handlers.exception';
9+
import { TestAsyncModule } from '../support/app/test-async.module';
10+
11+
/**
12+
* @description Same test as DispatchAndHandleMessage but test async loading MessagingModule
13+
*/
14+
describe('DispatchAndHandleMessageAsync', () => {
15+
let app: INestApplication;
16+
let defaultMessageBus: IMessageBus;
17+
let messageBus: IMessageBus;
18+
let middlewareMessageBus: IMessageBus;
19+
let spyDataService: SpyDataService;
20+
21+
beforeEach(async () => {
22+
const moduleFixture: TestingModule = await Test.createTestingModule({
23+
imports: [TestAsyncModule],
24+
}).compile();
25+
Logger.overrideLogger(false);
26+
27+
app = moduleFixture.createNestApplication();
28+
await app.init();
29+
defaultMessageBus = app.get(Service.DEFAULT_MESSAGE_BUS);
30+
messageBus = app.get('message.bus');
31+
middlewareMessageBus = app.get('middleware-message.bus');
32+
spyDataService = app.get(SpyDataService);
33+
});
34+
35+
afterAll(async () => {
36+
await app.close();
37+
});
38+
39+
it('will dispatch message to void handler and void second handler and check if everything is correct via spy service', async () => {
40+
await messageBus.dispatch(
41+
new RoutingMessage(new TestMessage('xyz'), 'message.void', new DefaultMessageOptions([], true, ObjectForwardMessageNormalizer)),
42+
);
43+
44+
expect(spyDataService.getFirst()).toBe('xyz');
45+
expect(spyDataService.getAllData()[1]).toBe('xyz2');
46+
});
47+
48+
it('will dispatch message to throwable handler', async () => {
49+
try {
50+
await messageBus.dispatch(
51+
new RoutingMessage(new TestMessage('xyz'), 'message.throwable', new DefaultMessageOptions([], true, ObjectForwardMessageNormalizer)),
52+
);
53+
} catch (e) {
54+
expect(e).toBeInstanceOf(HandlersException);
55+
}
56+
});
57+
58+
it('will dispatch message to returned handler and expected returned result', async () => {
59+
const result = await messageBus.dispatch(
60+
new RoutingMessage(new TestMessage('xyz'), 'message.returned'),
61+
);
62+
63+
expect(result).toEqual(
64+
new MessageResponse([{ id: 'uuid', response: 'xyz' }]),
65+
);
66+
});
67+
68+
it('check if middleware will correctly applied', async () => {
69+
await middlewareMessageBus.dispatch(
70+
new RoutingMessage(new TestMessage('xyz'), 'message.void'),
71+
);
72+
const data: string[] = spyDataService.getAllData();
73+
74+
expect(data).toHaveLength(3);
75+
expect(data[0]).toBe('MIDDLEWARE WORKS');
76+
expect(data[1]).toBe('xyz');
77+
});
78+
79+
it('Dispatch message by DEFAULT MESSAGE BUS and do not show error when handler does not exists', async () => {
80+
await defaultMessageBus.dispatch(
81+
new RoutingMessage(new TestMessage('xyz'), 'message_for_not_existed_handler'),
82+
);
83+
});
84+
});

test/support/app/test-async.module.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { Module } from '@nestjs/common';
2+
import { InMemoryChannelConfig, MessagingModule } from '../../../src';
3+
import { ReturnedHandler, ThrowableHandler, VoidHandler, VoidSecondHandler } from './test.handler';
4+
import { SpyDataService } from './spy-data.service';
5+
import { TestService } from './test.service';
6+
import { SpyDataMiddleware } from './spy-data-middleware.service';
7+
8+
@Module({
9+
imports: [
10+
MessagingModule.forRootAsync({
11+
buses: [
12+
{
13+
name: 'message.bus',
14+
channels: ['simple'],
15+
},
16+
{
17+
name: 'middleware-message.bus',
18+
channels: ['middleware-simple'],
19+
},
20+
],
21+
useChannelFactory: () => {
22+
return [
23+
new InMemoryChannelConfig({
24+
avoidErrorsForNotExistedHandlers: false,
25+
name: 'simple',
26+
}),
27+
new InMemoryChannelConfig({
28+
avoidErrorsForNotExistedHandlers: false,
29+
name: 'middleware-simple',
30+
middlewares: [SpyDataMiddleware],
31+
}),
32+
];
33+
}
34+
}),
35+
],
36+
providers: [
37+
SpyDataMiddleware,
38+
ReturnedHandler,
39+
ThrowableHandler,
40+
VoidHandler,
41+
VoidSecondHandler,
42+
SpyDataService,
43+
TestService,
44+
],
45+
})
46+
export class TestAsyncModule {}

0 commit comments

Comments
 (0)