diff --git a/pubsub/README.md b/pubsub/README.md index dbec3848d3..abe3140dbd 100644 --- a/pubsub/README.md +++ b/pubsub/README.md @@ -82,7 +82,7 @@ Commands: create-push Creates a new push subscription. delete Deletes a subscription. get Gets the metadata for a subscription. - pull Pulls messages for a subscription. + listen Listens to messages for a subscription. get-policy Gets the IAM policy for a subscription. set-policy Sets the IAM policy for a subscription. test-permissions Tests the permissions for a subscription. diff --git a/pubsub/package.json b/pubsub/package.json index b26afb79f1..011cf3f3c8 100644 --- a/pubsub/package.json +++ b/pubsub/package.json @@ -17,14 +17,14 @@ "test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js" }, "dependencies": { - "@google-cloud/pubsub": "0.13.2", + "@google-cloud/pubsub": "0.14.0", "yargs": "8.0.2" }, "devDependencies": { "@google-cloud/nodejs-repo-tools": "1.4.17", - "ava": "0.21.0", + "ava": "0.22.0", "proxyquire": "1.8.0", - "sinon": "3.2.0" + "sinon": "3.2.1" }, "cloud-repo-tools": { "requiresKeyFile": true, diff --git a/pubsub/subscriptions.js b/pubsub/subscriptions.js index 813dd013ea..b9d8b93438 100644 --- a/pubsub/subscriptions.js +++ b/pubsub/subscriptions.js @@ -73,7 +73,7 @@ function createSubscription (topicName, subscriptionName) { const topic = pubsub.topic(topicName); // Creates a new subscription, e.g. "my-new-subscription" - return topic.subscribe(subscriptionName) + return topic.createSubscription(subscriptionName) .then((results) => { const subscription = results[0]; @@ -101,7 +101,7 @@ function createPushSubscription (topicName, subscriptionName) { }; // Creates a new push subscription, e.g. "my-new-subscription" - return topic.subscribe(subscriptionName, options) + return topic.createSubscription(subscriptionName, options) .then((results) => { const subscription = results[0]; @@ -151,32 +151,34 @@ function getSubscription (subscriptionName) { } // [END pubsub_get_subscription] -// [START pubsub_pull_messages] -function pullMessages (subscriptionName) { +// [START pubsub_listen_messages] +function listenForMessages (subscriptionName, timeout) { // Instantiates a client const pubsub = PubSub(); // References an existing subscription, e.g. "my-subscription" const subscription = pubsub.subscription(subscriptionName); - // Pulls messages. Set returnImmediately to false to block until messages are - // received. - return subscription.pull() - .then((results) => { - const messages = results[0]; - - console.log(`Received ${messages.length} messages.`); + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = (message) => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; - messages.forEach((message) => { - console.log(`* %d %j %j`, message.id, message.data, message.attributes); - }); + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; - // Acknowledges received messages. If you do not acknowledge, Pub/Sub will - // redeliver the message. - return subscription.ack(messages.map((message) => message.ackId)); - }); + // Listen for new messages until timeout is hit + subscription.on(`message`, messageHandler); + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); } -// [END pubsub_pull_messages] +// [END pubsub_listen_messages] let subscribeCounterValue = 1; @@ -188,54 +190,61 @@ function setSubscribeCounterValue (value) { subscribeCounterValue = value; } -// [START pubsub_pull_ordered_messages] +// [START pubsub_listen_ordered_messages] const outstandingMessages = {}; -function pullOrderedMessages (subscriptionName) { +function listenForOrderedMessages (subscriptionName, timeout) { // Instantiates a client const pubsub = PubSub(); // References an existing subscription, e.g. "my-subscription" const subscription = pubsub.subscription(subscriptionName); - // Pulls messages. Set returnImmediately to false to block until messages are - // received. - return subscription.pull() - .then((results) => { - const messages = results[0]; - - // Pub/Sub messages are unordered, so here we manually order messages by - // their "counterId" attribute which was set when they were published. - messages.forEach((message) => { - outstandingMessages[message.attributes.counterId] = message; - }); - - const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10)); - outstandingIds.sort(); - - outstandingIds.forEach((counterId) => { - const counter = getSubscribeCounterValue(); - const message = outstandingMessages[counterId]; - - if (counterId < counter) { - // The message has already been processed - subscription.ack(message.ackId); - delete outstandingMessages[counterId]; - } else if (counterId === counter) { - // Process the message - console.log(`* %d %j %j`, message.id, message.data, message.attributes); - - setSubscribeCounterValue(counterId + 1); - subscription.ack(message.ackId); - delete outstandingMessages[counterId]; - } else { - // Have not yet processed the message on which this message is dependent - return false; - } - }); + // Create an event handler to handle messages + const messageHandler = function (message) { + // Buffer the message in an object (for later ordering) + outstandingMessages[message.attributes.counterId] = message; + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Listen for new messages until timeout is hit + return new Promise((resolve) => { + subscription.on(`message`, messageHandler); + setTimeout(() => { + subscription.removeListener(`message`, messageHandler); + resolve(); + }, timeout * 1000); + }) + .then(() => { + // Pub/Sub messages are unordered, so here we manually order messages by + // their "counterId" attribute which was set when they were published. + const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10)); + outstandingIds.sort(); + + outstandingIds.forEach((counterId) => { + const counter = getSubscribeCounterValue(); + const message = outstandingMessages[counterId]; + + if (counterId < counter) { + // The message has already been processed + message.ack(); + delete outstandingMessages[counterId]; + } else if (counterId === counter) { + // Process the message + console.log(`* %d %j %j`, message.id, message.data.toString(), message.attributes); + setSubscribeCounterValue(counterId + 1); + message.ack(); + delete outstandingMessages[counterId]; + } else { + // Have not yet processed the message on which this message is dependent + return false; + } }); + }); } -// [END pubsub_pull_ordered_messages] +// [END pubsub_listen_ordered_messages] // [START pubsub_get_subscription_policy] function getSubscriptionPolicy (subscriptionName) { @@ -318,7 +327,7 @@ function testSubscriptionPermissions (subscriptionName) { } // [END pubsub_test_subscription_permissions] -module.exports = { pullOrderedMessages }; +module.exports = { listenForOrderedMessages }; const cli = require(`yargs`) .demand(1) @@ -359,10 +368,16 @@ const cli = require(`yargs`) (opts) => getSubscription(opts.subscriptionName) ) .command( - `pull `, - `Pulls messages for a subscription.`, - {}, - (opts) => pullMessages(opts.subscriptionName) + `listen `, + `Listens to messages for a subscription.`, + { + timeout: { + alias: 't', + type: 'number', + default: 10 + } + }, + (opts) => listenForMessages(opts.subscriptionName, opts.timeout) ) .command( `get-policy `, diff --git a/pubsub/system-test/subscriptions.test.js b/pubsub/system-test/subscriptions.test.js index 0af1115c0b..bcd563badc 100644 --- a/pubsub/system-test/subscriptions.test.js +++ b/pubsub/system-test/subscriptions.test.js @@ -63,17 +63,22 @@ test.beforeEach(tools.stubConsole); test.afterEach.always(tools.restoreConsole); test.serial(`should create a subscription`, async (t) => { + t.plan(1); const output = await tools.runAsync(`${cmd} create ${topicNameOne} ${subscriptionNameOne}`, cwd); t.is(output, `Subscription ${fullSubscriptionNameOne} created.`); - const results = await pubsub.subscription(subscriptionNameOne).exists(); - t.true(results[0]); + await tools.tryTest(async (assert) => { + const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions(); + assert.equal(subscriptions[0].name, fullSubscriptionNameOne); + }).start(); }); test.serial(`should create a push subscription`, async (t) => { const output = await tools.runAsync(`${cmd} create-push ${topicNameOne} ${subscriptionNameTwo}`, cwd); t.is(output, `Subscription ${fullSubscriptionNameTwo} created.`); - const results = await pubsub.subscription(subscriptionNameTwo).exists(); - t.true(results[0]); + await tools.tryTest(async (assert) => { + const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions(); + assert(subscriptions.some((s) => s.name === fullSubscriptionNameTwo)); + }).start(); }); test.serial(`should get metadata for a subscription`, async (t) => { @@ -86,52 +91,60 @@ test.serial(`should get metadata for a subscription`, async (t) => { }); test.serial(`should list all subscriptions`, async (t) => { - await tools.tryTest(async () => { + t.plan(0); + await tools.tryTest(async (assert) => { const output = await tools.runAsync(`${cmd} list`, cwd); - t.true(output.includes(`Subscriptions:`)); - t.true(output.includes(fullSubscriptionNameOne)); - t.true(output.includes(fullSubscriptionNameTwo)); + assert(output.includes(`Subscriptions:`)); + assert(output.includes(fullSubscriptionNameOne)); + assert(output.includes(fullSubscriptionNameTwo)); }).start(); }); test.serial(`should list subscriptions for a topic`, async (t) => { - const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd); - t.true(output.includes(`Subscriptions for ${topicNameOne}:`)); - t.true(output.includes(fullSubscriptionNameOne)); - t.true(output.includes(fullSubscriptionNameTwo)); + t.plan(0); + await tools.tryTest(async (assert) => { + const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd); + assert(output.includes(`Subscriptions for ${topicNameOne}:`)); + assert(output.includes(fullSubscriptionNameOne)); + assert(output.includes(fullSubscriptionNameTwo)); + }).start(); }); -test.serial(`should pull messages`, async (t) => { +test.serial(`should listen for messages`, async (t) => { const expected = `Hello, world!`; - const results = await pubsub.topic(topicNameOne).publish(expected); - const messageIds = results[0]; - const expectedOutput = `Received ${messageIds.length} messages.\n* ${messageIds[0]} "${expected}" {}`; - const output = await tools.runAsync(`${cmd} pull ${subscriptionNameOne}`, cwd); - t.is(output, expectedOutput); + const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected)); + const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd); + t.true(output.includes(`Received message ${messageIds[0]}:`)); }); -test.serial(`should pull ordered messages`, async (t) => { +test.serial(`should listen for ordered messages`, async (t) => { + const timeout = 5; const subscriptions = require('../subscriptions'); const expected = `Hello, world!`; + const expectedBuffer = Buffer.from(expected); const publishedMessageIds = []; - await pubsub.topic(topicNameTwo).subscribe(subscriptionNameThree); - let results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '3' } }, { raw: true }); - publishedMessageIds.push(results[0][0]); - await subscriptions.pullOrderedMessages(subscriptionNameThree); + const publisherTwo = pubsub.topic(topicNameTwo).publisher(); + + await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameThree); + let [result] = await publisherTwo.publish(expectedBuffer, { counterId: '3' }); + publishedMessageIds.push(result); + await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout); t.is(console.log.callCount, 0); - results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true }); - publishedMessageIds.push(results[0][0]); - await subscriptions.pullOrderedMessages(subscriptionNameThree); + + [result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' }); + publishedMessageIds.push(result); + await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout); t.is(console.log.callCount, 1); t.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]); - results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true }); - results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '2' } }, { raw: true }); - publishedMessageIds.push(results[0][0]); - await tools.tryTest(async () => { - await subscriptions.pullOrderedMessages(subscriptionNameThree); - t.is(console.log.callCount, 3); - t.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]); - t.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]); + + [result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' }); + [result] = await publisherTwo.publish(expectedBuffer, { counterId: '2' }); + publishedMessageIds.push(result); + await tools.tryTest(async (assert) => { + await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout); + assert.equal(console.log.callCount, 3); + assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]); + assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]); }); }); @@ -163,8 +176,12 @@ test.serial(`should test permissions for a subscription`, async (t) => { }); test.serial(`should delete a subscription`, async (t) => { + t.plan(1); const output = await tools.runAsync(`${cmd} delete ${subscriptionNameOne}`, cwd); t.is(output, `Subscription ${fullSubscriptionNameOne} deleted.`); - const results = await pubsub.subscription(subscriptionNameOne).exists(); - t.false(results[0], false); + await tools.tryTest(async (assert) => { + const [subscriptions] = await pubsub.getSubscriptions(); + assert.ok(subscriptions); + assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne)); + }).start(); }); diff --git a/pubsub/system-test/topics.test.js b/pubsub/system-test/topics.test.js index 3dcb26d958..03537df661 100644 --- a/pubsub/system-test/topics.test.js +++ b/pubsub/system-test/topics.test.js @@ -28,7 +28,7 @@ const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; const projectId = process.env.GCLOUD_PROJECT; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; -const message = { data: `Hello, world!` }; +const expectedMessage = { data: `Hello, world!` }; const cmd = `node topics.js`; test.before(tools.checkCredentials); @@ -53,11 +53,40 @@ test.after.always(async () => { } catch (err) {} // ignore error }); +// Helper function to pull one message +const _pullOneMessage = (subscriptionObj, timeout) => { + timeout = timeout || 10000; // 10 second timeout by default + + let message; + return new Promise((resolve, reject) => { + // First message received; ack it + resolve promise + const messageHandler = (received) => { + received.ack(); + message = received; + return resolve(messageHandler); + }; + + // Listen for new messages + subscriptionObj.on(`message`, messageHandler); + + // Timeout appropriately + setTimeout(() => { + return reject(new Error(`_pullOneMessage timed out`)); + }, timeout); + }).then((messageHandler) => { + subscriptionObj.removeListener('message', messageHandler); + return Promise.resolve(message); + }); +}; + test.serial(`should create a topic`, async (t) => { + t.plan(1); const output = await tools.runAsync(`${cmd} create ${topicNameOne}`, cwd); t.is(output, `Topic ${fullTopicNameOne} created.`); - const [exists] = await pubsub.topic(topicNameOne).exists(); - t.true(exists); + await tools.tryTest(async (assert) => { + const [topics] = await pubsub.getTopics(); + assert(topics.some((s) => s.name === fullTopicNameOne)); + }).start(); }); test.serial(`should list topics`, async (t) => { @@ -69,40 +98,43 @@ test.serial(`should list topics`, async (t) => { }); test.serial(`should publish a simple message`, async (t) => { - const [subscription] = await pubsub.topic(topicNameOne).subscribe(subscriptionNameOne); - await tools.runAsync(`${cmd} publish ${topicNameOne} "${message.data}"`, cwd); - const [messages] = await subscription.pull(); - t.is(messages[0].data, message.data); + t.plan(1); + const [subscription] = await pubsub.topic(topicNameOne).createSubscription(subscriptionNameOne); + await tools.runAsync(`${cmd} publish ${topicNameOne} "${expectedMessage.data}"`, cwd); + const receivedMessage = await _pullOneMessage(subscription); + t.is(receivedMessage.data.toString(), expectedMessage.data); }); test.serial(`should publish a JSON message`, async (t) => { - const [subscription] = await pubsub.topic(topicNameOne).subscribe(subscriptionNameOne); - await tools.runAsync(`${cmd} publish ${topicNameOne} '${JSON.stringify(message)}'`, cwd); - const [messages] = await subscription.pull(); - t.deepEqual(messages[0].data, message); + const [subscription] = await pubsub.topic(topicNameOne).createSubscription(subscriptionNameOne); + await tools.runAsync(`${cmd} publish ${topicNameOne} '${JSON.stringify(expectedMessage)}'`, cwd); + const receivedMessage = await _pullOneMessage(subscription); + t.deepEqual(JSON.parse(receivedMessage.data.toString()), expectedMessage); }); test.serial(`should publish ordered messages`, async (t) => { const topics = require(`../topics`); - const [subscription] = await pubsub.topic(topicNameTwo).subscribe(subscriptionNameTwo); - let messageIds = await topics.publishOrderedMessage(topicNameTwo, message.data); - let [messages] = await subscription.pull(); - t.is(messages[0].id, messageIds[0]); - t.is(messages[0].data, message.data); - t.is(messages[0].attributes.counterId, '1'); - messageIds = await topics.publishOrderedMessage(topicNameTwo, message.data); - [messages] = await subscription.pull(); - t.is(messages[0].id, messageIds[0]); - t.is(messages[0].data, message.data); - t.is(messages[0].attributes.counterId, '2'); - await topics.publishOrderedMessage(topicNameTwo, message.data); + const [subscription] = await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameTwo); + + let messageId = await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data); + let message = await _pullOneMessage(subscription); + t.is(message.id, messageId); + t.is(message.data.toString(), expectedMessage.data); + t.is(message.attributes.counterId, '1'); + + messageId = await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data); + message = await _pullOneMessage(subscription); + t.is(message.id, messageId); + t.is(message.data.toString(), expectedMessage.data); + t.is(message.attributes.counterId, '2'); + await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data); }); test.serial(`should set the IAM policy for a topic`, async (t) => { await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd); const results = await pubsub.topic(topicNameOne).iam.getPolicy(); - const policy = results[0]; + const [policy] = results; t.deepEqual(policy.bindings, [ { role: `roles/pubsub.editor`, @@ -127,8 +159,11 @@ test.serial(`should test permissions for a topic`, async (t) => { }); test.serial(`should delete a topic`, async (t) => { + t.plan(1); const output = await tools.runAsync(`${cmd} delete ${topicNameOne}`, cwd); t.is(output, `Topic ${fullTopicNameOne} deleted.`); - const [exists] = await pubsub.topic(topicNameOne).exists(); - t.false(exists); + await tools.tryTest(async (assert) => { + const [topics] = await pubsub.getTopics(); + assert(topics.every((s) => s.name !== fullTopicNameOne)); + }).start(); }); diff --git a/pubsub/topics.js b/pubsub/topics.js index a6b1ed120a..0d7f06c275 100644 --- a/pubsub/topics.js +++ b/pubsub/topics.js @@ -84,14 +84,18 @@ function publishMessage (topicName, data) { // References an existing topic, e.g. "my-topic" const topic = pubsub.topic(topicName); - // Publishes the message, e.g. "Hello, world!" or { amount: 599.00, status: 'pending' } - return topic.publish(data) + // Create a publisher for the topic (which can include additional batching configuration) + const publisher = topic.publisher(); + + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + return publisher.publish(dataBuffer) .then((results) => { - const messageIds = results[0]; + const messageId = results[0]; - console.log(`Message ${messageIds[0]} published.`); + console.log(`Message ${messageId} published.`); - return messageIds; + return messageId; }); } // [END pubsub_publish_message] @@ -114,27 +118,27 @@ function publishOrderedMessage (topicName, data) { // References an existing topic, e.g. "my-topic" const topic = pubsub.topic(topicName); - const message = { - data: data, + // Create a publisher for the topic (which can include additional batching configuration) + const publisher = topic.publisher(); - // Pub/Sub messages are unordered, so assign an order id to the message to - // manually order messages - attributes: { - counterId: `${getPublishCounterValue()}` - } + // Creates message parameters + const dataBuffer = Buffer.from(data); + const attributes = { + // Pub/Sub messages are unordered, so assign an order ID and manually order messages + counterId: `${getPublishCounterValue()}` }; - // Publishes the message, use raw: true to pass a message with attributes - return topic.publish(message, { raw: true }) + // Publishes the message + return publisher.publish(dataBuffer, attributes) .then((results) => { - const messageIds = results[0]; + const messageId = results[0]; // Update the counter value - setPublishCounterValue(parseInt(message.attributes.counterId, 10) + 1); + setPublishCounterValue(parseInt(attributes.counterId, 10) + 1); - console.log(`Message ${messageIds[0]} published.`); + console.log(`Message ${messageId} published.`); - return messageIds; + return messageId; }); } // [END pubsub_publish_ordered_message] @@ -248,11 +252,6 @@ const cli = require(`yargs`) `Publishes a message to a topic.`, {}, (opts) => { - try { - opts.message = JSON.parse(opts.message); - } catch (err) { - // Ignore error - } publishMessage(opts.topicName, opts.message); } )