Skip to content

Commit

Permalink
pubsub: tokenize connection pool project id (googleapis#2579)
Browse files Browse the repository at this point in the history
* pubsub: interpolate projectId in connection pool

* add delay between making sub & snap
  • Loading branch information
callmehiphop authored and stephenplusplus committed Aug 31, 2017
1 parent 51df102 commit 4292811
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
9 changes: 8 additions & 1 deletion packages/pubsub/src/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var RETRY_CODES = [
*/
function ConnectionPool(subscription) {
this.subscription = subscription;
this.projectId = subscription.projectId;

this.connections = new Map();

this.isPaused = false;
Expand Down Expand Up @@ -230,7 +232,8 @@ ConnectionPool.prototype.createConnection = function() {
}

connection.write({
subscription: self.subscription.name,
subscription: common.util.replaceProjectIdToken(
self.subscription.name, self.projectId),
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
});

Expand Down Expand Up @@ -302,6 +305,10 @@ ConnectionPool.prototype.getClient = function(callback) {
grpc.credentials.createFromGoogleCredential(authClient)
);

if (!self.projectId || self.projectId === '{{projectId}}') {
self.projectId = pubsub.auth.projectId;
}

var Subscriber = v1(pubsub.options).Subscriber;

self.client = new Subscriber(v1.SERVICE_ADDRESS, credentials, {
Expand Down
1 change: 1 addition & 0 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ describe('pubsub', function() {
return deleteAllSnapshots()
.then(wait(2500))
.then(subscription.create.bind(subscription))
.then(wait(2500))
.then(snapshot.create.bind(snapshot))
.then(wait(2500));
});
Expand Down
37 changes: 36 additions & 1 deletion packages/pubsub/test/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,20 @@ describe('ConnectionPool', function() {
var pool;

var FAKE_PUBSUB_OPTIONS = {};
var PROJECT_ID = 'grapce-spacheship-123';

var PUBSUB = {
projectId: PROJECT_ID,
auth: {
projectId: PROJECT_ID,
getAuthClient: fakeUtil.noop
},
options: FAKE_PUBSUB_OPTIONS
};

var SUB_NAME = 'test-subscription';
var SUBSCRIPTION = {
projectId: PROJECT_ID,
name: SUB_NAME,
pubsub: PUBSUB,
request: fakeUtil.noop
Expand Down Expand Up @@ -344,6 +348,7 @@ describe('ConnectionPool', function() {
});

describe('connection', function() {
var TOKENIZED_SUB_NAME = 'project/p/subscriptions/' + SUB_NAME;
var fakeId;

beforeEach(function() {
Expand All @@ -352,12 +357,20 @@ describe('ConnectionPool', function() {
fakeUuid.v4 = function() {
return fakeId;
};

fakeUtil.replaceProjectIdToken = common.util.replaceProjectIdToken;
});

it('should create a connection', function(done) {
fakeUtil.replaceProjectIdToken = function(subName, projectId) {
assert.strictEqual(subName, SUB_NAME);
assert.strictEqual(projectId, PROJECT_ID);
return TOKENIZED_SUB_NAME;
};

fakeConnection.write = function(reqOpts) {
assert.deepEqual(reqOpts, {
subscription: SUB_NAME,
subscription: TOKENIZED_SUB_NAME,
streamAckDeadlineSeconds: pool.settings.ackDeadline / 1000
});
};
Expand Down Expand Up @@ -662,6 +675,7 @@ describe('ConnectionPool', function() {
});

describe('getClient', function() {
var AUTH_PROJECT_ID = 'auth-project-id-123';
var fakeAuthClient = {};

function FakeSubscriber(address, creds, options) {
Expand All @@ -671,6 +685,7 @@ describe('ConnectionPool', function() {
}

beforeEach(function() {
PUBSUB.auth.projectId = AUTH_PROJECT_ID;
PUBSUB.auth.getAuthClient = function(callback) {
callback(null, fakeAuthClient);
};
Expand Down Expand Up @@ -734,6 +749,26 @@ describe('ConnectionPool', function() {
});
});

it('should capture the projectId when falsey', function(done) {
delete pool.projectId;

pool.getClient(function(err) {
assert.ifError(err);
assert.strictEqual(pool.projectId, AUTH_PROJECT_ID);
done();
});
});

it('should capture the projectId if it needs tokenization', function(done) {
pool.projectId = '{{projectId}}';

pool.getClient(function(err) {
assert.ifError(err);
assert.strictEqual(pool.projectId, AUTH_PROJECT_ID);
done();
});
});

it('should pass the pubsub options into the gax fn', function(done) {
v1Override = function(options) {
assert.strictEqual(options, FAKE_PUBSUB_OPTIONS);
Expand Down

0 comments on commit 4292811

Please sign in to comment.