Skip to content

Commit

Permalink
stream: pipe to a closed or destroyed stream is not allowed in pipeline
Browse files Browse the repository at this point in the history
PR-URL: nodejs#53241
Fixes: nodejs#52622
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jakecastelli authored and bmeck committed Jun 22, 2024
1 parent 8fc8733 commit 03e23de
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2621,6 +2621,12 @@ or a pipeline ends non gracefully with no explicit error.
An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been
pushed to the stream.

<a id="ERR_STREAM_UNABLE_TO_PIPE"></a>

### `ERR_STREAM_UNABLE_TO_PIPE`

An attempt was made to pipe to a closed or destroyed stream in a pipeline.

<a id="ERR_STREAM_UNSHIFT_AFTER_END_EVENT"></a>

### `ERR_STREAM_UNSHIFT_AFTER_END_EVENT`
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNABLE_TO_PIPE', 'Connot pipe to a closed or destroyed stream', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_UNABLE_TO_PIPE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -253,10 +254,15 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const next = i + 1 < streams.length ? streams[i + 1] : null;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (next !== null && (next?.closed || next?.destroyed)) {
throw new ERR_STREAM_UNABLE_TO_PIPE();
}

if (end) {
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
Expand Down
13 changes: 13 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');
const tmpdir = require('../common/tmpdir');
const fs = require('fs');

{
let finished = false;
Expand Down Expand Up @@ -69,6 +71,17 @@ const tsp = require('timers/promises');
}, /ERR_INVALID_ARG_TYPE/);
}

tmpdir.refresh();
{
assert.rejects(async () => {
const read = fs.createReadStream(__filename);
const write = fs.createWriteStream(tmpdir.resolve('a'));
const close = promisify(write.close);
await close.call(write);
await pipelinep(read, write);
}, /ERR_STREAM_UNABLE_TO_PIPE/).then(common.mustCall());
}

{
const read = new Readable({
read() {}
Expand Down

0 comments on commit 03e23de

Please sign in to comment.