Skip to content

Streams: forward errors on pipe #1521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ readable.isPaused() // === false
* `destination` {[Writable][] Stream} The destination for writing data
* `options` {Object} Pipe options
* `end` {Boolean} End the writer when the reader ends. Default = `true`
* `forwardErrors` If the stream should pass errors down the pipe instead of
throwing them. Default = `false`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might reword this:

`forwardErrors` {Boolean} If errors on the source stream should be re-emitted as errors on the
destination stream. Defaults to `false`.


This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
Expand Down Expand Up @@ -1315,7 +1317,7 @@ for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.


## Simplified Constructor API
## Simplified Constructor API

<!--type=misc-->

Expand Down
21 changes: 20 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ Readable.prototype._read = function(n) {
Readable.prototype.pipe = function(dest, pipeOpts) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add this option to Stream.prototype.pipe as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably I forgot about that one

var src = this;
var state = this._readableState;
pipeOpts = pipeOpts || {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we care that in a typical use-case this would create an arbitrary object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also avoid an unnecessary conversion of undefined to a Boolean, I doubt it makes much difference


switch (state.pipesCount) {
case 0:
Expand All @@ -462,7 +463,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);

var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
var doEnd = (pipeOpts.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr;

Expand Down Expand Up @@ -492,6 +493,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);

var forwardErrors = Boolean(pipeOpts.forwardErrors);

if (forwardErrors) {
src.on('error', onsrcerror);
}

function cleanup() {
debug('cleanup');
// cleanup event handlers once the pipe is broken
Expand All @@ -504,6 +511,10 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.removeListener('end', cleanup);
src.removeListener('data', ondata);

if (forwardErrors) {
src.removeListener('error', onsrcerror);
}

// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
Expand All @@ -526,6 +537,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
}

// if src has an error and error forwarding is turned on
// cause the error to be emited in dest and do suppress the
// throwing behavior
function onsrcerror(er) {
debug('onsrcerror', er);
dest.emit('error', er);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care if this creates an infinite loop? a.pipe(b, {forwardErrors: true}).pipe(a, {forwardErrors: true})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should see how @domenic deals with this in whatwg streams

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An object cannot be both readable and writable so the problem is avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the writable half could forward to the readable half

On Sat, Apr 25, 2015, 8:49 PM Domenic Denicola notifications@github.com
wrote:

In lib/_stream_readable.js
#1521 (comment):

@@ -526,6 +537,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
}

  • // if src has an error and error forwarding is turned on
  • // cause the error to be emited in dest and do suppress the
  • // throwing behavior
  • function onsrcerror(er) {
  • debug('onsrcerror', er);
  • dest.emit('error', er);

An object cannot be both readable and writable so the problem is avoided.


Reply to this email directly or view it on GitHub
https://github.com/iojs/io.js/pull/1521/files#r29105716.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's fine. It means e.g. an echo server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could just change it from an on to a once

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait @chrisdickinson that won't be a problem because on an error the stream is unpiped so it will only ever pass one error through

}

// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
function onerror(er) {
Expand Down
18 changes: 17 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ function Stream() {

Stream.prototype.pipe = function(dest, options) {
var source = this;
options = options || {};

function ondata(chunk) {
if (dest.writable) {
Expand All @@ -47,7 +48,7 @@ Stream.prototype.pipe = function(dest, options) {

// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {
if (!dest._isStdio && (options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}
Expand All @@ -60,6 +61,18 @@ Stream.prototype.pipe = function(dest, options) {
dest.end();
}

// if src has an error and error forwarding is turned on
// cause the error to be emited in dest and do suppress the
// throwing behavior
function onsrcerror(er) {
dest.emit('error', er);
}

var forwardErrors = Boolean(options.forwardErrors);

if (forwardErrors) {
source.on('error', onsrcerror);
}

function onclose() {
if (didOnEnd) return;
Expand Down Expand Up @@ -94,6 +107,9 @@ Stream.prototype.pipe = function(dest, options) {
source.removeListener('close', cleanup);

dest.removeListener('close', cleanup);
if (forwardErrors) {
source.removeListener('error', onsrcerror);
}
}

source.on('end', cleanup);
Expand Down
58 changes: 58 additions & 0 deletions test/parallel/test-stream-pipe-error-forwarding.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
var common = require('../common');
var stream = require('stream');
var assert = require('assert');

(function errorsAreForwardes() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a typo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

var writable = new stream.Writable({
write: function (chunk, _, cb) {
process.nextTick(cb);
}
});

var called = 0;
writable.on('error', function (){
called++;
});
writable.on('unpipe', function () {
process.nextTick(function () {
assert.equal(called, 1);
});
});

var readable = new stream.Readable({
read: function () {
this.emit('error');
}
});
readable.pipe(writable, {
forwardErrors: true
})
}());


(function CircularErrorsAreForwardes() {
var passthrough1 = new stream.PassThrough();
var passthrough2 = new stream.PassThrough();
var called = 0;
passthrough2.on('error', function (){
called++;
});
passthrough2.on('unpipe', function () {
process.nextTick(function () {
assert.equal(called, 1);
});
});

var readable = new stream.Readable({
read: function () {
this.emit('error');
}
});
readable.pipe(passthrough1, {
forwardErrors: true
}).pipe(passthrough2, {
forwardErrors: true
}).pipe(passthrough1, {
forwardErrors: true
});
}())