Skip to content

Commit

Permalink
Merge pull request #255 from stephenplusplus/spp--pb-refactor
Browse files Browse the repository at this point in the history
datastore: tests: refactor pb/transaction.
  • Loading branch information
silvolu committed Oct 13, 2014
2 parents 714ff88 + 1436ef8 commit 00e6466
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 143 deletions.
98 changes: 46 additions & 52 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,10 @@ DatastoreRequest.prototype.get = function(keys, callback) {
var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
callback = callback || util.noop;
var req = new pb.LookupRequest({
var req = {
key: keys.map(entity.keyToKeyProto)
});
var res = pb.LookupResponse;
if (this.id) {
req.transaction = this.id;
}
this.createRequest_('lookup', req, res, function(err, resp) {
};
this.makeReq_('lookup', req, function(err, resp) {
if (err) {
callback(err);
return;
Expand Down Expand Up @@ -217,7 +213,6 @@ DatastoreRequest.prototype.save = function(entities, callback) {
return entityObject.key;
});
var req = {
mode: MODE_NON_TRANSACTIONAL,
mutation: entities.reduce(function(acc, entityObject, index) {
var ent = {};
if (Array.isArray(entityObject.data)) {
Expand All @@ -242,13 +237,7 @@ DatastoreRequest.prototype.save = function(entities, callback) {
return acc;
}.bind(this), { upsert: [], insert_auto_id: [] })
};
if (this.id) {
req.transaction = this.id;
req.mode = MODE_TRANSACTIONAL;
}
req = new pb.CommitRequest(req);
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err, resp) {
this.makeReq_('commit', req, function(err, resp) {
if (err || !resp) {
callback(err);
return;
Expand Down Expand Up @@ -290,20 +279,12 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
callback = callback || util.noop;

var req = {
mode: MODE_NON_TRANSACTIONAL,
mutation: {
delete: keys.map(entity.keyToKeyProto)
}
};
if (this.id) {
req.transaction = this.id;
req.mode = MODE_TRANSACTIONAL;
}
req = new pb.CommitRequest(req);
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err) {
this.makeReq_('commit', req, function(err) {
if (!err && this.id) {
this.isFinalized = true;
}
Expand Down Expand Up @@ -334,28 +315,20 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* }
* });
*/

DatastoreRequest.prototype.runQuery = function(q, callback) {
callback = callback || util.noop;
var req = {
read_options: {},
query: entity.queryToQueryProto(q)
};

if (this.id) {
req.read_options.transaction = this.id;
}

if (q.namespace) {
req.partition_id = {
namespace: q.namespace
};
}

req = new pb.RunQueryRequest(req);
var res = pb.RunQueryResponse;

this.createRequest_('runQuery', req, res, function(err, resp) {
this.makeReq_('runQuery', req, function(err, resp) {
if (err || !resp.batch || !resp.batch.entity_result) {
callback(err);
return;
Expand Down Expand Up @@ -406,11 +379,10 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) {
for (var i = 0; i < n; i++) {
incompleteKeys.push(entity.keyToKeyProto(incompleteKey));
}

this.createRequest_(
'allocateIds',
new pb.AllocateIdsRequest({ key: incompleteKeys }),
pb.AllocateIdsResponse, function(err, resp) {
var req = {
key: incompleteKeys
};
this.makeReq_('allocateIds', req, function(err, resp) {
if (err) {
callback(err);
return;
Expand All @@ -424,28 +396,50 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) {
};

/**
* Make a request to the API endpoint.
* Make a request to the API endpoint. Properties to indicate a transactional or
* non-transactional operation are added automatically.
*
* @param {string} method - Transaction action (allocateIds, commit, etc.).
* @param {object} req - Request configuration object.
* @param {object} respType - Response type.
* @param {function} cb - The callback function.
* @param {string} method - Datastore action (allocateIds, commit, etc.).
* @param {object=} body - Request configuration object.
* @param {function} callback - The callback function.
*
* @private
*
* @example
* var deleteRequest = {
* MODE: 'NON_TRANSACTIONAL',
* mutation: {
* delete: [] // datastore key objects.
* }
* };
* transaction.makeReq('commit', deleteRequest, function(err) {});
*/
DatastoreRequest.prototype.createRequest_ =
function(method, req, respType, cb) {
DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
// TODO: Handle non-HTTP 200 cases.
cb = cb || util.noop;
if (!callback) {
callback = body;
body = {};
}
callback = callback || util.noop;

// Set properties to indicate if we're in a transaction or not.
if (method === 'commit') {
if (this.id) {
body.mode = MODE_TRANSACTIONAL;
body.transaction = this.id;
} else {
body.mode = MODE_NON_TRANSACTIONAL;
}
}

if (method === 'lookup' && this.id) {
body.read_options = body.read_options || {};
body.read_options.transaction = this.id;
}

var pbKey = method[0].toUpperCase() + method.substr(1);
var pbRequest = new pb[pbKey + 'Request'](body).toBuffer();
var pbResponse = pb[pbKey + 'Response'];

this.connection.createAuthorizedReq({
method: 'POST',
host: GOOGLE_APIS_HOST,
Expand All @@ -455,7 +449,7 @@ DatastoreRequest.prototype.createRequest_ =
}
}, function(err, request) {
if (err) {
cb(err);
callback(err);
return;
}
var remoteStream = https.request(request, function(resp) {
Expand All @@ -466,15 +460,15 @@ DatastoreRequest.prototype.createRequest_ =
resp.on('end', function() {
util.handleResp(null, resp, buffer.toString(), function(err) {
if (err) {
cb(err);
callback(err);
return;
}
cb(null, respType.decode(buffer));
callback(null, pbResponse.decode(buffer));
});
});
});
remoteStream.on('error', cb);
remoteStream.write(req.toBuffer());
remoteStream.on('error', callback);
remoteStream.write(pbRequest);
remoteStream.end();
});
};
Expand Down
37 changes: 10 additions & 27 deletions lib/datastore/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@

var nodeutil = require('util');

/**
* @type module:datastore/pb
* @private
*/
var pb = require('./pb.js');

/**
* @type module:common/util
* @private
Expand Down Expand Up @@ -66,11 +60,9 @@ var DatastoreRequest = require('./request.js');
*/
function Transaction(connection, projectId) {
this.connection = connection;
this.projectId = projectId;
// the default transaction has no id.
// if id is not set, run operations non-transactional.
this.id = null;
this.isFinalized = false;
this.projectId = projectId;
}

nodeutil.inherits(Transaction, DatastoreRequest);
Expand All @@ -96,17 +88,14 @@ nodeutil.inherits(Transaction, DatastoreRequest);
*/
Transaction.prototype.begin = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.BeginTransactionRequest();
var res = pb.BeginTransactionResponse;
this.createRequest_('beginTransaction', req, res, function(err, resp) {
this.makeReq_('beginTransaction', function(err, resp) {
if (err) {
callback(err);
return;
}
that.id = resp.transaction;
this.id = resp.transaction;
callback(null);
});
}.bind(this));
};

/**
Expand All @@ -125,13 +114,10 @@ Transaction.prototype.begin = function(callback) {
*/
Transaction.prototype.rollback = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.RollbackRequest({ transaction: this.id });
var res = pb.RollbackResponse;
this.createRequest_('rollback', req, res, function(err) {
that.isFinalized = true;
this.makeReq_('rollback', function(err) {
this.isFinalized = true;
callback(err || null);
});
}.bind(this));
};

/**
Expand All @@ -150,17 +136,14 @@ Transaction.prototype.rollback = function(callback) {
*/
Transaction.prototype.commit = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.CommitRequest({ transaction: this.id });
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err) {
this.makeReq_('commit', function(err) {
if (err) {
callback(err);
return;
}
that.isFinalized = true;
this.isFinalized = true;
callback(null);
});
}.bind(this));
};

/**
Expand Down
Loading

0 comments on commit 00e6466

Please sign in to comment.