Skip to content

Commit

Permalink
feat(rabbitmq): extend custom parsers, update docs
Browse files Browse the repository at this point in the history
Better handling and documentation around non JSON values

Closes #574
  • Loading branch information
sdomagala authored Mar 28, 2023
1 parent 4e714ee commit 5e15faf
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 20 deletions.
1 change: 1 addition & 0 deletions integration/rabbitmq/e2e/jest-e2e.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"rootDir": ".",
"testEnvironment": "node",
"testRegex": ".e2e-spec.ts$",
"testTimeout": 10000,
"transform": {
"^.+\\.ts$": "ts-jest"
}
Expand Down
8 changes: 4 additions & 4 deletions integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ describe('Rabbit Controller RPC', () => {
payload: Buffer.alloc(0),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '' });
});

it('non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => {
Expand All @@ -152,7 +152,7 @@ describe('Rabbit Controller RPC', () => {
payload: Buffer.from('{a:'),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '{a:' });
});

it('SUBMODULE: regular RPC handler should receive a valid RPC response', async () => {
Expand Down Expand Up @@ -269,7 +269,7 @@ describe('Rabbit Controller RPC', () => {
payload: Buffer.alloc(0),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '' });
});

it('SUBMODULE: non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => {
Expand All @@ -279,6 +279,6 @@ describe('Rabbit Controller RPC', () => {
payload: Buffer.from('{a:'),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '{a:' });
});
});
4 changes: 2 additions & 2 deletions integration/rabbitmq/e2e/rpc.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe('Rabbit RPC', () => {
payload: Buffer.alloc(0),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '' });
});

it('non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => {
Expand All @@ -88,6 +88,6 @@ describe('Rabbit RPC', () => {
payload: Buffer.from('{a:'),
});

expect(response).toEqual({ echo: undefined });
expect(response).toEqual({ echo: '{a:' });
});
});
18 changes: 11 additions & 7 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,21 @@ describe('Rabbit Subscribe', () => {
times(100).forEach((x) => expect(deleteHandler).toHaveBeenCalledWith(x));
});

it('should receive undefined argument when subscriber allows non-json messages and message is invalid', async () => {
amqpConnection.publish(exchange, nonJsonRoutingKey, undefined);
amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0));
amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.from('{a:'));
it('should receive message as-is if unable to parse', async () => {
await amqpConnection.publish(exchange, nonJsonRoutingKey, undefined);
await amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0));
await amqpConnection.publish(
exchange,
nonJsonRoutingKey,
Buffer.from('{a:'),
);

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenNthCalledWith(1, undefined);
expect(testHandler).toHaveBeenNthCalledWith(2, undefined);
expect(testHandler).toHaveBeenNthCalledWith(3, undefined);
expect(testHandler).toHaveBeenNthCalledWith(1, '');
expect(testHandler).toHaveBeenNthCalledWith(2, '');
expect(testHandler).toHaveBeenNthCalledWith(3, '{a:');
});

it('should receive messages in existing queue without setting exchange and routing key on subscribe', async () => {
Expand Down
1 change: 1 addition & 0 deletions jest-e2e.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"rootDir": ".",
"testEnvironment": "node",
"testRegex": ".e2e-spec.ts$",
"testTimeout": 10000,
"transform": {
"^.+\\.ts$": "ts-jest"
},
Expand Down
41 changes: 41 additions & 0 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
- [Module Initialization](#module-initialization)
- [Usage with Interceptors, Guards and Filters](#usage-with-interceptors-guards-and-filters)
- [Usage with Controllers](#usage-with-controllers)
- [Interceptors, Guards, Pipes](#interceptors-guards-pipes)
- [Receiving Messages](#receiving-messages)
- [Exposing RPC Handlers](#exposing-rpc-handlers)
- [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers)
- [Handling messages with format different than JSON](#handling-messages-with-format-different-than-json)
- [Message Handling](#message-handling)
- [Conditional Handler Registration](#conditional-handler-registration)
- [Dealing with the amqp original message](#dealing-with-the-amqp-original-message)
- [Selecting channel for handler](#selecting-channel-for-handler)
- [Sending Messages](#sending-messages)
- [Inject the AmqpConnection](#inject-the-amqpconnection)
Expand All @@ -32,6 +35,8 @@
- [Interop with other RPC Servers](#interop-with-other-rpc-servers)
- [Advanced Patterns](#advanced-patterns)
- [Competing Consumers](#competing-consumers)
- [Handling errors](#handling-errors)
- [Handling errors during queue creation](#handling-errors-during-queue-creation)
- [Contribute](#contribute)
- [License](#license)

Expand Down Expand Up @@ -289,6 +294,42 @@ export class MessagingService {
}
```

### Handling messages with format different than JSON

By default, messages are parsed with `JSON.parse` method when they are received and stringified with `JSON.stringify` on publish.
If you wish to change this behavior, you can use your own parsers, like so

```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { MessagingController } from './messaging/messaging.controller';
import { MessagingService } from './messaging/messaging.service';
import { ConsumeMessage } from 'amqplib';

@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
// ...
deserializer: (message: Buffer, msg: ConsumeMessage) => {
const decodedMessage = myCustomDeserializer(
msg.toString(),
msg.properties.headers
);
return decodedMessage;
},
serializer: (msg: any) => {
const encodedMessage = myCustomSerializer(msg);
return Buffer.from(encodedMessage);
},
}),
],
// ...
})
export class RabbitExampleModule {}
```

Also, if you simply do not want to parse incoming message, set flag `allowNonJsonMessages` on consumer level, it will return raw message if unable to parse it

### Message Handling

NestJS Plus provides sane defaults for message handling with automatic acking of messages that have been successfully processed by either RPC or PubSub handlers. However, there are situtations where an application may want to Negatively Acknowledge (or Nack) a message. To support this, the library exposes the `Nack` object which when returned from a handler allows a developer to control the message handling behavior. Simply return a `Nack` instance to negatively acknowledge the message.
Expand Down
11 changes: 6 additions & 5 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ export class AmqpConnection {
// Check that the Buffer has content, before trying to parse it
const message =
msg.content.length > 0
? this.config.deserializer(msg.content)
? this.config.deserializer(msg.content, msg)
: undefined;

const correlationMessage: CorrelationMessage = {
Expand Down Expand Up @@ -584,13 +584,14 @@ export class AmqpConnection {
if (msg.content) {
if (allowNonJsonMessages) {
try {
message = this.config.deserializer(msg.content) as T;
message = this.config.deserializer(msg.content, msg) as T;
} catch {
// Let handler handle parsing error, it has the raw message anyway
message = undefined;
// Pass raw message since flag `allowNonJsonMessages` is set
// Casting to `any` first as T doesn't have a type
message = msg.content.toString() as any as T;
}
} else {
message = this.config.deserializer(msg.content) as T;
message = this.config.deserializer(msg.content, msg) as T;
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { LoggerService } from '@nestjs/common';
import { AmqpConnectionManagerOptions } from 'amqp-connection-manager';
import { Options } from 'amqplib';
import { ConsumeMessage, Options } from 'amqplib';
import {
AssertQueueErrorHandler,
MessageErrorHandler,
Expand Down Expand Up @@ -125,7 +125,7 @@ export interface RabbitMQConfig {
/**
* This function is used to deserialize the received message.
*/
deserializer?: (message: Buffer) => any;
deserializer?: (message: Buffer, msg: ConsumeMessage) => any;

/**
* This function is used to serialize the message to be sent.
Expand Down

0 comments on commit 5e15faf

Please sign in to comment.