Commit

Merge pull request onebeyond#177 from guidesmiths/recovery-requeue
Recovery requeue
cressie176 authored Nov 7, 2021
2 parents a9261cc + 1a7abd2 commit 2a66524
Showing 13 changed files with 349 additions and 225 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 15.0.0

- Rather than waiting an arbitrary time for channels to close when cancelling a subscription, Rascal now waits until any outstanding messages have been acknowledged. By default, Rascal will wait indefinitely, but this behaviour can be overridden by specifying a `closeTimeout` for the subscription. If the timeout is exceeded following a direct call to `broker.unsubscribeAll` or `subscription.cancel` then an error will be yielded. If the timeout is exceeded following an indirect call to `subscription.cancel` (e.g. by `broker.shutdown`) then an error will be emitted but the operation will be allowed to continue.

## 14.0.0

- Messages which cannot be recovered by the republish or forward strategies are nacked resulting in message loss unless a dead letter is configured.
10 changes: 7 additions & 3 deletions README.md
@@ -47,6 +47,10 @@ Rascal extends the existing [RabbitMQ Concepts](https://www.rabbitmq.com/tutoria

A **publication** is a named configuration for publishing a message, including the destination queue or exchange, routing configuration, encryption profile and reliability guarantees, message options, etc. A **subscription** is a named configuration for consuming messages, including the source queue, encryption profile, content encoding, delivery options (e.g. acknowledgement handling and prefetch), etc. These must be [configured](#configuration) and supplied when creating the Rascal broker. After the broker has been created, the subscriptions and publications can be retrieved from the broker and used to publish and consume messages.

### Breaking Changes in Rascal@15

Rascal@15 waits for inflight messages to be acknowledged before closing subscriber channels. Prior to this version Rascal just waited an arbitrary amount of time. If your application does not acknowledge a message for some reason (quite likely in tests), calling `subscription.cancel`, `broker.unsubscribeAll`, `broker.bounce`, `broker.shutdown` or `broker.nuke` will wait indefinitely. You can specify a `closeTimeout` in your subscription config. However, if this is exceeded, the `subscription.cancel` and `broker.unsubscribeAll` methods will yield an error, while the `broker.bounce`, `broker.shutdown` and `broker.nuke` methods will emit an error but attempt to continue. In both cases the error will have a code of `ETIMEDOUT` and a message stating `Callback function "waitForUnacknowledgedMessages" timed out`.
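
As a rough illustration of the paragraph above, the fragment below sketches a subscription config with a `closeTimeout` and a small helper for recognising the resulting error. The subscription name `demo_sub`, the queue name `demo_q`, the 5000ms value and the `isCloseTimeout` helper are all hypothetical and chosen for illustration; only `closeTimeout` and the `ETIMEDOUT` code come from the text.

```javascript
// Hypothetical fragment of a subscription config; only closeTimeout is taken from the text above.
const subscriptionConfig = {
  queue: 'demo_q', // assumed queue name
  closeTimeout: 5000, // milliseconds to wait for outstanding acknowledgements
};

// Illustrative helper (not part of Rascal): true when an error looks like a close timeout.
function isCloseTimeout(err) {
  return Boolean(err) && err.code === 'ETIMEDOUT';
}

console.log(isCloseTimeout({ code: 'ETIMEDOUT' }));
```

A helper like this could be used both in the callback passed to `subscription.cancel` and in a broker `error` listener, since the text states that the error carries the same code in both cases.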

### Special Note

RabbitMQ 3.8.0 introduced [quorum queues](https://www.rabbitmq.com/quorum-queues.html). Although quorum queues may not be suitable in all situations, they provide [poison message handling](https://www.rabbitmq.com/quorum-queues.html#poison-message-handling) without the need for an external [redelivery counter](https://github.com/guidesmiths/rascal#dealing-with-redeliveries) and offer better data safety in the event of a network partition. You can read more about them [here](https://www.cloudamqp.com/blog/reasons-you-should-switch-to-quorum-queues.html) and [here](https://blog.rabbitmq.com/posts/2020/06/quorum-queues-local-delivery).
@@ -120,7 +124,7 @@ There are three situations when Rascal will nack a message without requeue, lead

1. When it is unable to parse the message content and the subscriber has no 'invalid_content' listener
1. When the subscriber's (optional) redelivery limit has been exceeded and the subscriber has neither a 'redeliveries_error' nor a 'redeliveries_exceeded' listener
1. When attempting to recover by [republishing](#republishing) or [forwarding](#forwarding), but the recovery operation fails.
1. When attempting to recover by [republishing](#republishing), [forwarding](#forwarding), but the recovery operation fails.

The reason Rascal nacks the message is because the alternatives are to leave the message unacknowledged indefinitely, or to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.

@@ -1604,11 +1608,11 @@ try {
}
```
Cancelling a subscription will stop consuming messages, but leave the channel open for a short while so your application can still ack/nack messages. By default the channel is left open for 10 seconds, but can be overridden through the `deferCloseChannel` subscription property.
Cancelling a subscription will stop consuming messages, but leave the channel open until any outstanding messages have been acknowledged, or the timeout specified through the `closeTimeout` subscription property is exceeded.
## Shutdown
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`. Shutting down the broker will cancel all subscriptions, then wait a short amount of time for inflight messages to be acknowledged (configurable via the `deferCloseChannel` subscription property), before closing channels and disconnecting.
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`.
## Bonus Features
59 changes: 24 additions & 35 deletions lib/amqp/Broker.js
@@ -67,26 +67,6 @@ function Broker(config, components) {
vhosts[name].connect(next);
};

this.nuke = function (next) {
debug('Nuking broker');
self.unsubscribeAll((err) => {
if (err) return next(err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
nukeVhost(config, { vhost }, callback);
},
(err) => {
if (err) return next(err);
vhosts = publications = subscriptions = {};
clearInterval(self.keepActive);
debug('Finished nuking broker');
next();
}
);
});
};

this.purge = function (next) {
debug('Purging all queues in all vhosts');
async.eachSeries(
@@ -112,7 +92,7 @@ function Broker(config, components) {
(err) => {
if (err) return next(err);
self.unsubscribeAll((err) => {
if (err) return next(err);
if (err) self.emit('error', err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
@@ -133,7 +113,7 @@ function Broker(config, components) {
this.bounce = function (next) {
debug('Bouncing broker');
self.unsubscribeAll((err) => {
if (err) return next(err);
if (err) self.emit('error', err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
@@ -148,6 +128,26 @@ function Broker(config, components) {
});
};

this.nuke = function (next) {
debug('Nuking broker');
self.unsubscribeAll((err) => {
if (err) self.emit('error', err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
nukeVhost(config, { vhost }, callback);
},
(err) => {
if (err) return next(err);
vhosts = publications = subscriptions = {};
clearInterval(self.keepActive);
debug('Finished nuking broker');
next();
}
);
});
};

this.publish = function (name, message, overrides, next) {
if (arguments.length === 3) return self.publish(name, message, {}, arguments[2]);
if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next);
@@ -191,18 +191,13 @@ function Broker(config, components) {
};

this.unsubscribeAll = function (next) {
const timeout = getMaxDeferCloseChannelTimeout();
async.eachSeries(
async.each(
sessions.slice(),
(session, cb) => {
sessions.shift();
session.cancel(cb);
},
(err) => {
if (err) return next(err);
debug('Waiting %dms for all subscriber channels to close', timeout);
setTimeoutUnref(next, timeout);
}
next
);
};

@@ -227,10 +222,4 @@ function Broker(config, components) {
this._addSubscription = function (subscription) {
subscriptions[subscription.name] = subscription;
};

function getMaxDeferCloseChannelTimeout() {
return sessions.reduce((value, session) => {
return session._maxDeferCloseChannel(value);
}, 0);
}
}
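
The Broker.js hunks above change `shutdown`, `bounce` and `nuke` so that an error from `unsubscribeAll` is emitted rather than passed to the callback. The sketch below reproduces only that error-routing pattern with a stand-in class. `FakeBroker` and its constructor argument are hypothetical and are not Rascal's implementation.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a broker; only the error-routing pattern mirrors the diff.
class FakeBroker extends EventEmitter {
  constructor(unsubscribeError) {
    super();
    this.unsubscribeError = unsubscribeError;
  }

  // Direct call: the error is yielded to the caller.
  unsubscribeAll(next) {
    next(this.unsubscribeError);
  }

  // Indirect call: the error is emitted and the operation carries on.
  shutdown(next) {
    this.unsubscribeAll((err) => {
      if (err) this.emit('error', err);
      next();
    });
  }
}

const timeoutError = Object.assign(new Error('timed out'), { code: 'ETIMEDOUT' });
const broker = new FakeBroker(timeoutError);
const seen = [];

broker.on('error', (err) => seen.push(`emitted:${err.code}`));
broker.unsubscribeAll((err) => seen.push(`yielded:${err.code}`));
broker.shutdown((err) => seen.push(`shutdown:${err ? 'failed' : 'completed'}`));

console.log(seen);
```

Note that a Node.js `EventEmitter` throws when an `error` event is emitted with no listener attached, so an application relying on this pattern would want an `error` listener registered before calling `shutdown`, `bounce` or `nuke`.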
4 changes: 3 additions & 1 deletion lib/amqp/SubscriberError.js
@@ -170,7 +170,9 @@ module.exports = function SubscriptionRecovery(broker, vhost) {
{
name: 'unknown',
execute(session, message, err, strategyConfig, next) {
next(new Error(format('Error recovering message: %s. No such strategy: %s.', message.properties.messageId, strategyConfig.strategy)));
session._nack(message, () => {
next(new Error(format('Error recovering message: %s. No such strategy: %s.', message.properties.messageId, strategyConfig.strategy)));
});
},
},
],
72 changes: 38 additions & 34 deletions lib/amqp/SubscriberSession.js
@@ -2,6 +2,7 @@ const debug = require('debug')('rascal:SubscriberSession');
const EventEmitter = require('events').EventEmitter;
const inherits = require('util').inherits;
const _ = require('lodash');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

module.exports = SubscriberSession;
@@ -25,7 +26,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
this._open = function (channel, consumerTag, next) {
if (cancelled) return next(new Error('Subscriber has been cancelled'));
debug('Opening subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channels[consumerTag] = { index: index++, channel, consumerTag };
channels[consumerTag] = { index: index++, channel, consumerTag, unacknowledgedMessages: 0 };
channel.once('close', unref.bind(null, consumerTag));
channel.once('error', unref.bind(null, consumerTag));
next();
@@ -47,12 +48,18 @@ function SubscriberSession(sequentialChannelOperations, config) {

this._unsafeClose = function (next) {
withCurrentChannel(
(channel, consumerTag) => {
(channel, consumerTag, entry) => {
entry.doomed = true;
debug('Cancelling subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channel.cancel(consumerTag, (err) => {
if (err) return next(err);
doom(consumerTag);
next();
const waitOrTimeout = config.closeTimeout ? async.timeout(waitForUnacknowledgedMessages, config.closeTimeout) : waitForUnacknowledgedMessages;
waitOrTimeout(entry, null, (err) => {
channel.close(() => {
debug('Channel: %s was closed', entry.channel._rascal_id);
next(err);
});
});
});
},
() => {
@@ -66,10 +73,6 @@ function SubscriberSession(sequentialChannelOperations, config) {
timeout = setTimeoutUnref(fn, delay);
};

this._maxDeferCloseChannel = function (other) {
return Math.max(config.deferCloseChannel, other);
};

this._getRascalChannelId = function () {
let rascalChannelId = null;
withCurrentChannel((channel) => {
@@ -78,12 +81,27 @@ function SubscriberSession(sequentialChannelOperations, config) {
return rascalChannelId;
};

this._incrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, ++entry.unacknowledgedMessages);
});
};

this._decrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, --entry.unacknowledgedMessages);
});
};

this._ack = function (message, next) {
withConsumerChannel(
message.fields.consumerTag,
(channel) => {
debug('Acknowledging message: %s on channel: %s', message.properties.messageId, channel._rascal_id);
channel.ack(message);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);
},
() => {
@@ -101,6 +119,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
(channel) => {
debug('Not acknowledging message: %s with requeue: %s on channel: %s', message.properties.messageId, !!options.requeue, channel._rascal_id);
channel.nack(message, false, !!options.requeue);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);
},
() => {
@@ -114,9 +133,7 @@ function SubscriberSession(sequentialChannelOperations, config) {
function withCurrentChannel(fn, altFn) {
const entry = _.chain(channels)
.values()
.filter((channel) => {
return !channel.doomed;
})
.filter((entry) => !entry.doomed)
.sortBy('index')
.last()
.value();
@@ -137,28 +154,15 @@ function SubscriberSession(sequentialChannelOperations, config) {
});
}

function doom(consumerTag) {
withConsumerChannel(consumerTag, (channel, consumerTag, entry) => {
if (entry.doomed) return;
entry.doomed = true;
scheduleClose(entry);
});
}

/*
There may still be delivered messages that have yet to be ack or nacked
but no way of telling how many are outstanding since due to potentially
complicated recovery strategies, with timeouts etc.
Keeping channels around for a minute shouldn't hurt
*/
function scheduleClose(entry) {
debug('Deferring close channel: %s by %dms', entry.channel._rascal_id, config.deferCloseChannel);
setTimeoutUnref(() => {
withConsumerChannel(entry.consumerTag, (channel) => {
channel.close(() => {
debug('Channel: %s was closed', channel._rascal_id);
});
});
}, config.deferCloseChannel);
function waitForUnacknowledgedMessages(entry, previousCount, next) {
const currentCount = entry.unacknowledgedMessages;
if (currentCount > 0) {
if (currentCount !== previousCount) {
debug('Waiting for %d unacknowledged messages from channel: %s', currentCount, entry.channel._rascal_id);
}
setTimeoutUnref(() => waitForUnacknowledgedMessages(entry, currentCount, next), 100);
return;
}
next();
}
}
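
The new `waitForUnacknowledgedMessages` function polls a per-channel counter every 100ms and is optionally wrapped in `async.timeout`. The standalone sketch below shows the same idea using plain timers instead of the `async` library, so it is a substitute technique rather than the code in this commit. The function name `waitForDrain`, its options object and the error message are assumptions made for illustration.

```javascript
// Standalone sketch, not Rascal's implementation: plain timers stand in for async.timeout.
function waitForDrain(entry, { pollInterval = 100, closeTimeout } = {}, next) {
  let finished = false;
  let pollTimer = null;
  let deadlineTimer = null;

  // Ensure the callback fires once and that no timers are left running.
  function finish(err) {
    if (finished) return;
    finished = true;
    clearTimeout(pollTimer);
    clearTimeout(deadlineTimer);
    next(err);
  }

  // Re-check the counter until every delivered message has been acked or nacked.
  function poll() {
    if (entry.unacknowledgedMessages > 0) {
      pollTimer = setTimeout(poll, pollInterval);
      return;
    }
    finish();
  }

  // Without a closeTimeout the wait is unbounded, matching the default described above.
  if (closeTimeout) {
    deadlineTimer = setTimeout(() => {
      const err = new Error('Timed out waiting for unacknowledged messages');
      err.code = 'ETIMEDOUT';
      finish(err);
    }, closeTimeout);
  }

  poll();
}

// Usage: one message is outstanding and gets acknowledged shortly afterwards.
const entry = { unacknowledgedMessages: 1 };
setTimeout(() => { entry.unacknowledgedMessages = 0; }, 10);
waitForDrain(entry, { pollInterval: 5, closeTimeout: 1000 }, (err) => {
  console.log(err ? `failed: ${err.code}` : 'drained');
});
```

Polling keeps the ack and nack paths simple, since they only adjust a counter. An event-based alternative would avoid the polling delay at the cost of extra wiring between the ack path and the close path.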
1 change: 1 addition & 0 deletions lib/amqp/Subscription.js
@@ -82,6 +82,7 @@ function Subscription(broker, vhost, config, counter) {
if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);

debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
session._incrementUnacknowledgeMessageCount(message.fields.consumerTag);

decorateWithRoutingHeaders(message);
if (immediateNack(message)) return ackOrNack(session, message, true);
6 changes: 5 additions & 1 deletion lib/amqp/Vhost.js
@@ -113,7 +113,11 @@ function Vhost(config) {
};

this.bounce = function (next) {
async.series([self.disconnect, self.init], next);
async.series([self.disconnect, self.init], (err) => {
if (err) return next(err);
debug('Finished bouncing vhost: %s', self.name);
setImmediate(next);
});
};

this.connect = function (next) {
1 change: 0 additions & 1 deletion lib/config/baseline.js
@@ -87,7 +87,6 @@ module.exports = {
timeout: 1000,
counter: 'stub',
},
deferCloseChannel: 10000,
options: {},
},
redeliveries: {
2 changes: 1 addition & 1 deletion lib/config/tests.js
@@ -29,7 +29,7 @@ module.exports = _.defaultsDeep(
},
},
subscriptions: {
deferCloseChannel: 100,
closeTimeout: 500,
},
},
redeliveries: {
2 changes: 1 addition & 1 deletion lib/config/validate.js
@@ -141,7 +141,7 @@ module.exports = _.curry((config, next) => {
'redeliveries',
'autoCreated',
'deprecated',
'deferCloseChannel',
'closeTimeout',
'encryption',
'promisifyAckOrNack',
]);