Skip to content

Commit

Permalink
Add beforeHook and afterHook
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Gambert committed Apr 12, 2021
1 parent dbd1f88 commit b54f964
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ jobs:
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/secondQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/thirdQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fourthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fithQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/unackedQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.random.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/firstQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomBis.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/secondQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test3.*.routingKey.test3"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/thirdQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test4.*.routingKey.test4"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fourthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test5.*.routingKey.test5"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fithQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomUnacked.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/unackedQueue

- run: npm install
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ Default: `return true`

Function run before each message treatment. If it return a false value, the message is reject.

##### beforeHook

Type: `function`<br>
Default: () => {}
Parameters: `message` (see below)

Function run before each message treatment, can modify message.

##### afterHook

Type: `function`<br>
Default: () => {}
Parameters: `message` (see below), `subscriberResult` (ACK/NACK/REJECT)

Function run after each message treatment.

### rabQ.start()

Starts a connection.
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class RabQ extends EventEmitter {
this.validators.consumer = () => true; // By default, all message are valid
}

this.beforeHook = opts.beforeHook || (() => {});
this.afterHook = opts.afterHook || (() => {});

_connection.set(this, undefined);
_channel.set(this, undefined);
}
Expand Down
13 changes: 13 additions & 0 deletions lib/set-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ function execAction(parsedMsg, rabQ, ch) {
msg: `${msgSubscribers.length} subscribers for this message (${parsedMsg.rk})`
});

rabQ.beforeHook(parsedMsg);

const runActionsPromises = msgSubscribers.map(actions => {
actions.before = actions.before || (() => Promise.resolve());
actions.after = actions.after || ((message, returnCode) => Promise.resolve(returnCode));
Expand Down Expand Up @@ -194,6 +196,7 @@ function execAction(parsedMsg, rabQ, ch) {
}

delete rabQ.unackedMessages[unackedMessageId];
return subscriberResult;
})
.catch(err => {
rabQ.emit('log', {
Expand All @@ -214,5 +217,15 @@ function execAction(parsedMsg, rabQ, ch) {
msg: `Unable to nack message after error while treating message`,
err
});
})
.then(subscriberResult => rabQ.afterHook(parsedMsg, subscriberResult))
.catch(err => {
rabQ.emit('log', {
level: 'error',
uuid: parsedMsg.content.uuid,
token: parsedMsg.token,
msg: `afterHooks errors after treating message`,
err
});
});
}
31 changes: 30 additions & 1 deletion test/set-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ test('set consumer with message which validate', async t => {
const p = await makeRabQ(c);

p.subscribesTo(/test4\.random\.routingKey\.test4/, message => {
t.is('Function called', 'Function called');
t.pass('Function called');
return Promise.resolve(message.ACK);
});

Expand Down Expand Up @@ -186,3 +186,32 @@ test('autoAck mode', async t => {

return delay(1000);
});

test('set before and after hooks', async t => {
t.plan(2);

const contentToSend = {toto: 'tata'};
let afterHookMsg = null;

const c = Object.assign({}, minimalOptions);
c.queues = 'fithQueue';
c.beforeHook = msg => {
msg.test = 'before';
};
c.afterHook = (msg, result) => {
afterHookMsg = msg.test + result;
};
const p = await makeRabQ(c);

p.subscribesTo(/test5\.random\.routingKey\.test5/, message => {
t.pass('Function called');
return Promise.resolve(message.ACK);
});

p.publish('test5.random.routingKey.test5', contentToSend);

return delay(1000)
.then(() => {
t.is(afterHookMsg, 'beforeACK');
});
});

0 comments on commit b54f964

Please sign in to comment.