Develop NestJS microservices using Dapr pubsub, actors and bindings.
Dapr Module for Nest built on top of the latest Dapr JS SDK.
npm i @rayondigital/nest-dapr
Install Dapr as per the getting started guide. Ensure Dapr is running with
dapr --version
Output:
CLI version: 1.15.1
Runtime version: 1.15.4
The following scaffolds a Nest project with the nest-dapr package and demonstrates using Nest with Dapr using actors and RabbitMQ pubsub bindings.
Install Nest CLI
npm install -g @nestjs/cli
Scaffold Nest project
nest new nest-dapr
cd nest-dapr/
Install nest-dapr package
npm i --save @rayondigital/nest-dapr
Import DaprModule in AppModule class
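import { Module } from '@nestjs/common';
import { DaprModule } from '@rayondigital/nest-dapr';
import { AppController } from './app.controller';
import { AppService } from './app.service';
@Module({
  imports: [DaprModule.register()],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Import DaprClient from @dapr/dapr package and add dependency to AppController class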
import { DaprClient } from '@dapr/dapr';
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    private readonly daprClient: DaprClient,
  ) {}
  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
}
Create actors and connect them to your NestJS application using the @DaprActor decorator.
This decorator takes in the interface of the actor, and marks the Actor as transient inside the NestJS
dependency injection container.
Ensure your actor classes are added to the providers array of your NestJS module.
// You must expose your actor's interface as an abstract class because TypeScript interfaces are erased at compile time and are not available at runtime.
// Having the interface as an abstract class allows us to call the actor by only knowing the interface type.
export abstract class CounterActorInterface {
  abstract increment(): Promise<number>;
  abstract getCounter(): Promise<number>;
}
@DaprActor({
  interfaceType: CounterActorInterface,
})
export class CounterActor
  extends StatefulActor
  implements CounterActorInterface
{
  // You can inject other NestJS services into your actor.
  // Only singleton services are supported at this time.
  @Inject(CacheService)
  private readonly cacheService: CacheService;
  counter: number;
  async onActivate(): Promise<void> {
    this.counter = await this.getState('counter', 0);
    return super.onActivate();
  }
  async increment(): Promise<number> {
    this.counter++;
    // Use a NestJS service as an example.
    // Share in-memory state between actors on this node.
    // You will probably never want to do this, but we're just demonstrating a singleton service.
    await this.cacheService.increment('total');
    await this.setState('counter', this.counter);
    await this.saveState();
    return this.counter;
  }
  async getCounter(): Promise<number> {
    return this.counter;
  }
}
This module provides the DaprActorClient which is a NestJS service.
It can be injected into controllers, services, handlers and other actors.
It acts as a proxy service to the actors, and allows you to call methods on the actors - similar to the Orleans GrainFactory.
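To make the proxy idea concrete before the controller example that follows, here is a minimal, library-independent sketch of how a client could turn an abstract-class token plus an id into a callable object. `ActorClientSketch`, `ActorTokenSketch`, `InMemoryCounter` and the in-memory maps are hypothetical names used only for illustration; this is not how nest-dapr or Dapr route calls, since the real client forwards each call through the Dapr sidecar.

```typescript
// Hypothetical sketch, not the nest-dapr implementation.
// An abstract class exists at runtime, so it can be used as a lookup key.
abstract class CounterActorInterface {
  abstract increment(): Promise<number>;
  abstract getCounter(): Promise<number>;
}

class InMemoryCounter implements CounterActorInterface {
  private counter = 0;
  async increment(): Promise<number> {
    this.counter += 1;
    return this.counter;
  }
  async getCounter(): Promise<number> {
    return this.counter;
  }
}

// A class-like value (abstract or concrete) whose instances have type T.
type ActorTokenSketch<T> = abstract new (...args: any[]) => T;

class ActorClientSketch {
  private readonly factories = new Map<ActorTokenSketch<unknown>, () => unknown>();
  private readonly instances = new Map<string, unknown>();

  register<T>(token: ActorTokenSketch<T>, factory: () => T): void {
    this.factories.set(token, factory);
  }

  getActor<T extends object>(token: ActorTokenSketch<T>, id: string): T {
    const factory = this.factories.get(token);
    if (!factory) {
      throw new Error(`No actor registered for ${token.name}`);
    }
    // One instance per (type, id) pair, created on first use.
    const key = `${token.name}:${id}`;
    if (!this.instances.has(key)) {
      this.instances.set(key, factory());
    }
    const target = this.instances.get(key) as T;
    // The Proxy marks the point where a real client would forward the call
    // to a remote actor; here it simply calls the local instance.
    return new Proxy(target, {
      get(obj, prop) {
        const value = Reflect.get(obj, prop);
        return typeof value === 'function' ? value.bind(obj) : value;
      },
    });
  }
}

async function demoActorClient(): Promise<number[]> {
  const client = new ActorClientSketch();
  client.register(CounterActorInterface, () => new InMemoryCounter());
  const first = await client.getActor(CounterActorInterface, 'a').increment();
  const second = await client.getActor(CounterActorInterface, 'a').increment();
  const other = await client.getActor(CounterActorInterface, 'b').getCounter();
  return [first, second, other]; // resolves to [1, 2, 0]
}
```

Callers only ever name the abstract class and an id, which mirrors the `getActor(CounterActorInterface, id)` call shape used in this README.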
@Controller()
export class CounterController {
  constructor(
    private readonly actorClient: DaprActorClient,
  ) {}
  @Get(":id")
  async increment(@Param("id") id: string): Promise<string> {
    const value = await this.actorClient
      .getActor(CounterActorInterface, id)
      .increment();
    return `Counter incremented to ${value}`;
  }
}
Workflows and Activities are annotated with the @DaprWorkflow and @DaprActivity decorators respectively.
When added to the providers array of a NestJS module, they are automatically registered with the Dapr server.
Note: Ensure that your activities are stateless and idempotent. Be very careful with any state and services you use inside your workflows.
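As an illustration of the idempotency advice, the following plain TypeScript sketch shows one way to make a "create" step safe to repeat by keying the write on the entity id. It assumes that a workflow engine may run an activity more than once for the same input, for example after a retry. `EntitySketch`, `EntityStoreSketch` and `createEntityOnce` are hypothetical names and do not come from nest-dapr or the Dapr SDK.

```typescript
// Hypothetical sketch: an idempotent "create" step backed by an in-memory store.
interface EntitySketch {
  id: string;
  status: string;
  createdAt: Date;
}

class EntityStoreSketch {
  private readonly rows = new Map<string, EntitySketch>();

  get(id: string): EntitySketch | undefined {
    return this.rows.get(id);
  }

  put(entity: EntitySketch): void {
    this.rows.set(entity.id, entity);
  }

  count(): number {
    return this.rows.size;
  }
}

// Running this twice with the same id returns the entity created the first
// time instead of overwriting it or creating a duplicate.
function createEntityOnce(
  store: EntityStoreSketch,
  id: string,
  now: Date = new Date(),
): EntitySketch {
  const existing = store.get(id);
  if (existing) {
    return existing;
  }
  const created: EntitySketch = { id, status: 'created', createdAt: now };
  store.put(created);
  return created;
}
```

The same check-then-write shape applies when the store is a database, provided the check and the write are atomic there (for example through a unique key on the id).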
@DaprActivity()
export class HelloActivity implements WorkflowActivity<string, string> {
  async run(context: WorkflowActivityContext, name: string): Promise<string> {
    return `Hello ${name}!`;
  }
}
@DaprActivity()
export class CreateEntityActivity implements WorkflowActivity<string, Entity> {
  @Inject()
  private readonly entityService: EntityService;
  constructor(private readonly cacheService: CacheService) {}
  async run(context: WorkflowActivityContext, id: string): Promise<Entity> {
    const entity: Entity = {
      id: id,
      createdAt: new Date(),
      lastUpdatedAt: new Date(),
      status: 'created',
      data: {},
    };
    await this.entityService.update(entity);
    console.log('entity', entity);
    return entity;
  }
}
@DaprActivity()
export class GetEntityActivity implements WorkflowActivity<string, Entity> {
  @Inject()
  private readonly entityService: EntityService;
  constructor(private readonly cacheService: CacheService) {}
  async run(context: WorkflowActivityContext, id: string): Promise<Entity> {
    const entity = await this.entityService.get(id);
    console.log('entity', entity);
    return entity;
  }
}
@DaprWorkflow()
export class HelloWorkflow implements Workflow<string, string[]> {
  async *run(ctx: WorkflowContext, input: string): AsyncGenerator<unknown, string[]> {
    const cities: string[] = [];
    let entity = expect<Entity>(yield ctx.callActivity(CreateEntityActivity, '12345'));
    ctx.setCustomStatus('Entity');
    entity = expect<Entity>(yield ctx.callActivity(GetEntityActivity, '12345'));
    console.log('entity', entity);
    ctx.setCustomStatus('Entity');
    const result1 = expect<string>(yield ctx.callActivity(HelloActivity, 'Tokyo'));
    cities.push(result1);
    ctx.setCustomStatus('Tokyo');
    // Pause until an external event named 'next' is raised for this workflow.
    const event = yield ctx.waitForExternalEvent('next');
    console.log('event', event);
    const result2 = expect<string>(yield ctx.callActivity(HelloActivity, 'Seattle'));
    cities.push(result2);
    ctx.setCustomStatus('Seattle');
    const result3 = expect<string>(yield ctx.callActivity(HelloActivity, 'London'));
    cities.push(result3);
    ctx.setCustomStatus('London');
    return cities;
  }
}
@Controller()
export class WorkflowController {
  constructor(
    private readonly workflowClient: DaprWorkflowClient,
  ) {}
  @Get(":id")
  async start(@Param("id") uuid: string): Promise<{ id: string; state: unknown }> {
    const id = await this.workflowClient.scheduleNewWorkflow(HelloWorkflow, 'Hello', uuid);
    // Workflow is started, and the id is returned.
    // You can wait for the workflow to start and get the initial state.
    const initialState = await this.workflowClient.waitForWorkflowStart(id, undefined, 15);
    // You can raise events
    // await this.workflowClient.raiseEvent(id, 'next', { input: 'next' });
    // Optionally you can also wait for it to complete.
    // const state = await this.workflowClient.waitForWorkflowCompletion(id, undefined, 15);
    // Use the workflowOutput helper to get typed variables
    // const value = workflowOutput(HelloWorkflow, state);
    return {
      id: id,
      state: initialState,
    };
  }
}
Create pubsub & topic names used for pubsub operations and message interface
const pubSubName = 'my-pubsub';
const topicName = 'my-topic';
interface Message {
  hello: string;
}
@Controller()
Create endpoint to publish topic message
@Post('pubsub')
async pubsub(): Promise<boolean> {
  const message: Message = { hello: 'world' };
  return this.daprClient.pubsub.publish(pubSubName, topicName, message);
}
Create pubsub handler which will subscribe to the topic and log the received message
@DaprPubSub(pubSubName, topicName)
pubSubHandler(message: Message): void {
  console.log(`Received topic:${topicName} message:`, message);
}
Create Dapr pubsub component in components folder
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: my-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
    - name: host
      value: amqp://guest:guest@localhost:5674
Save file as components/rabbitmq-pubsub.yaml
Create docker-compose.yml in the project root used to run RabbitMQ
version: '3.9'
services:
  pubsub:
    image: rabbitmq:3-management-alpine
    ports:
      - 5674:5672
      - 15674:15672
Start RabbitMQ
docker-compose up
Create script to bootstrap your Nest project using Dapr sidecar. Update package.json and add script
"scripts": {
  ..
  "start:dapr": "dapr run --app-id nest-dapr --app-protocol http --app-port 50001 --dapr-http-port 50000 --components-path ./components npm run start"
},
Start Nest app with Dapr
npm run start:dapr
Invoke endpoint to publish the message
curl -X POST localhost:3000/pubsub
This should publish a message to RabbitMQ which should be consumed by the handler and written to the console:
== APP == Received topic:my-topic message: { hello: 'world' }
Full example
import { DaprClient } from '@dapr/dapr';
import { DaprPubSub } from '@rayondigital/nest-dapr';
import { Controller, Get, Post } from '@nestjs/common';
import { AppService } from './app.service';
const pubSubName = 'my-pubsub';
const topicName = 'my-topic';
interface Message {
  hello: string;
}
@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    private readonly daprClient: DaprClient,
  ) {}
  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
  @Post('pubsub')
  async pubsub(): Promise<boolean> {
    const message: Message = { hello: 'world' };
    return this.daprClient.pubsub.publish(pubSubName, topicName, message);
  }
  @DaprPubSub(pubSubName, topicName)
  pubSubHandler(message: Message): void {
    console.log(`Received topic:${topicName} message:`, message);
  }
}
DaprModule is a global Nest Module used to register DaprServer & DaprClient as providers within your project. It also registers all your handlers which listen to Dapr pubsub and input bindings so that when messages are received by Dapr, they are forwarded to the handler. Handler registration occurs during the onApplicationBootstrap lifecycle hook.
To use nest-dapr, import the DaprModule into the root AppModule and run the register() static method.
@Module({
  imports: [DaprModule.register()],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
register() takes an optional DaprModuleOptions object which allows passing arguments to DaprServer instance.
export interface DaprModuleOptions {
  serverHost?: string;
  serverPort?: string;
  daprHost?: string;
  daprPort?: string;
  communicationProtocol?: CommunicationProtocolEnum;
  clientOptions?: DaprClientOptions;
}
See Dapr JS docs for more information about these arguments.
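Because the host and port fields are plain optional strings, a typo in the environment can pass silently into the module. The sketch below shows one possible way to build the endpoint part of the options from environment-style input with a basic port check. It uses a local stand-in interface instead of the real `DaprModuleOptions`; `DaprEndpointOptionsSketch`, `endpointOptionsFromEnv` and `checkPort` are hypothetical helpers, and the variable names are the ones used in the `registerAsync()` example later in this README.

```typescript
// Local stand-in for the string fields of DaprModuleOptions (assumption:
// only these four fields are covered by this sketch).
interface DaprEndpointOptionsSketch {
  serverHost?: string;
  serverPort?: string;
  daprHost?: string;
  daprPort?: string;
}

// Ports stay strings, matching the interface, but must look like a TCP port.
function checkPort(name: string, value: string): string {
  const port = Number(value);
  if (!/^\d+$/.test(value) || port < 1 || port > 65535) {
    throw new Error(`${name} must be a port between 1 and 65535, got "${value}"`);
  }
  return value;
}

// Only fields that are actually set are copied, so anything left out falls
// back to whatever defaults the underlying SDK applies.
function endpointOptionsFromEnv(
  env: Record<string, string | undefined>,
): DaprEndpointOptionsSketch {
  const options: DaprEndpointOptionsSketch = {};
  const serverHost = env['DAPR_SERVER_HOST'];
  const serverPort = env['DAPR_SERVER_PORT'];
  const daprHost = env['DAPR_HOST'];
  const daprPort = env['DAPR_PORT'];
  if (serverHost) options.serverHost = serverHost;
  if (serverPort) options.serverPort = checkPort('DAPR_SERVER_PORT', serverPort);
  if (daprHost) options.daprHost = daprHost;
  if (daprPort) options.daprPort = checkPort('DAPR_PORT', daprPort);
  return options;
}
```

In a Node.js application the argument would typically be `process.env`, and the result could be passed to `DaprModule.register()`.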
You can pass your options asynchronously instead of statically. In this case, use the registerAsync() method, which provides several ways to deal with async configuration. One of these is a factory function:
DaprModule.registerAsync({
  imports: [ConfigModule],
  useFactory: (configService: ConfigService) => ({
    serverHost: configService.get('DAPR_SERVER_HOST'),
    serverPort: configService.get('DAPR_SERVER_PORT'),
    daprHost: configService.get('DAPR_HOST'),
    daprPort: configService.get('DAPR_PORT'),
    communicationProtocol: CommunicationProtocolEnum.GRPC,
    clientOptions: {
      logger: {
        level: LogLevel.Verbose,
      },
    },
  }),
  inject: [ConfigService],
})
DaprModule registers DaprServer and DaprClient as Nest providers. These can be injected into your controllers and services like any other provider.
import { DaprClient } from '@dapr/dapr';
import { Controller, Post } from '@nestjs/common';
@Controller()
export class AppController {
  constructor(readonly daprClient: DaprClient) {}
  @Post()
  async pubsub(): Promise<boolean> {
    return this.daprClient.pubsub.publish('my-pubsub', 'my-topic', {
      hello: 'world',
    });
  }
}
nest-dapr provides two TypeScript decorators which are used to declaratively configure subscriptions and bindings. These are used by DaprModule in conjunction with the handler method to define the handler implementations.
The DaprPubSub decorator sets up a handler for receiving pubsub topic messages. The decorator takes 3 arguments (name, topicName & route). name specifies the pubsub component name as defined in the Dapr component metadata section. topicName is the name of the pubsub topic. route is an optional argument and defines possible routing values.
Example:
@DaprPubSub('my-pubsub', 'my-topic')
pubSubHandler(message: any): void {
  console.log('Received message:', message);
}
RabbitMQ pubsub Component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: my-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
    - name: host
      value: amqp://guest:guest@localhost:5674
Publish message:
await this.daprClient.pubsub.publish('my-pubsub', 'my-topic', { hello: 'world' });
In this example the handler pubSubHandler method will receive messages from the my-topic topic through the my-pubsub component which in this case is RabbitMQ.
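The pairing of component name and topic can be pictured as a lookup keyed by both values. The following is a minimal in-memory sketch of that routing idea only; `InMemoryBusSketch` and its methods are hypothetical and are not part of Dapr or nest-dapr, where delivery goes through the sidecar and the broker.

```typescript
// Hypothetical sketch: routes messages by (pubsub component name, topic).
type HandlerSketch = (message: unknown) => void;

class InMemoryBusSketch {
  private readonly handlers = new Map<string, HandlerSketch[]>();

  private key(pubSubName: string, topicName: string): string {
    return `${pubSubName}/${topicName}`;
  }

  subscribe(pubSubName: string, topicName: string, handler: HandlerSketch): void {
    const key = this.key(pubSubName, topicName);
    const list = this.handlers.get(key) ?? [];
    list.push(handler);
    this.handlers.set(key, list);
  }

  // Returns how many handlers received the message.
  publish(pubSubName: string, topicName: string, message: unknown): number {
    const list = this.handlers.get(this.key(pubSubName, topicName)) ?? [];
    for (const handler of list) {
      handler(message);
    }
    return list.length;
  }
}

const busSketch = new InMemoryBusSketch();
const receivedSketch: unknown[] = [];
busSketch.subscribe('my-pubsub', 'my-topic', (message) => {
  receivedSketch.push(message);
});

// Delivered: both the component name and the topic match the subscription.
const deliveredSketch = busSketch.publish('my-pubsub', 'my-topic', { hello: 'world' });
// Not delivered: same component, different topic.
const missedSketch = busSketch.publish('my-pubsub', 'other-topic', { hello: 'nobody' });
```

A mismatch in either value (for example `my-pub-sub` instead of `my-pubsub`) means the handler is never called, which is a common cause of "nothing happens" reports.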
The DaprBinding decorator sets up a handler for receiving input binding data. The decorator takes one argument, name, which specifies the binding component name as defined in the Dapr component metadata section.
Example:
@DaprBinding('my-queue-binding')
bindingHandler(message: any): void {
  console.log('Received message:', message);
}
RabbitMQ binding component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: my-queue-binding
  namespace: default
spec:
  type: bindings.rabbitmq
  version: v1
  metadata:
    - name: queueName
      value: queue1
    - name: host
      value: amqp://guest:guest@localhost:5674
    - name: durable
      value: true
    - name: deleteWhenUnused
      value: false
    - name: ttlInSeconds
      value: 60
    - name: prefetchCount
      value: 0
    - name: exclusive
      value: false
    - name: maxPriority
      value: 5
    - name: contentType
      value: "text/plain"
Send message:
await this.daprClient.binding.send('my-queue-binding', 'create', { hello: 'world' });
In this example the handler bindingHandler method will receive messages from the queue1 queue defined in the my-queue-binding component which in this case is RabbitMQ.
DaprModule uses reflection to register all handlers found either in Controller or Provider classes. These classes must be registered in a Nest module. Providers must be decorated with the @Injectable() decorator at the class level. Once this is done and your provider is added to your module's [providers] array then nest-dapr will use Nest dependency injection container to resolve the provider instance and call your handler when the message is received.
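The discovery step described above can be sketched in plain TypeScript: a decorator factory records metadata per class, and a scanner later walks resolved instances and collects the decorated methods. This is an illustration of the general technique under stated assumptions, not nest-dapr's actual code; `PubSubSketch`, `handlerRegistrySketch` and `discoverHandlers` are hypothetical names, and the decorator is applied by hand so the sketch does not depend on decorator compiler flags.

```typescript
interface PubSubMetaSketch {
  name: string;
  topicName: string;
}

// Metadata per class prototype: method name -> subscription details.
const handlerRegistrySketch = new Map<object, Map<string, PubSubMetaSketch>>();

// Legacy-style method decorator factory: receives the prototype and method name.
function PubSubSketch(name: string, topicName: string) {
  return (target: object, methodName: string): void => {
    const perClass =
      handlerRegistrySketch.get(target) ?? new Map<string, PubSubMetaSketch>();
    perClass.set(methodName, { name, topicName });
    handlerRegistrySketch.set(target, perClass);
  };
}

class AppServiceSketch {
  received: unknown[] = [];

  pubSubHandler(message: unknown): void {
    this.received.push(message);
  }
}

// With decorators enabled this would be written as
// `@PubSubSketch('my-pubsub', 'my-topic')` above the method.
PubSubSketch('my-pubsub', 'my-topic')(AppServiceSketch.prototype, 'pubSubHandler');

interface DiscoveredHandlerSketch extends PubSubMetaSketch {
  invoke: (message: unknown) => void;
}

// Stands in for the step where a DI container has resolved provider instances.
function discoverHandlers(instances: object[]): DiscoveredHandlerSketch[] {
  const found: DiscoveredHandlerSketch[] = [];
  for (const instance of instances) {
    const perClass = handlerRegistrySketch.get(Object.getPrototypeOf(instance));
    if (!perClass) continue;
    perClass.forEach((meta, methodName) => {
      const method = (instance as Record<string, unknown>)[methodName];
      if (typeof method === 'function') {
        // Call on the instance so `this` inside the handler is the provider.
        found.push({ ...meta, invoke: (message) => method.call(instance, message) });
      }
    });
  }
  return found;
}
```

Instances whose class carries no metadata are skipped, which is why an undecorated provider in the same module is unaffected.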
Here's an example of a Provider containing a Dapr handler.
import { DaprPubSub } from '@rayondigital/nest-dapr';
import { Injectable, Logger } from '@nestjs/common';
@Injectable()
export class AppService {
  private readonly logger = new Logger(AppService.name);
  @DaprPubSub('my-pubsub', 'my-topic')
  pubSubHandler(message: any): void {
    this.logger.log(`Received topic message:`, message);
  }
}
| Example | Description |
|---|---|
| Basics | Demonstrates basic actors, pubsub & input bindings using RabbitMQ |
Dapr is a complex set of tools and services and must be set up and deployed carefully to ensure your system operates correctly. This library is merely an integration layer over the existing Dapr JS SDK. If things are not working out for you please review:
- Your configuration
- Your Dapr local environment
- Your port numbers and hostnames
- Dapr & SDK documentation
- The tests and examples in this project
If you find that both Dapr and the JavaScript SDK are working fine but nest-dapr is not working in some way,
please file an issue that clearly states the problem and provides a reproducible code example.
An issue that says only "It doesn't work" is likely to be ignored or removed.
Thanks to:
- @dbc-tech/nest-dapr - We forked from this repository
- nad-au - Worked on pubsub and initial integration
- dapr-nestjs-pubsub - The original library
- @dapr/dapr - Development team
- Nest - Development team
Released under the MIT license. No warranty expressed or implied.