diff --git a/integration/rabbitmq/e2e/jest-e2e.json b/integration/rabbitmq/e2e/jest-e2e.json index 187406d6f..9255c9c97 100644 --- a/integration/rabbitmq/e2e/jest-e2e.json +++ b/integration/rabbitmq/e2e/jest-e2e.json @@ -3,6 +3,7 @@ "rootDir": ".", "testEnvironment": "node", "testRegex": ".e2e-spec.ts$", + "testTimeout": 10000, "transform": { "^.+\\.ts$": "ts-jest" } diff --git a/integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts b/integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts index 78bf6cee8..cfcc77a20 100644 --- a/integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts +++ b/integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts @@ -142,7 +142,7 @@ describe('Rabbit Controller RPC', () => { payload: Buffer.alloc(0), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '' }); }); it('non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => { @@ -152,7 +152,7 @@ describe('Rabbit Controller RPC', () => { payload: Buffer.from('{a:'), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '{a:' }); }); it('SUBMODULE: regular RPC handler should receive a valid RPC response', async () => { @@ -269,7 +269,7 @@ describe('Rabbit Controller RPC', () => { payload: Buffer.alloc(0), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '' }); }); it('SUBMODULE: non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => { @@ -279,6 +279,6 @@ describe('Rabbit Controller RPC', () => { payload: Buffer.from('{a:'), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '{a:' }); }); }); diff --git a/integration/rabbitmq/e2e/rpc.e2e-spec.ts b/integration/rabbitmq/e2e/rpc.e2e-spec.ts index 2b0052696..cdf8bf097 100644 --- a/integration/rabbitmq/e2e/rpc.e2e-spec.ts +++ b/integration/rabbitmq/e2e/rpc.e2e-spec.ts @@ -78,7 +78,7 @@ describe('Rabbit RPC', () => { payload: Buffer.alloc(0), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '' }); }); it('non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => { @@ -88,6 +88,6 @@ describe('Rabbit RPC', () => { payload: Buffer.from('{a:'), }); - expect(response).toEqual({ echo: undefined }); + expect(response).toEqual({ echo: '{a:' }); }); }); diff --git a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts index 3cf6da308..036867328 100644 --- a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts +++ b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts @@ -216,17 +216,21 @@ describe('Rabbit Subscribe', () => { times(100).forEach((x) => expect(deleteHandler).toHaveBeenCalledWith(x)); }); - it('should receive undefined argument when subscriber allows non-json messages and message is invalid', async () => { - amqpConnection.publish(exchange, nonJsonRoutingKey, undefined); - amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0)); - amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.from('{a:')); + it('should receive message as-is if unable to parse', async () => { + await amqpConnection.publish(exchange, nonJsonRoutingKey, undefined); + await amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0)); + await amqpConnection.publish( + exchange, + nonJsonRoutingKey, + Buffer.from('{a:'), + ); await new Promise((resolve) => setTimeout(resolve, 50)); expect(testHandler).toHaveBeenCalledTimes(3); - expect(testHandler).toHaveBeenNthCalledWith(1, undefined); - expect(testHandler).toHaveBeenNthCalledWith(2, undefined); - expect(testHandler).toHaveBeenNthCalledWith(3, undefined); + expect(testHandler).toHaveBeenNthCalledWith(1, ''); + expect(testHandler).toHaveBeenNthCalledWith(2, ''); + expect(testHandler).toHaveBeenNthCalledWith(3, '{a:'); }); it('should receive messages in existing queue without setting exchange and routing key on subscribe', async () => { diff --git a/jest-e2e.json b/jest-e2e.json index 995940e42..1747025c9 100644 --- a/jest-e2e.json +++ b/jest-e2e.json @@ -3,6 +3,7 @@ "rootDir": ".", "testEnvironment": "node", "testRegex": ".e2e-spec.ts$", + "testTimeout": 10000, "transform": { "^.+\\.ts$": "ts-jest" }, diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md index caaa2e676..ce92c1e50 100644 --- a/packages/rabbitmq/README.md +++ b/packages/rabbitmq/README.md @@ -18,11 +18,14 @@ - [Module Initialization](#module-initialization) - [Usage with Interceptors, Guards and Filters](#usage-with-interceptors-guards-and-filters) - [Usage with Controllers](#usage-with-controllers) + - [Interceptors, Guards, Pipes](#interceptors-guards-pipes) - [Receiving Messages](#receiving-messages) - [Exposing RPC Handlers](#exposing-rpc-handlers) - [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers) + - [Handling messages with format different than JSON](#handling-messages-with-format-different-than-json) - [Message Handling](#message-handling) - [Conditional Handler Registration](#conditional-handler-registration) + - [Dealing with the amqp original message](#dealing-with-the-amqp-original-message) - [Selecting channel for handler](#selecting-channel-for-handler) - [Sending Messages](#sending-messages) - [Inject the AmqpConnection](#inject-the-amqpconnection) @@ -32,6 +35,8 @@ - [Interop with other RPC Servers](#interop-with-other-rpc-servers) - [Advanced Patterns](#advanced-patterns) - [Competing Consumers](#competing-consumers) + - [Handling errors](#handling-errors) + - [Handling errors during queue creation](#handling-errors-during-queue-creation) - [Contribute](#contribute) - [License](#license) @@ -289,6 +294,42 @@ export class MessagingService { } ``` +### Handling messages with format different than JSON + +By default, messages are parsed with `JSON.parse` method when they are received and stringified with `JSON.stringify` on publish. +If you wish to change this behavior, you can use your own parsers, like so + +```typescript +import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; +import { Module } from '@nestjs/common'; +import { MessagingController } from './messaging/messaging.controller'; +import { MessagingService } from './messaging/messaging.service'; +import { ConsumeMessage } from 'amqplib'; + +@Module({ + imports: [ + RabbitMQModule.forRoot(RabbitMQModule, { + // ... + deserializer: (message: Buffer, msg: ConsumeMessage) => { + const decodedMessage = myCustomDeserializer( + msg.toString(), + msg.properties.headers + ); + return decodedMessage; + }, + serializer: (msg: any) => { + const encodedMessage = myCustomSerializer(msg); + return Buffer.from(encodedMessage); + }, + }), + ], + // ... +}) +export class RabbitExampleModule {} +``` + +Also, if you simply do not want to parse incoming message, set flag `allowNonJsonMessages` on consumer level, it will return raw message if unable to parse it + ### Message Handling NestJS Plus provides sane defaults for message handling with automatic acking of messages that have been successfully processed by either RPC or PubSub handlers. However, there are situtations where an application may want to Negatively Acknowledge (or Nack) a message. To support this, the library exposes the `Nack` object which when returned from a handler allows a developer to control the message handling behavior. Simply return a `Nack` instance to negatively acknowledge the message. diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index dafa1667e..0828da462 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -297,7 +297,7 @@ export class AmqpConnection { // Check that the Buffer has content, before trying to parse it const message = msg.content.length > 0 - ? this.config.deserializer(msg.content) + ? this.config.deserializer(msg.content, msg) : undefined; const correlationMessage: CorrelationMessage = { @@ -584,13 +584,14 @@ export class AmqpConnection { if (msg.content) { if (allowNonJsonMessages) { try { - message = this.config.deserializer(msg.content) as T; + message = this.config.deserializer(msg.content, msg) as T; } catch { - // Let handler handle parsing error, it has the raw message anyway - message = undefined; + // Pass raw message since flag `allowNonJsonMessages` is set + // Casting to `any` first as T doesn't have a type + message = msg.content.toString() as any as T; } } else { - message = this.config.deserializer(msg.content) as T; + message = this.config.deserializer(msg.content, msg) as T; } } diff --git a/packages/rabbitmq/src/rabbitmq.interfaces.ts b/packages/rabbitmq/src/rabbitmq.interfaces.ts index 3a03d9bc3..4e7a2cdb3 100644 --- a/packages/rabbitmq/src/rabbitmq.interfaces.ts +++ b/packages/rabbitmq/src/rabbitmq.interfaces.ts @@ -1,6 +1,6 @@ import { LoggerService } from '@nestjs/common'; import { AmqpConnectionManagerOptions } from 'amqp-connection-manager'; -import { Options } from 'amqplib'; +import { ConsumeMessage, Options } from 'amqplib'; import { AssertQueueErrorHandler, MessageErrorHandler, @@ -125,7 +125,7 @@ export interface RabbitMQConfig { /** * This function is used to deserialize the received message. */ - deserializer?: (message: Buffer) => any; + deserializer?: (message: Buffer, msg: ConsumeMessage) => any; /** * This function is used to serialize the message to be sent.