Skip to content

Commit c20e28e

Browse files
ronagdanielleadams
authored andcommitted
stream: fix pipeline pump
Refs: #39005 PR-URL: #39006 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 4c6193f commit c20e28e

File tree

2 files changed

+91
-14
lines changed

2 files changed

+91
-14
lines changed

lib/internal/streams/pipeline.js

+58-14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
const {
77
ArrayIsArray,
8+
Promise,
89
SymbolAsyncIterator,
910
} = primordials;
1011

@@ -13,21 +14,27 @@ const eos = require('internal/streams/end-of-stream');
1314
const { once } = require('internal/util');
1415
const destroyImpl = require('internal/streams/destroy');
1516
const {
16-
ERR_INVALID_ARG_TYPE,
17-
ERR_INVALID_RETURN_VALUE,
18-
ERR_MISSING_ARGS,
19-
ERR_STREAM_DESTROYED
20-
} = require('internal/errors').codes;
17+
aggregateTwoErrors,
18+
codes: {
19+
ERR_INVALID_ARG_TYPE,
20+
ERR_INVALID_RETURN_VALUE,
21+
ERR_MISSING_ARGS,
22+
ERR_STREAM_DESTROYED,
23+
ERR_STREAM_PREMATURE_CLOSE,
24+
},
25+
} = require('internal/errors');
2126

2227
const { validateCallback } = require('internal/validators');
2328

29+
function noop() {}
30+
2431
const {
2532
isIterable,
2633
isReadable,
2734
isStream,
2835
} = require('internal/streams/utils');
36+
const assert = require('internal/assert');
2937

30-
let EE;
3138
let PassThrough;
3239
let Readable;
3340

@@ -101,25 +108,62 @@ async function* fromReadable(val) {
101108
}
102109

103110
async function pump(iterable, writable, finish) {
104-
if (!EE) {
105-
EE = require('events');
106-
}
107111
let error;
112+
let callback = noop;
113+
const resume = (err) => {
114+
error = aggregateTwoErrors(error, err);
115+
const _callback = callback;
116+
callback = noop;
117+
_callback();
118+
};
119+
const onClose = () => {
120+
resume(new ERR_STREAM_PREMATURE_CLOSE());
121+
};
122+
123+
const waitForDrain = () => new Promise((resolve) => {
124+
assert(callback === noop);
125+
if (error || writable.destroyed) {
126+
resolve();
127+
} else {
128+
callback = resolve;
129+
}
130+
});
131+
132+
writable
133+
.on('drain', resume)
134+
.on('error', resume)
135+
.on('close', onClose);
136+
108137
try {
109-
if (writable.writableNeedDrain === true) {
110-
await EE.once(writable, 'drain');
138+
if (writable.writableNeedDrain) {
139+
await waitForDrain();
140+
}
141+
142+
if (error) {
143+
return;
111144
}
112145

113146
for await (const chunk of iterable) {
114147
if (!writable.write(chunk)) {
115-
if (writable.destroyed) return;
116-
await EE.once(writable, 'drain');
148+
await waitForDrain();
149+
}
150+
if (error) {
151+
return;
117152
}
118153
}
154+
155+
if (error) {
156+
return;
157+
}
158+
119159
writable.end();
120160
} catch (err) {
121-
error = err;
161+
error = aggregateTwoErrors(error, err);
122162
} finally {
163+
writable
164+
.off('drain', resume)
165+
.off('error', resume)
166+
.off('close', onClose);
123167
finish(error);
124168
}
125169
}

test/parallel/test-stream-pipeline.js

+33
Original file line numberDiff line numberDiff line change
@@ -1387,3 +1387,36 @@ const net = require('net');
13871387
assert.strictEqual(res, content);
13881388
}));
13891389
}
1390+
1391+
{
1392+
const writableLike = new Stream();
1393+
writableLike.writableNeedDrain = true;
1394+
1395+
pipeline(
1396+
async function *() {},
1397+
writableLike,
1398+
common.mustCall((err) => {
1399+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
1400+
})
1401+
);
1402+
1403+
writableLike.emit('close');
1404+
}
1405+
1406+
{
1407+
const writableLike = new Stream();
1408+
writableLike.write = () => false;
1409+
1410+
pipeline(
1411+
async function *() {
1412+
yield null;
1413+
yield null;
1414+
},
1415+
writableLike,
1416+
common.mustCall((err) => {
1417+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
1418+
})
1419+
);
1420+
1421+
writableLike.emit('close');
1422+
}

0 commit comments

Comments
 (0)