diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index 12490d2288d..a2ae6d15c74 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -84,35 +84,54 @@ streamRouter.router_ = function(args, originalMethod) { var callback = args[args.length - 1]; var isStreamMode = !util.is(callback, 'function'); - if (isStreamMode) { - var stream = streamEvents(through.obj()); - - var onResultSet = function(err, results, nextQuery) { - if (err) { - stream.emit('error', err); - stream.end(); - return; - } + if (!isStreamMode) { + originalMethod.apply(null, args); + return; + } - results.forEach(function(result) { + var stream = streamEvents(through.obj()); + + // Results from the API are split apart for the user. If 50 results are + // returned, we emit 50 data events. While the user is consuming these, they + // might choose to end the stream early by calling ".end()". We keep track of + // this state to prevent pushing more results to the stream, ending it again, + // or making unnecessary API calls. + var streamEnded = false; + var _end = stream.end; + stream.end = function() { + streamEnded = true; + _end.apply(this, arguments); + }; + + function onResultSet(err, results, nextQuery) { + if (err) { + stream.emit('error', err); + stream.end(); + return; + } + + results.forEach(function(result) { + if (!streamEnded) { stream.push(result); - }); - - if (nextQuery) { - originalMethod(nextQuery, onResultSet); - } else { - stream.end(); } - }; - - stream.once('reading', function() { - originalMethod.apply(null, args.concat(onResultSet)); }); - return stream; - } else { - originalMethod.apply(null, args); + if (streamEnded) { + return; + } + + if (nextQuery) { + originalMethod(nextQuery, onResultSet); + } else { + stream.end(); + } } + + stream.once('reading', function() { + originalMethod.apply(null, args.concat(onResultSet)); + }); + + return stream; }; module.exports = streamRouter; diff --git a/system-test/search.js b/system-test/search.js index 4d059633e1c..c5e517530aa 100644 --- a/system-test/search.js +++ b/system-test/search.js @@ -146,7 +146,9 @@ describe('Search', function() { search.getIndexes() .on('error', done) - .on('data', function() { resultsMatched++; }) + .on('data', function() { + resultsMatched++; + }) .on('end', function() { assert(resultsMatched > 0); done(); diff --git a/test/common/stream-router.js b/test/common/stream-router.js index b25986ace1c..c4de9f9ff0e 100644 --- a/test/common/stream-router.js +++ b/test/common/stream-router.js @@ -208,6 +208,58 @@ describe('streamRouter', function() { var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. }); + + it('should not push more results if stream ends early', function(done) { + var results = ['a', 'b', 'c']; + + function originalMethod() { + var callback = [].slice.call(arguments).pop(); + setImmediate(function() { + callback(null, results); + }); + } + + var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + rs.on('data', function(result) { + if (result === 'b') { + // Pre-maturely end the stream. + this.end(); + } + + assert.notEqual(result, 'c'); + }); + rs.on('end', function() { + done(); + }); + }); + + it('should not get more results if stream ends early', function(done) { + var results = ['a', 'b', 'c']; + + var originalMethodCalledCount = 0; + + function originalMethod() { + originalMethodCalledCount++; + + var callback = [].slice.call(arguments).pop(); + + setImmediate(function() { + callback(null, results, {}); + }); + } + + var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + rs.on('data', function(result) { + if (result === 'b') { + // Pre-maturely end the stream. + this.end(); + } + }); + rs.on('end', function() { + assert.equal(originalMethodCalledCount, 1); + done(); + }); + }); }); describe('callback mode', function() {