Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: allow setting a pull HTTP timeout #981

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
pubsub: allow setting a pull HTTP timeout
  • Loading branch information
stephenplusplus committed Dec 7, 2015
commit a78b77d6e377e0737f549d60259669a86cbab135
9 changes: 7 additions & 2 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,16 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled.
* it's pulled. (default: false)
* @param {string} options.encoding - When pulling for messages, this type is
* used when converting a message's data to a string. (default: 'utf-8')
* @param {number} options.interval - Interval in milliseconds to check for new
* messages.
* messages. (default: 10)
* @param {number} options.maxInProgress - Maximum messages to consume

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* simultaneously.
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
* @return {module:pubsub/subscription}
*
* @example
Expand Down
38 changes: 28 additions & 10 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,18 @@ var util = require('../common/util.js');
*
* @param {module:pubsub} pubsub - PubSub object.
* @param {object} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message
* once it's pulled. (default: false)
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
* @param {string} options.encoding - When pulling for messages, this type is
* used when converting a message's data to a string. (default: 'utf-8')
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - Name of the subscription.
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken. (default: 90000)
*/
/**
* A Subscription object will give you access to your Google Cloud Pub/Sub
Expand Down Expand Up @@ -269,13 +272,23 @@ function Subscription(pubsub, options) {

this.autoAck = is.boolean(options.autoAck) ? options.autoAck : false;
this.closed = true;
this.interval = is.number(options.interval) ? options.interval : 10;
this.encoding = options.encoding || 'utf-8';
this.inProgressAckIds = {};
this.interval = is.number(options.interval) ? options.interval : 10;
this.maxInProgress =
is.number(options.maxInProgress) ? options.maxInProgress : Infinity;
this.messageListeners = 0;
this.paused = false;
this.encoding = options.encoding || 'utf-8';

if (is.number(options.timeout)) {
this.timeout = options.timeout;
} else {
// The default timeout used in gcloud-node is 60s, but a pull request times
// out around 90 seconds. Allow an extra couple of seconds to give the API a
// chance to respond on its own before terminating the connection.
var PUBSUB_API_TIMEOUT = 90000;
this.timeout = PUBSUB_API_TIMEOUT + 2000;
}

this.listenForEvents_();
}
Expand Down Expand Up @@ -447,10 +460,10 @@ Subscription.prototype.delete = function(callback) {
* @resource [Subscriptions: pull API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/pull}
*
* @param {object=} options - Configuration object.
* @param {number} options.maxResults - Limit the amount of messages pulled.
* @param {boolean} options.returnImmediately - If set, the system will respond
* immediately. Otherwise, wait until new messages are available. Returns if
* timeout is reached.
* @param {number} options.maxResults - Limit the amount of messages pulled.
* @param {function} callback - The callback function.
*
* @example
Expand Down Expand Up @@ -496,9 +509,7 @@ Subscription.prototype.pull = function(options, callback) {
}

this.request({
// The default timeout set used in this library is 60s, but a pull request
// times out around 90 seconds.
timeout: 90000,
timeout: this.timeout,
method: 'POST',
uri: ':pull',
json: {
Expand All @@ -507,8 +518,15 @@ Subscription.prototype.pull = function(options, callback) {
}
}, function(err, response) {
if (err) {
callback(err, null, response);
return;
if (err.code === 'ETIMEDOUT' && !err.connect) {
// Simulate a server timeout where no messages were received.
response = {
receivedMessages: []
};
} else {
callback(err, null, response);
return;
}
}

var messages = arrify(response.receivedMessages)
Expand Down
43 changes: 40 additions & 3 deletions test/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ describe('Subscription', function() {
autoAck: true,
interval: 100,
maxInProgress: 3,
encoding: 'binary'
encoding: 'binary',
timeout: 30000
};
var sub = new Subscription(PUBSUB, CONFIG);
assert.strictEqual(sub.autoAck, CONFIG.autoAck);
assert.strictEqual(sub.interval, CONFIG.interval);
assert.strictEqual(sub.maxInProgress, 3);
assert.strictEqual(sub.encoding, CONFIG.encoding);
assert.strictEqual(sub.maxInProgress, CONFIG.maxInProgress);
assert.strictEqual(sub.timeout, CONFIG.timeout);
});

it('should be closed', function() {
Expand Down Expand Up @@ -157,6 +159,10 @@ describe('Subscription', function() {
assert.strictEqual(subscription.encoding, 'utf-8');
});

it('should default timeout to 92 seconds', function() {
assert.strictEqual(subscription.timeout, 92000);
});

it('should create an iam object', function() {
assert.deepEqual(subscription.iam.calledWith_, [
PUBSUB,
Expand Down Expand Up @@ -465,7 +471,7 @@ describe('Subscription', function() {
it('should make correct api request', function(done) {
subscription.request = function(reqOpts) {
assert.strictEqual(reqOpts.method, 'POST');
assert.strictEqual(reqOpts.timeout, 90000);
assert.strictEqual(reqOpts.timeout, 92000);
assert.strictEqual(reqOpts.uri, ':pull');
assert.strictEqual(reqOpts.json.returnImmediately, false);
assert.strictEqual(reqOpts.json.maxMessages, 1);
Expand All @@ -475,6 +481,22 @@ describe('Subscription', function() {
subscription.pull({ maxResults: 1 }, assert.ifError);
});

it('should pass a timeout if specified', function(done) {
var timeout = 30000;

var subscription = new Subscription(PUBSUB, {
name: SUB_NAME,
timeout: timeout
});

subscription.request = function(reqOpts) {
assert.strictEqual(reqOpts.timeout, 30000);
done();
};

subscription.pull(assert.ifError);
});

it('should pass error to callback', function(done) {
var error = new Error('Error.');
subscription.request = function(reqOpts, callback) {
Expand All @@ -486,6 +508,21 @@ describe('Subscription', function() {
});
});

it('should not return messages if request timed out', function(done) {
subscription.request = function(reqOpts, callback) {
var error = new Error();
error.code = 'ETIMEDOUT';
error.connect = false;
callback(error);
};

subscription.pull({}, function(err, messages) {
assert.ifError(err);
assert.deepEqual(messages, []);
done();
});
});

it('should call formatMessage_ with encoding', function(done) {
subscription.encoding = 'encoding-value';

Expand Down