Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: allow simple publishing #1662

Merged
merged 2 commits into from
Oct 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,7 @@ var pubsubClient = pubsub({
var topic = pubsubClient.topic('my-topic');

// Publish a message to the topic.
topic.publish({
data: 'New message!'
}, function(err) {});
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
Expand Down
4 changes: 1 addition & 3 deletions packages/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ var pubsub = require('@google-cloud/pubsub')({
var topic = pubsub.topic('my-topic');

// Publish a message to the topic.
topic.publish({
data: 'New message!'
}, function(err) {});
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
Expand Down
79 changes: 52 additions & 27 deletions packages/pubsub/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

var arrify = require('arrify');
var common = require('@google-cloud/common');
var extend = require('extend');
var is = require('is');
var prop = require('propprop');
var util = require('util');

/**
Expand Down Expand Up @@ -282,32 +282,27 @@ Topic.prototype.getSubscriptions = function(options, callback) {
* @resource [Topics: publish API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish}
*
* @throws {Error} If no message is provided.
* @throws {Error} If a message is missing a data property.
*
* @param {object|object[]} message - The message(s) to publish.
* @param {*} message.data - The contents of the message.
* @param {array=} message.attributes - Key/value pair of attributes to apply to
* the message. All values must be strings.
* @param {*|*[]} message - The message(s) to publish. If you need to
* provide attributes for the message, you must enable `options.raw`, then
* box your message in to an object with a `data` and `attributes` property.
* `data` will be the raw message value you want to publish, and
* `attributes` is a key/value pair of attributes to apply to the message.
* @param {object=} options - Configuration object.
* @param {boolean} options.raw - Enable if you require setting attributes on
* your messages.
* @param {function=} callback - The callback function.
*
* @example
* topic.publish({
* data: 'Hello, world!'
* }, function(err, messageIds, apiResponse) {});
* topic.publish('Hello, world!', function(err, messageIds, apiResponse) {});
*
* //-
* // The data property can be a JSON object as well.
* // You can also publish a JSON object.
* //-
* var registerMessage = {
* data: {
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* },
* attributes: {
* key: 'value',
* hello: 'world'
* }
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* };
*
* topic.publish(registerMessage, function(err, messageIds, apiResponse) {});
Expand All @@ -327,28 +322,58 @@ Topic.prototype.getSubscriptions = function(options, callback) {
* registerMessage,
* purchaseMessage
* ], function(err, messageIds, apiResponse) {});
*
* //-
* // Set attributes with your message.
* //-
* var message = {
* data: {
* userId: 3,
* product: 'book',
* event: 'rent'
* },
* attributes: {
* key: 'value',
* hello: 'world'
* }
* };
*
* var options = {
* raw: true
* };
*
* topic.publish(message, options, function(err, messageIds, apiResponse) {});
*/
Topic.prototype.publish = function(messages, callback) {
Topic.prototype.publish = function(messages, options, callback) {
messages = arrify(messages);

if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

if (!messages.every(prop('data'))) {
throw new Error('Cannot publish message without a `data` property.');
if (is.fn(options)) {
callback = options;
options = {};
}

options = options || {};
callback = callback || common.util.noop;

if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

var protoOpts = {
service: 'Publisher',
method: 'publish',
};

var reqOpts = {
topic: this.name,
messages: messages.map(Topic.formatMessage_)
messages: messages
.map(function(message) {
if (is.object(message)) {
message = extend(true, {}, message);
}
return options.raw ? message : { data: message };
})
.map(Topic.formatMessage_)
};

this.request(protoOpts, reqOpts, function(err, result) {
Expand Down
79 changes: 68 additions & 11 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ var uuid = require('node-uuid');
var env = require('../../../system-test/env.js');
var pubsub = require('../')(env);

function generateSubName() {
return 'test-subscription-' + uuid.v4();
}

function generateTopicName() {
return 'test-topic-' + uuid.v4();
}

describe('pubsub', function() {
var TOPIC_NAMES = [
generateTopicName(),
Expand All @@ -51,6 +43,53 @@ describe('pubsub', function() {
TOPICS[2].name
];

function generateSubName() {
return 'test-subscription-' + uuid.v4();
}

function generateTopicName() {
return 'test-topic-' + uuid.v4();
}

function publishPop(message, options, callback) {
if (!callback) {
callback = options;
options = {};
}

options = options || {};

var topic = pubsub.topic(generateTopicName());
var subscription = topic.subscription(generateSubName());

async.series([
topic.create.bind(topic),
subscription.create.bind(subscription),
function(callback) {
async.times(6, function(_, callback) {
topic.publish(message, options, callback);
}, callback);
}
], function(err) {
if (err) {
callback(err);
return;
}

subscription.pull({
returnImmediately: true,
maxResults: 1
}, function(err, messages) {
if (err) {
callback(err);
return;
}

callback(null, messages.pop());
});
});
}

before(function(done) {
// create all needed topics
async.each(TOPICS, function(topic, cb) {
Expand Down Expand Up @@ -120,9 +159,27 @@ describe('pubsub', function() {

it('should publish a message', function(done) {
var topic = pubsub.topic(TOPIC_NAMES[0]);
topic.publish({ data: 'message from me' }, function(err, messageIds) {
topic.publish('message from me', function(err, messageIds) {
assert.ifError(err);
assert.equal(messageIds.length, 1);
assert.strictEqual(messageIds.length, 1);
done();
});
});

it('should publish a message with attributes', function(done) {
var rawMessage = {
data: 'raw message data',
attributes: {
customAttribute: 'value'
}
};

publishPop(rawMessage, { raw: true }, function(err, message) {
assert.ifError(err);

assert.strictEqual(message.data, rawMessage.data);
assert.deepEqual(message.attributes, rawMessage.attributes);

done();
});
});
Expand Down Expand Up @@ -166,7 +223,7 @@ describe('pubsub', function() {
}

async.times(10, function(_, next) {
topic.publish({ data: 'hello' }, next);
topic.publish('hello', next);
}, function(err) {
if (err) {
done(err);
Expand Down
50 changes: 39 additions & 11 deletions packages/pubsub/test/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ describe('Topic', function() {

describe('publish', function() {
var message = 'howdy';
var messageObject = { data: message };
var attributes = {
key: 'value'
};

it('should throw if no message is provided', function() {
assert.throws(function() {
Expand All @@ -192,12 +194,6 @@ describe('Topic', function() {
}, /Cannot publish without a message\./);
});

it('should throw if a message has no data', function() {
assert.throws(function() {
topic.publish({});
}, /Cannot publish message without a `data` property\./);
});

it('should send correct api request', function(done) {
topic.request = function(protoOpts, reqOpts) {
assert.strictEqual(protoOpts.service, 'Publisher');
Expand All @@ -211,15 +207,47 @@ describe('Topic', function() {
done();
};

topic.publish(messageObject, assert.ifError);
topic.publish(message, assert.ifError);
});

it('should send correct api request for raw message', function(done) {
topic.request = function(protoOpts, reqOpts) {
assert.deepEqual(reqOpts.messages, [
{
data: new Buffer(JSON.stringify(message)).toString('base64'),
attributes: attributes
}
]);

done();
};

topic.publish({
data: message,
attributes: attributes
}, { raw: true }, assert.ifError);
});

it('should clone the provided message', function(done) {
var message = {
data: 'data'
};
var originalMessage = extend({}, message);

topic.request = function() {
assert.deepEqual(message, originalMessage);
done();
};

topic.publish(message, { raw: true }, assert.ifError);
});

it('should execute callback', function(done) {
topic.request = function(protoOpts, reqOpts, callback) {
callback(null, {});
};

topic.publish(messageObject, done);
topic.publish(message, done);
});

it('should execute callback with error', function(done) {
Expand All @@ -230,7 +258,7 @@ describe('Topic', function() {
callback(error, apiResponse);
};

topic.publish(messageObject, function(err, ackIds, apiResponse_) {
topic.publish(message, function(err, ackIds, apiResponse_) {
assert.strictEqual(err, error);
assert.strictEqual(ackIds, null);
assert.strictEqual(apiResponse_, apiResponse);
Expand All @@ -246,7 +274,7 @@ describe('Topic', function() {
callback(null, resp);
};

topic.publish(messageObject, function(err, ackIds, apiResponse) {
topic.publish(message, function(err, ackIds, apiResponse) {
assert.deepEqual(resp, apiResponse);
done();
});
Expand Down