Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nestjstools/messaging",
"version": "2.18.1",
"version": "2.19.0",
"description": "Simplifies asynchronous and synchronous message handling with support for buses, handlers, channels, and consumers. Build scalable, decoupled applications with ease and reliability.",
"author": "Sebastian Iwanczyszyn",
"license": "MIT",
Expand Down
69 changes: 63 additions & 6 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
import { ObjectForwardMessageNormalizer } from './normalizer/object-forward-message.normalizer';
import { Type } from '@nestjs/common';
import { DynamicModule } from '@nestjs/common/interfaces/modules/dynamic-module.interface';
import { ForwardReference } from '@nestjs/common/interfaces/modules/forward-reference.interface';

type DefineChannels = ChannelConfig[];
export type DefineChannels = ChannelConfig[];

export interface MessagingModuleOptions {
buses?: DefineBusOption[];
export interface MessagingModuleOptions extends MandatoryMessagingModuleOptions {
channels?: DefineChannels;
debug?: boolean;
logging?: boolean;
global?: boolean;
}

/**
* @description
* Configuration for a messaging bus.
* Each bus is identified by a unique name and is associated with one or more channels.
* Define buses if you want to enable sending messages over specific channels.
*/
export interface DefineBusOption {
/**
* @description
* Unique name of your bus
*/
name: string;
/**
* @description
* List of channel names the message will be sent through.
*/
channels: string[];
}

export class ChannelConfig {
/**
* @description
* If true, suppresses errors when no handler is found for a message on this channel.
* Useful for optional or loosely coupled message handling.
*/
public readonly avoidErrorsForNotExistedHandlers?: boolean;
/**
* @description
* An array of middleware objects to be applied to messages passing through this channel.
* Middleware can be used for logging, transformation, validation, etc.
*/
public readonly middlewares?: object[];
/**
* @description
* Enables or disables the consumer (listener) for this channel.
* If set to false, the channel will not actively consume messages.
*/
public readonly enableConsumer?: boolean;
/**
* @description
* Optional message normalizer to process or transform messages before they are handled & before send.
* Can be used to enforce consistent message structure.
*/
public readonly normalizer?: object;

constructor(
Expand Down Expand Up @@ -86,3 +119,27 @@ export enum ExchangeType {
FANOUT = 'fanout',
DIRECT = 'direct',
}

/**
* @description
* Allows asynchronous configuration of messaging channels, similar to NestJS's `useFactory` pattern.
* Use this to define your channels dynamically, possibly depending on other injected services.
* Note: Buses and other options must be configured in sync way.
*/
export interface MessagingModuleAsyncOptions extends MandatoryMessagingModuleOptions {
inject?: Array<Type | string | symbol>;
useChannelFactory: (...args) => Promise<DefineChannels> | DefineChannels;
imports?: Array<Type | DynamicModule | Promise<DynamicModule> | ForwardReference>;
}

/**
* @description
* Base configuration options for the MessagingModule.
* These options control core behaviors such as debugging, logging, buses and global registration.
*/
export interface MandatoryMessagingModuleOptions {
buses?: DefineBusOption[];
debug?: boolean;
logging?: boolean;
global?: boolean;
}
2 changes: 2 additions & 0 deletions src/dependency-injection/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ export enum Service {
EXCEPTION_LISTENER_HANDLER = 'messenger_EXCEPTION_LISTENER_HANDLER',
CHANNELS = 'messenger_CHANNELS',
LOGGER = 'messenger_LOGGER',
MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS = 'MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS',
MESSAGING_BUSES = 'MESSAGING_BUSES',
}
116 changes: 80 additions & 36 deletions src/messaging.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
import { DynamicModule, Logger as NestCommonLogger, Module, OnApplicationBootstrap } from '@nestjs/common';
import { ChannelConfig, InMemoryChannelConfig, MessagingModuleOptions } from './config';
import {
DynamicModule,
FactoryProvider,
Logger as NestCommonLogger,
Module,
OnApplicationBootstrap,
Provider,
} from '@nestjs/common';
import {
ChannelConfig,
DefineChannels,
InMemoryChannelConfig,
MessagingModuleAsyncOptions,
MessagingModuleOptions, MandatoryMessagingModuleOptions,
} from './config';
import { Service } from './dependency-injection/service';
import { CompositeChannelFactory } from './channel/factory/composite-channel.factory';
import { ChannelRegistry } from './channel/channel.registry';
Expand Down Expand Up @@ -31,23 +44,50 @@ import { ExceptionListenerHandler } from './exception-listener/exception-listene

@Module({})
export class MessagingModule implements OnApplicationBootstrap {

static forRoot(options: MessagingModuleOptions): DynamicModule {
const buses = options.buses ?? [];
const channels = options.channels ?? [];

const registerChannels = (): any => {
return {
const registerChannels: FactoryProvider = {
provide: Service.CHANNELS,
useFactory: (compositeChannelFactory: CompositeChannelFactory) => {
return channels.map((channelConfig: ChannelConfig) =>
compositeChannelFactory.create(channelConfig),
);
},
inject: [CompositeChannelFactory],
};
};

const registerBuses = (): any[] => {
return MessagingModule.createDynamicModule(options, [registerChannels]);
}

static forRootAsync(options: MessagingModuleAsyncOptions): DynamicModule {
const registerAsyncConfig: FactoryProvider = {
provide: Service.MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS,
useFactory: options.useChannelFactory,
inject: options.inject ?? [],
};

const registerChannels: FactoryProvider = {
provide: Service.CHANNELS,
useFactory: (
compositeChannelFactory: CompositeChannelFactory,
channels: DefineChannels,
) => {
return (channels ?? []).map((channelConfig: ChannelConfig) =>
compositeChannelFactory.create(channelConfig),
);
},
inject: [CompositeChannelFactory, Service.MESSAGING_MODULE_ASYNC_CHANNEL_OPTIONS],
};

return MessagingModule.createDynamicModule(options, [registerAsyncConfig, registerChannels], options.imports ?? []);
}

private static createDynamicModule(options: MandatoryMessagingModuleOptions, providers: Provider[] = [], imports: any = []): DynamicModule {
const buses = options.buses ?? [];

const registerBuses = (): FactoryProvider[] => {
return buses.map((bus) => ({
provide: `${bus.name}`,
useFactory: (
Expand Down Expand Up @@ -78,39 +118,43 @@ export class MessagingModule implements OnApplicationBootstrap {
}));
};

const defaultMessageBus = (): Provider => {
return {
provide: Service.DEFAULT_MESSAGE_BUS,
useFactory: (
messageHandlerRegistry: MessageHandlerRegistry,
middlewareRegistry: MiddlewareRegistry,
normalizerRegistry: NormalizerRegistry,
) => {
return new InMemoryMessageBus(
messageHandlerRegistry,
middlewareRegistry,
new InMemoryChannel(
new InMemoryChannelConfig({
name: 'default.bus',
middlewares: [],
avoidErrorsForNotExistedHandlers: true,
}),
),
normalizerRegistry,
);
},
inject: [
Service.MESSAGE_HANDLERS_REGISTRY,
Service.MIDDLEWARE_REGISTRY,
Service.MESSAGE_NORMALIZERS_REGISTRY,
],
};
};

return {
global: options.global ?? true,
module: MessagingModule,
imports: [DiscoveryModule],
imports: [DiscoveryModule, ...imports ],
providers: [
...providers,
defaultMessageBus(),
...registerBuses(),
registerChannels(),
{
provide: Service.DEFAULT_MESSAGE_BUS,
useFactory: (
messageHandlerRegistry: MessageHandlerRegistry,
middlewareRegistry: MiddlewareRegistry,
normalizerRegistry: NormalizerRegistry,
) => {
return new InMemoryMessageBus(
messageHandlerRegistry,
middlewareRegistry,
new InMemoryChannel(
new InMemoryChannelConfig({
name: 'default.bus',
middlewares: [],
avoidErrorsForNotExistedHandlers: true,
}),
),
normalizerRegistry,
);
},
inject: [
Service.MESSAGE_HANDLERS_REGISTRY,
Service.MIDDLEWARE_REGISTRY,
Service.MESSAGE_NORMALIZERS_REGISTRY,
],
},
{
provide: Service.MESSAGE_HANDLERS_REGISTRY,
useClass: MessageHandlerRegistry,
Expand Down Expand Up @@ -156,7 +200,7 @@ export class MessagingModule implements OnApplicationBootstrap {
],
exports: [
Service.DEFAULT_MESSAGE_BUS,
...registerBuses(),
...registerBuses().map(bus => bus.provide),
DistributedConsumer,
ObjectForwardMessageNormalizer,
],
Expand Down
84 changes: 84 additions & 0 deletions test/e2e/dispatch-and-handle-message-async.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Test, TestingModule } from '@nestjs/testing';
import { INestApplication, Logger } from '@nestjs/common';
import { DefaultMessageOptions, IMessageBus, MessageResponse, RoutingMessage } from '../../src';
import { TestMessage } from '../support/app/test.message';
import { SpyDataService } from '../support/app/spy-data.service';
import { Service } from '../../src/dependency-injection/service';
import { ObjectForwardMessageNormalizer } from '../../src/normalizer/object-forward-message.normalizer';
import { HandlersException } from '../../src/exception/handlers.exception';
import { TestAsyncModule } from '../support/app/test-async.module';

/**
* @description Same test as DispatchAndHandleMessage but test async loading MessagingModule
*/
describe('DispatchAndHandleMessageAsync', () => {
let app: INestApplication;
let defaultMessageBus: IMessageBus;
let messageBus: IMessageBus;
let middlewareMessageBus: IMessageBus;
let spyDataService: SpyDataService;

beforeEach(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [TestAsyncModule],
}).compile();
Logger.overrideLogger(false);

app = moduleFixture.createNestApplication();
await app.init();
defaultMessageBus = app.get(Service.DEFAULT_MESSAGE_BUS);
messageBus = app.get('message.bus');
middlewareMessageBus = app.get('middleware-message.bus');
spyDataService = app.get(SpyDataService);
});

afterAll(async () => {
await app.close();
});

it('will dispatch message to void handler and void second handler and check if everything is correct via spy service', async () => {
await messageBus.dispatch(
new RoutingMessage(new TestMessage('xyz'), 'message.void', new DefaultMessageOptions([], true, ObjectForwardMessageNormalizer)),
);

expect(spyDataService.getFirst()).toBe('xyz');
expect(spyDataService.getAllData()[1]).toBe('xyz2');
});

it('will dispatch message to throwable handler', async () => {
try {
await messageBus.dispatch(
new RoutingMessage(new TestMessage('xyz'), 'message.throwable', new DefaultMessageOptions([], true, ObjectForwardMessageNormalizer)),
);
} catch (e) {
expect(e).toBeInstanceOf(HandlersException);
}
});

it('will dispatch message to returned handler and expected returned result', async () => {
const result = await messageBus.dispatch(
new RoutingMessage(new TestMessage('xyz'), 'message.returned'),
);

expect(result).toEqual(
new MessageResponse([{ id: 'uuid', response: 'xyz' }]),
);
});

it('check if middleware will correctly applied', async () => {
await middlewareMessageBus.dispatch(
new RoutingMessage(new TestMessage('xyz'), 'message.void'),
);
const data: string[] = spyDataService.getAllData();

expect(data).toHaveLength(3);
expect(data[0]).toBe('MIDDLEWARE WORKS');
expect(data[1]).toBe('xyz');
});

it('Dispatch message by DEFAULT MESSAGE BUS and do not show error when handler does not exists', async () => {
await defaultMessageBus.dispatch(
new RoutingMessage(new TestMessage('xyz'), 'message_for_not_existed_handler'),
);
});
});
46 changes: 46 additions & 0 deletions test/support/app/test-async.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Module } from '@nestjs/common';
import { InMemoryChannelConfig, MessagingModule } from '../../../src';
import { ReturnedHandler, ThrowableHandler, VoidHandler, VoidSecondHandler } from './test.handler';
import { SpyDataService } from './spy-data.service';
import { TestService } from './test.service';
import { SpyDataMiddleware } from './spy-data-middleware.service';

@Module({
imports: [
MessagingModule.forRootAsync({
buses: [
{
name: 'message.bus',
channels: ['simple'],
},
{
name: 'middleware-message.bus',
channels: ['middleware-simple'],
},
],
useChannelFactory: () => {
return [
new InMemoryChannelConfig({
avoidErrorsForNotExistedHandlers: false,
name: 'simple',
}),
new InMemoryChannelConfig({
avoidErrorsForNotExistedHandlers: false,
name: 'middleware-simple',
middlewares: [SpyDataMiddleware],
}),
];
}
}),
],
providers: [
SpyDataMiddleware,
ReturnedHandler,
ThrowableHandler,
VoidHandler,
VoidSecondHandler,
SpyDataService,
TestService,
],
})
export class TestAsyncModule {}