Skip to content

Commit

Permalink
stream-router: support premature stream ending
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jun 18, 2015
1 parent 27fbcf9 commit abca6f1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 24 deletions.
65 changes: 42 additions & 23 deletions lib/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,54 @@ streamRouter.router_ = function(args, originalMethod) {
var callback = args[args.length - 1];
var isStreamMode = !util.is(callback, 'function');

if (isStreamMode) {
var stream = streamEvents(through.obj());

var onResultSet = function(err, results, nextQuery) {
if (err) {
stream.emit('error', err);
stream.end();
return;
}
if (!isStreamMode) {
originalMethod.apply(null, args);
return;
}

results.forEach(function(result) {
var stream = streamEvents(through.obj());

// Results from the API are split apart for the user. If 50 results are
// returned, we emit 50 data events. While the user is consuming these, they
// might choose to end the stream early by calling ".end()". We keep track of
// this state to prevent pushing more results to the stream, ending it again,
// or making unnecessary API calls.
var streamEnded = false;
var _end = stream.end;
stream.end = function() {
streamEnded = true;
_end.apply(this, arguments);
};

function onResultSet(err, results, nextQuery) {
if (err) {
stream.emit('error', err);
stream.end();
return;
}

results.forEach(function(result) {
if (!streamEnded) {
stream.push(result);
});

if (nextQuery) {
originalMethod(nextQuery, onResultSet);
} else {
stream.end();
}
};

stream.once('reading', function() {
originalMethod.apply(null, args.concat(onResultSet));
});

return stream;
} else {
originalMethod.apply(null, args);
if (streamEnded) {
return;
}

if (nextQuery) {
originalMethod(nextQuery, onResultSet);
} else {
stream.end();
}
}

stream.once('reading', function() {
originalMethod.apply(null, args.concat(onResultSet));
});

return stream;
};

module.exports = streamRouter;
4 changes: 3 additions & 1 deletion system-test/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ describe('Search', function() {

search.getIndexes()
.on('error', done)
.on('data', function() { resultsMatched++; })
.on('data', function() {
resultsMatched++;
})
.on('end', function() {
assert(resultsMatched > 0);
done();
Expand Down
52 changes: 52 additions & 0 deletions test/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,58 @@ describe('streamRouter', function() {
var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod);
rs.on('data', util.noop); // Trigger the underlying `_read` event.
});

it('should not push more results if stream ends early', function(done) {
var results = ['a', 'b', 'c'];

function originalMethod() {
var callback = [].slice.call(arguments).pop();
setImmediate(function() {
callback(null, results);
});
}

var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod);
rs.on('data', function(result) {
if (result === 'b') {
// Pre-maturely end the stream.
this.end();
}

assert.notEqual(result, 'c');
});
rs.on('end', function() {
done();
});
});

it('should not get more results if stream ends early', function(done) {
var results = ['a', 'b', 'c'];

var originalMethodCalledCount = 0;

function originalMethod() {
originalMethodCalledCount++;

var callback = [].slice.call(arguments).pop();

setImmediate(function() {
callback(null, results, {});
});
}

var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod);
rs.on('data', function(result) {
if (result === 'b') {
// Pre-maturely end the stream.
this.end();
}
});
rs.on('end', function() {
assert.equal(originalMethodCalledCount, 1);
done();
});
});
});

describe('callback mode', function() {
Expand Down

0 comments on commit abca6f1

Please sign in to comment.