Skip to content

Commit

Permalink
Close channels when outstanding messages have been acknowledged
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Oct 24, 2021
1 parent 73be204 commit 84108dc
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 212 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Rascal extends the existing [RabbitMQ Concepts](https://www.rabbitmq.com/tutoria

A **publication** is a named configuration for publishing a message, including the destination queue or exchange, routing configuration, encryption profile and reliability guarantees, message options, etc. A **subscription** is a named configuration for consuming messages, including the source queue, encryption profile, content encoding, delivery options (e.g. acknowledgement handling and prefetch), etc. These must be [configured](#configuration) and supplied when creating the Rascal broker. After the broker has been created the subscriptions and publications can be retrivied from the broker and used to publish and consume messages.

### Breaking Changes in Rascal@15

Rascal@15 waits for inflight messages to be acknowledged before closing subscriber channels. Prior to this version Rascal just waited an arbitary amount of time. If you application does not acknowledge a message for some reason (quite likely in tests) calling `subscription.cancel`, `broker.shutdown`, `broker.unsubscribeAll` or `broker.nuke` will wait indefinitely. You can specify a `closeTimeout` in your subscription config, however if this is exceeded the aforementioned methods will yield an error with a code of `ETIMEDOUT` and message stating `Callback function "waitForUnacknowledgedMessages" timed out`.

The correct way to handle this is to always call `ackOrNack` when receiving a message.

### Special Note

RabbitMQ 3.8.0 introduced [quorum queues](https://www.rabbitmq.com/quorum-queues.html). Although quorum queues may not be suitable in all situations, they provide [poison message handling](https://www.rabbitmq.com/quorum-queues.html#poison-message-handling) without the need for an external [redelivery counter](https://github.com/guidesmiths/rascal#dealing-with-redeliveries) and offer better data safety in the event of a network partition. You can read more about them [here](https://www.cloudamqp.com/blog/reasons-you-should-switch-to-quorum-queues.html) and [here](https://blog.rabbitmq.com/posts/2020/06/quorum-queues-local-delivery).
Expand Down Expand Up @@ -122,7 +128,7 @@ There are three situations when Rascal will nack a message without requeue, lead

1. When it is unable to parse the message content and the subscriber has no 'invalid_content' listener
1. When the subscriber's (optional) redelivery limit has been exceeded and the subscriber has neither a 'redeliveries_error' nor a 'redeliveries_exceeded' listener
1. When attempting to recover by [republishing](#republishing) or [forwarding](#forwarding), but the recovery operation fails.
1. When attempting to recover by [republishing](#republishing), [forwarding](#forwarding), but the recovery operation fails.

The reason Rascal nacks the message is because the alternatives are to leave the message unacknowledged indefinitely, or to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.

Expand Down Expand Up @@ -1606,11 +1612,11 @@ try {
}
```
Cancelling a subscribion will stop consuming messages, but leave the channel open for a short while so your application can still ack/nack messages. By default the channel is left open for 10 seconds, but can be overridden through the `deferCloseChannel` subscription property.
Cancelling a subscribion will stop consuming messages, but leave the channel open until any outstanding messages have been acknowledged, or the timeout specified by through the `closeTimeout` subscription property is exceeded.
## Shutdown
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`. Shutting down the broker will cancel all subscriptions, then wait a short amount of time for inflight messages to be acknowledged (configurable via the `deferCloseChannel` subscription property), before closing channels and disconnecting.
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`.
## Bonus Features
Expand Down
15 changes: 2 additions & 13 deletions lib/amqp/Broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,13 @@ function Broker(config, components) {
};

this.unsubscribeAll = function (next) {
const timeout = getMaxDeferCloseChannelTimeout();
async.eachSeries(
async.each(
sessions.slice(),
(session, cb) => {
sessions.shift();
session.cancel(cb);
},
(err) => {
if (err) return next(err);
debug('Waiting %dms for all subscriber channels to close', timeout);
setTimeoutUnref(next, timeout);
}
next
);
};

Expand All @@ -227,10 +222,4 @@ function Broker(config, components) {
this._addSubscription = function (subscription) {
subscriptions[subscription.name] = subscription;
};

function getMaxDeferCloseChannelTimeout() {
return sessions.reduce((value, session) => {
return session._maxDeferCloseChannel(value);
}, 0);
}
}
4 changes: 3 additions & 1 deletion lib/amqp/SubscriberError.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ module.exports = function SubscriptionRecovery(broker, vhost) {
{
name: 'unknown',
execute(session, message, err, strategyConfig, next) {
next(new Error(format('Error recovering message: %s. No such strategy: %s.', message.properties.messageId, strategyConfig.strategy)));
session._nack(message, () => {
next(new Error(format('Error recovering message: %s. No such strategy: %s.', message.properties.messageId, strategyConfig.strategy)));
});
},
},
],
Expand Down
69 changes: 35 additions & 34 deletions lib/amqp/SubscriberSession.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const debug = require('debug')('rascal:SubscriberSession');
const EventEmitter = require('events').EventEmitter;
const inherits = require('util').inherits;
const _ = require('lodash');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

module.exports = SubscriberSession;
Expand All @@ -25,7 +26,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
this._open = function (channel, consumerTag, next) {
if (cancelled) return next(new Error('Subscriber has been cancelled'));
debug('Opening subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channels[consumerTag] = { index: index++, channel, consumerTag };
channels[consumerTag] = { index: index++, channel, consumerTag, unacknowledgedMessages: 0 };
channel.once('close', unref.bind(null, consumerTag));
channel.once('error', unref.bind(null, consumerTag));
next();
Expand All @@ -47,12 +48,18 @@ function SubscriberSession(sequentialChannelOperations, config) {

this._unsafeClose = function (next) {
withCurrentChannel(
(channel, consumerTag) => {
(channel, consumerTag, entry) => {
entry.doomed = true;
debug('Cancelling subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channel.cancel(consumerTag, (err) => {
if (err) return next(err);
doom(consumerTag);
next();
const waitOrTimeout = config.closeTimeout ? async.timeout(waitForUnacknowledgedMessages, config.closeTimeout) : waitForUnacknowledgedMessages;
waitOrTimeout(entry, (err) => {
channel.close(() => {
debug('Channel: %s was closed', entry.channel._rascal_id);
next(err);
});
});
});
},
() => {
Expand All @@ -66,10 +73,6 @@ function SubscriberSession(sequentialChannelOperations, config) {
timeout = setTimeoutUnref(fn, delay);
};

this._maxDeferCloseChannel = function (other) {
return Math.max(config.deferCloseChannel, other);
};

this._getRascalChannelId = function () {
let rascalChannelId = null;
withCurrentChannel((channel) => {
Expand All @@ -78,12 +81,27 @@ function SubscriberSession(sequentialChannelOperations, config) {
return rascalChannelId;
};

this._incrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, ++entry.unacknowledgedMessages);
});
};

this._decrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, --entry.unacknowledgedMessages);
});
};

this._ack = function (message, next) {
withConsumerChannel(
message.fields.consumerTag,
(channel) => {
debug('Acknowledging message: %s on channel: %s', message.properties.messageId, channel._rascal_id);
channel.ack(message);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);
},
() => {
Expand All @@ -101,6 +119,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
(channel) => {
debug('Not acknowledging message: %s with requeue: %s on channel: %s', message.properties.messageId, !!options.requeue, channel._rascal_id);
channel.nack(message, false, !!options.requeue);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);
},
() => {
Expand All @@ -114,9 +133,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
function withCurrentChannel(fn, altFn) {
const entry = _.chain(channels)
.values()
.filter((channel) => {
return !channel.doomed;
})
.filter((entry) => !entry.doomed)
.sortBy('index')
.last()
.value();
Expand All @@ -137,28 +154,12 @@ function SubscriberSession(sequentialChannelOperations, config) {
});
}

function doom(consumerTag) {
withConsumerChannel(consumerTag, (channel, consumerTag, entry) => {
if (entry.doomed) return;
entry.doomed = true;
scheduleClose(entry);
});
}

/*
There may still be delivered messages that have yet to be ack or nacked
but no way of telling how many are outstanding since due to potentially
complicated recovery strategies, with timeouts etc.
Keeping channels around for a minute shouldn't hurt
*/
function scheduleClose(entry) {
debug('Deferring close channel: %s by %dms', entry.channel._rascal_id, config.deferCloseChannel);
setTimeoutUnref(() => {
withConsumerChannel(entry.consumerTag, (channel) => {
channel.close(() => {
debug('Channel: %s was closed', channel._rascal_id);
});
});
}, config.deferCloseChannel);
function waitForUnacknowledgedMessages(entry, next) {
if (entry.unacknowledgedMessages > 0) {
debug('Waiting for %d unacknowledged messages from channel: %s', entry.unacknowledgedMessages, entry.channel._rascal_id);
setTimeoutUnref(() => waitForUnacknowledgedMessages(entry, next), 100);
return;
}
next();
}
}
1 change: 1 addition & 0 deletions lib/amqp/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ function Subscription(broker, vhost, config, counter) {
if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);

debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
session._incrementUnacknowledgeMessageCount(message.fields.consumerTag);

decorateWithRoutingHeaders(message);
if (immediateNack(message)) return ackOrNack(session, message, true);
Expand Down
1 change: 0 additions & 1 deletion lib/config/baseline.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ module.exports = {
timeout: 1000,
counter: 'stub',
},
deferCloseChannel: 10000,
options: {},
},
redeliveries: {
Expand Down
2 changes: 1 addition & 1 deletion lib/config/tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module.exports = _.defaultsDeep(
},
},
subscriptions: {
deferCloseChannel: 100,
closeTimeout: 100,
},
},
redeliveries: {
Expand Down
2 changes: 1 addition & 1 deletion lib/config/validate.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ module.exports = _.curry((config, next) => {
'redeliveries',
'autoCreated',
'deprecated',
'deferCloseChannel',
'closeTimeout',
'encryption',
'promisifyAckOrNack',
]);
Expand Down
24 changes: 14 additions & 10 deletions test/broker.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ describe(
});
});

it('should defer returning from unsubscribeAll until underlying channels have been closed', (test, done) => {
it('should not return from unsubscribeAll until underlying channels have been closed', (test, done) => {
const config = _.defaultsDeep(
{
vhosts,
Expand All @@ -255,23 +255,27 @@ describe(
testConfig
);

config.vhosts['/'].subscriptions.s1.deferCloseChannel = 200;
config.vhosts['/'].subscriptions.s1.closeTimeout = 200;

createBroker(config, (err, broker) => {
assert.ifError(err);

broker.subscribe('s1', (err, subscription) => {
assert.ifError(err);

// eslint-disable-next-line no-empty-function
subscription.on('message', () => {});

const before = Date.now();
broker.unsubscribeAll((err) => {
broker.publish('p1', 'test message', (err) => {
assert.ifError(err);
const after = Date.now();
assert.ok(after >= before + 200, 'Did not defer returning from unsubscibeAll');
done();
});

// eslint-disable-next-line no-empty-function
subscription.on('message', () => {
const before = Date.now();
broker.unsubscribeAll((err) => {
assert.strictEqual(err.code, 'ETIMEDOUT');
const after = Date.now();
assert.ok(after >= before + 200, 'Did not defer returning from unsubscibeAll');
done();
});
});
});
});
Expand Down
Loading

0 comments on commit 84108dc

Please sign in to comment.