Skip to content

Commit

Permalink
pubsub: allow simple publishing (#1662)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus authored and callmehiphop committed Oct 10, 2016
1 parent fc7ebb8 commit fecf108
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 55 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,7 @@ var pubsubClient = pubsub({
var topic = pubsubClient.topic('my-topic');

// Publish a message to the topic.
topic.publish({
data: 'New message!'
}, function(err) {});
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
Expand Down
4 changes: 1 addition & 3 deletions packages/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ var pubsub = require('@google-cloud/pubsub')({
var topic = pubsub.topic('my-topic');

// Publish a message to the topic.
topic.publish({
data: 'New message!'
}, function(err) {});
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
Expand Down
79 changes: 52 additions & 27 deletions packages/pubsub/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

var arrify = require('arrify');
var common = require('@google-cloud/common');
var extend = require('extend');
var is = require('is');
var prop = require('propprop');
var util = require('util');

/**
Expand Down Expand Up @@ -282,32 +282,27 @@ Topic.prototype.getSubscriptions = function(options, callback) {
* @resource [Topics: publish API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish}
*
* @throws {Error} If no message is provided.
* @throws {Error} If a message is missing a data property.
*
* @param {object|object[]} message - The message(s) to publish.
* @param {*} message.data - The contents of the message.
* @param {array=} message.attributes - Key/value pair of attributes to apply to
* the message. All values must be strings.
* @param {*|*[]} message - The message(s) to publish. If you need to
* provide attributes for the message, you must enable `options.raw`, then
* box your message in to an object with a `data` and `attributes` property.
* `data` will be the raw message value you want to publish, and
* `attributes` is a key/value pair of attributes to apply to the message.
* @param {object=} options - Configuration object.
* @param {boolean} options.raw - Enable if you require setting attributes on
* your messages.
* @param {function=} callback - The callback function.
*
* @example
* topic.publish({
* data: 'Hello, world!'
* }, function(err, messageIds, apiResponse) {});
* topic.publish('Hello, world!', function(err, messageIds, apiResponse) {});
*
* //-
* // The data property can be a JSON object as well.
* // You can also publish a JSON object.
* //-
* var registerMessage = {
* data: {
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* },
* attributes: {
* key: 'value',
* hello: 'world'
* }
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* };
*
* topic.publish(registerMessage, function(err, messageIds, apiResponse) {});
Expand All @@ -327,28 +322,58 @@ Topic.prototype.getSubscriptions = function(options, callback) {
* registerMessage,
* purchaseMessage
* ], function(err, messageIds, apiResponse) {});
*
* //-
* // Set attributes with your message.
* //-
* var message = {
* data: {
* userId: 3,
* product: 'book',
* event: 'rent'
* },
* attributes: {
* key: 'value',
* hello: 'world'
* }
* };
*
* var options = {
* raw: true
* };
*
* topic.publish(message, options, function(err, messageIds, apiResponse) {});
*/
Topic.prototype.publish = function(messages, callback) {
Topic.prototype.publish = function(messages, options, callback) {
messages = arrify(messages);

if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

if (!messages.every(prop('data'))) {
throw new Error('Cannot publish message without a `data` property.');
if (is.fn(options)) {
callback = options;
options = {};
}

options = options || {};
callback = callback || common.util.noop;

if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

var protoOpts = {
service: 'Publisher',
method: 'publish',
};

var reqOpts = {
topic: this.name,
messages: messages.map(Topic.formatMessage_)
messages: messages
.map(function(message) {
if (is.object(message)) {
message = extend(true, {}, message);
}
return options.raw ? message : { data: message };
})
.map(Topic.formatMessage_)
};

this.request(protoOpts, reqOpts, function(err, result) {
Expand Down
79 changes: 68 additions & 11 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ var uuid = require('node-uuid');
var env = require('../../../system-test/env.js');
var pubsub = require('../')(env);

function generateSubName() {
return 'test-subscription-' + uuid.v4();
}

function generateTopicName() {
return 'test-topic-' + uuid.v4();
}

describe('pubsub', function() {
var TOPIC_NAMES = [
generateTopicName(),
Expand All @@ -51,6 +43,53 @@ describe('pubsub', function() {
TOPICS[2].name
];

function generateSubName() {
return 'test-subscription-' + uuid.v4();
}

function generateTopicName() {
return 'test-topic-' + uuid.v4();
}

function publishPop(message, options, callback) {
if (!callback) {
callback = options;
options = {};
}

options = options || {};

var topic = pubsub.topic(generateTopicName());
var subscription = topic.subscription(generateSubName());

async.series([
topic.create.bind(topic),
subscription.create.bind(subscription),
function(callback) {
async.times(6, function(_, callback) {
topic.publish(message, options, callback);
}, callback);
}
], function(err) {
if (err) {
callback(err);
return;
}

subscription.pull({
returnImmediately: true,
maxResults: 1
}, function(err, messages) {
if (err) {
callback(err);
return;
}

callback(null, messages.pop());
});
});
}

before(function(done) {
// create all needed topics
async.each(TOPICS, function(topic, cb) {
Expand Down Expand Up @@ -120,9 +159,27 @@ describe('pubsub', function() {

it('should publish a message', function(done) {
var topic = pubsub.topic(TOPIC_NAMES[0]);
topic.publish({ data: 'message from me' }, function(err, messageIds) {
topic.publish('message from me', function(err, messageIds) {
assert.ifError(err);
assert.equal(messageIds.length, 1);
assert.strictEqual(messageIds.length, 1);
done();
});
});

it('should publish a message with attributes', function(done) {
var rawMessage = {
data: 'raw message data',
attributes: {
customAttribute: 'value'
}
};

publishPop(rawMessage, { raw: true }, function(err, message) {
assert.ifError(err);

assert.strictEqual(message.data, rawMessage.data);
assert.deepEqual(message.attributes, rawMessage.attributes);

done();
});
});
Expand Down Expand Up @@ -166,7 +223,7 @@ describe('pubsub', function() {
}

async.times(10, function(_, next) {
topic.publish({ data: 'hello' }, next);
topic.publish('hello', next);
}, function(err) {
if (err) {
done(err);
Expand Down
50 changes: 39 additions & 11 deletions packages/pubsub/test/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ describe('Topic', function() {

describe('publish', function() {
var message = 'howdy';
var messageObject = { data: message };
var attributes = {
key: 'value'
};

it('should throw if no message is provided', function() {
assert.throws(function() {
Expand All @@ -192,12 +194,6 @@ describe('Topic', function() {
}, /Cannot publish without a message\./);
});

it('should throw if a message has no data', function() {
assert.throws(function() {
topic.publish({});
}, /Cannot publish message without a `data` property\./);
});

it('should send correct api request', function(done) {
topic.request = function(protoOpts, reqOpts) {
assert.strictEqual(protoOpts.service, 'Publisher');
Expand All @@ -211,15 +207,47 @@ describe('Topic', function() {
done();
};

topic.publish(messageObject, assert.ifError);
topic.publish(message, assert.ifError);
});

it('should send correct api request for raw message', function(done) {
topic.request = function(protoOpts, reqOpts) {
assert.deepEqual(reqOpts.messages, [
{
data: new Buffer(JSON.stringify(message)).toString('base64'),
attributes: attributes
}
]);

done();
};

topic.publish({
data: message,
attributes: attributes
}, { raw: true }, assert.ifError);
});

it('should clone the provided message', function(done) {
var message = {
data: 'data'
};
var originalMessage = extend({}, message);

topic.request = function() {
assert.deepEqual(message, originalMessage);
done();
};

topic.publish(message, { raw: true }, assert.ifError);
});

it('should execute callback', function(done) {
topic.request = function(protoOpts, reqOpts, callback) {
callback(null, {});
};

topic.publish(messageObject, done);
topic.publish(message, done);
});

it('should execute callback with error', function(done) {
Expand All @@ -230,7 +258,7 @@ describe('Topic', function() {
callback(error, apiResponse);
};

topic.publish(messageObject, function(err, ackIds, apiResponse_) {
topic.publish(message, function(err, ackIds, apiResponse_) {
assert.strictEqual(err, error);
assert.strictEqual(ackIds, null);
assert.strictEqual(apiResponse_, apiResponse);
Expand All @@ -246,7 +274,7 @@ describe('Topic', function() {
callback(null, resp);
};

topic.publish(messageObject, function(err, ackIds, apiResponse) {
topic.publish(message, function(err, ackIds, apiResponse) {
assert.deepEqual(resp, apiResponse);
done();
});
Expand Down

0 comments on commit fecf108

Please sign in to comment.