Skip to content

Commit

Permalink
pubsub: support generated subscription names (googleapis#1799)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus authored Nov 28, 2016
1 parent 6395a4c commit 2a67398
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 73 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,7 @@ var topic = pubsubClient.topic('my-topic');
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
reuseExisting: true
};

topic.subscribe('subscription-name', options, function(err, subscription) {
topic.subscribe('subscription-name', function(err, subscription) {
// Register listeners to start pulling for messages.
function onError(err) {}
function onMessage(message) {}
Expand Down
6 changes: 1 addition & 5 deletions packages/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ var topic = pubsub.topic('my-topic');
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
reuseExisting: true
};

topic.subscribe('subscription-name', options, function(err, subscription) {
topic.subscribe('subscription-name', function(err, subscription) {
// Register listeners to start pulling for messages.
function onError(err) {}
function onMessage(message) {}
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
"google-proto-files": "^0.8.0",
"is": "^3.0.1",
"modelo": "^4.2.0",
"node-uuid": "^1.4.3",
"propprop": "^0.3.0"
},
"devDependencies": {
"async": "^1.5.2",
"mocha": "^3.0.1",
"node-uuid": "^1.4.3",
"proxyquire": "^1.7.10"
},
"scripts": {
Expand Down
52 changes: 34 additions & 18 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,17 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
/**
* Create a subscription to a topic.
*
* All generated subscription names share a common prefix, `autogenerated-`.
*
* @resource [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
*
* @throws {Error} If a Topic instance or topic name is not provided.
* @throws {Error} If a subName is not provided.
*
* @param {module:pubsub/topic|string} topic - The Topic to create a
* subscription to.
* @param {string} subName - The name of the subscription.
* @param {string=} subName - The name of the subscription. If a name is not
* provided, a random subscription name will be generated and created.
* @param {object=} options - See a
* [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions)
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
Expand All @@ -431,10 +434,6 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
* simultaneously.
* @param {string} options.pushEndpoint - A URL to a custom endpoint that
* messages should be pushed to.
* @param {boolean} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
Expand All @@ -453,6 +452,14 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*
* //-
* // Omit the name to have one generated automatically. All generated names
* // share a common prefix, `autogenerated-`.
* //-
* pubsub.subscribe(topic, function(err, subscription, apiResponse) {
* // subscription.name = The generated name.
* });
*
* //-
* // Customize the subscription.
* //-
* pubsub.subscribe(topic, name, {
Expand All @@ -474,21 +481,28 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
throw new Error('A Topic is required for a new subscription.');
}

if (!is.string(subName)) {
throw new Error('A subscription name is required for a new subscription.');
if (is.string(topic)) {
topic = this.topic(topic);
}

if (!callback) {
if (is.object(subName)) {
callback = options;
options = {};
options = subName;
subName = '';
}

options = options || {};
if (is.fn(subName)) {
callback = subName;
subName = '';
}

if (is.string(topic)) {
topic = this.topic(topic);
if (is.fn(options)) {
callback = options;
options = {};
}

options = options || {};

var subscription = this.subscription(subName, options);

var protoOpts = {
Expand All @@ -513,11 +527,10 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
delete reqOpts.interval;
delete reqOpts.maxInProgress;
delete reqOpts.pushEndpoint;
delete reqOpts.reuseExisting;
delete reqOpts.timeout;

this.request(protoOpts, reqOpts, function(err, resp) {
if (err && !(err.code === 409 && options.reuseExisting)) {
if (err && err.code !== 409) {
callback(err, null, resp);
return;
}
Expand All @@ -531,9 +544,10 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* requests. You will receive a {module:pubsub/subscription} object,
* which will allow you to interact with a subscription.
*
* @throws {Error} If a name is not provided.
* All generated names share a common prefix, `autogenerated-`.
*
* @param {string} name - The name of the subscription.
* @param {string=} name - The name of the subscription. If a name is not
* provided, a random subscription name will be generated.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
Expand All @@ -560,12 +574,14 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* });
*/
PubSub.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
if (is.object(name)) {
options = name;
name = undefined;
}

options = options || {};
options.name = name;

return new Subscription(this, options);
};

Expand Down
14 changes: 13 additions & 1 deletion packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var events = require('events');
var is = require('is');
var modelo = require('modelo');
var prop = require('propprop');
var uuid = require('node-uuid');

/**
* @type {module:pubsub/iam}
Expand Down Expand Up @@ -140,7 +141,9 @@ var PUBSUB_API_TIMEOUT = 90000;
* subscription.removeListener('message', onMessage);
*/
function Subscription(pubsub, options) {
this.name = Subscription.formatName_(pubsub.projectId, options.name);
var name = options.name || Subscription.generateName_();

this.name = Subscription.formatName_(pubsub.projectId, name);

var methods = {
/**
Expand Down Expand Up @@ -374,6 +377,15 @@ Subscription.formatName_ = function(projectId, name) {
return 'projects/' + projectId + '/subscriptions/' + name;
};

/**
* Generate a random name to use for a name-less subscription.
*
* @private
*/
Subscription.generateName_ = function() {
return 'autogenerated-' + uuid.v4();
};

/**
* Acknowledge to the backend that the message was retrieved. You must provide
* either a single ackId or an array of ackIds.
Expand Down
17 changes: 12 additions & 5 deletions packages/pubsub/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,12 @@ Topic.prototype.publish = function(messages, options, callback) {
/**
* Create a subscription to this topic.
*
* All generated subscription names share a common prefix, `autogenerated-`.
*
* @resource [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
*
* @param {string} subName - The name of the subscription.
* @param {string=} subName - The name of the subscription. If a name is not
* provided, a random subscription name will be generated and created.
* @param {object=} options - See a
* [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions)
* @param {number} options.ackDeadlineSeconds - The maximum time after
Expand All @@ -494,10 +497,6 @@ Topic.prototype.publish = function(messages, options, callback) {
* simultaneously.
* @param {string} options.pushEndpoint - A URL to a custom endpoint that
* messages should be pushed to.
* @param {boolean=} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
Expand All @@ -507,6 +506,14 @@ Topic.prototype.publish = function(messages, options, callback) {
* // Without specifying any options.
* topic.subscribe('newMessages', function(err, subscription, apiResponse) {});
*
* //-
* // Omit the name to have one generated automatically. All generated names
* // share a common prefix, `autogenerated-`.
* //-
* topic.subscribe(function(err, subscription, apiResponse) {
* // subscription.name = The generated name.
* });
*
* // With options.
* topic.subscribe('newMessages', {
* ackDeadlineSeconds: 90,
Expand Down
9 changes: 8 additions & 1 deletion packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,15 @@ describe('pubsub', function() {
});
});

it('should create a subscription with a generated name', function(done) {
topic.subscribe(function(err, sub) {
assert.ifError(err);
sub.delete(done);
});
});

it('should re-use an existing subscription', function(done) {
pubsub.subscribe(topic, SUB_NAMES[0], { reuseExisting: true }, done);
pubsub.subscribe(topic, SUB_NAMES[0], done);
});

it('should error when using a non-existent subscription', function(done) {
Expand Down
Loading

0 comments on commit 2a67398

Please sign in to comment.