Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: runQuery as a stream. fixes #289 #291

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
'use strict';

var https = require('https');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:datastore/entity
Expand Down Expand Up @@ -315,7 +317,8 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* });
*/
DatastoreRequest.prototype.runQuery = function(q, callback) {
callback = callback || util.noop;
var that = this;
var stream;

var req = {
read_options: {},
Expand All @@ -328,19 +331,51 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
};
}

this.makeReq_('runQuery', req, function(err, resp) {
if (err || !resp.batch || !resp.batch.entity_result) {
callback(err);
return;
}
if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());

This comment was marked as spam.

stream.once('reading', runQuery);
return stream;
} else {
callback = callback || util.noop;

This comment was marked as spam.

This comment was marked as spam.

runQuery();
}

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}
function runQuery() {
that.makeReq_('runQuery', req, function(err, resp) {
if (err) {
if (stream) {
stream.emit('error', err);
stream.end();
} else {
callback(err);
}
return;
}

callback(null, entity.formatArray(resp.batch.entity_result), cursor);
});
var entities = entity.formatArray(resp.batch.entity_result);

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}

if (stream) {
if (cursor && entities.length > 0) {
entities.forEach(function (entity) {
stream.push(entity);
});

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));

runQuery();
} else {
stream.end();
}
} else {
callback(null, entities, cursor);
}
});
}
};

/**
Expand Down
15 changes: 15 additions & 0 deletions regression/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,21 @@ describe('datastore', function() {
});
});

it('should run a query as a stream', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.limit(5);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


var resultsReturned = 0;

ds.runQuery(q)
.on('error', done)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, characters.length);
done();
});
});

it('should filter queries with simple indexes', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.filter('appearances >=', 20);
Expand Down
76 changes: 62 additions & 14 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var https = require('https');
var mockRespGet = require('../testdata/response_get.json');
var pb = require('../../lib/datastore/pb.js');
var Query = require('../../lib/datastore/query.js');
var Stream = require('stream');
var util = require('../../lib/common/util.js');

var httpsRequestOverride = util.noop;
Expand Down Expand Up @@ -278,8 +279,7 @@ describe('Request', function() {
entity_result: mockRespGet.found,
end_cursor: new ByteBuffer().writeIString('cursor').flip()
}
},
withoutResults: mockRespGet
}

This comment was marked as spam.

};

beforeEach(function() {
Expand All @@ -298,18 +298,6 @@ describe('Request', function() {
assert.equal(err, error);
});
});

it('should handle missing results error', function() {
request.makeReq_ = function(method, req, callback) {
assert.equal(method, 'runQuery');
callback(null, mockResponse.withoutResults);
};

request.runQuery(query, function(err, entities) {
assert.strictEqual(err, null);
assert.strictEqual(entities, undefined);
});
});
});

it('should execute callback with results', function() {
Expand Down Expand Up @@ -354,6 +342,66 @@ describe('Request', function() {
done();
});
});

describe('streams', function() {
it('should be a stream if a callback is omitted', function() {
assert(request.runQuery(query) instanceof Stream);
});

it('should run the query after being read from', function(done) {
request.makeReq_ = function() {
done();
};

request.runQuery(query).emit('reading');
});

it('should continuosly run until there are no results', function(done) {
var run = 0;
var timesToRun = 2;

request.makeReq_ = function(method, req, callback) {
run++;

if (run < timesToRun) {
callback(null, mockResponse.withResultsAndEndCursor);
} else {
var lastEndCursor =
mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64();
lastEndCursor = new Buffer(lastEndCursor, 'base64').toString();

assert.equal(String(req.query.start_cursor), lastEndCursor);
assert.strictEqual(req.query.offset, undefined);

callback(null, mockResponse.withResults);
}
};

var resultsReturned = 0;

request.runQuery(query)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, mockRespGet.found.length);
done();
});
});

it('should emit an error', function(done) {
var error = new Error('Error.');

request.makeReq_ = function(method, req, callback) {
callback(error);
};

request.runQuery(query)
.on('error', function(err) {
assert.equal(err, error);
done();
})
.emit('reading');
});
});
});

describe('allocateIds', function() {
Expand Down