Skip to content

Commit b2ae12d

Browse files
RaisinTennodejs-github-bot
authored andcommitted
stream: throw on premature close in Readable[AsyncIterator]
Fixes: #39086 Signed-off-by: Darshan Sen <raisinten@gmail.com> PR-URL: #39117 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 3590b5d commit b2ae12d

File tree

2 files changed

+64
-19
lines changed

2 files changed

+64
-19
lines changed

lib/internal/streams/readable.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ const {
5858

5959
const {
6060
ERR_INVALID_ARG_TYPE,
61-
ERR_STREAM_PUSH_AFTER_EOF,
6261
ERR_METHOD_NOT_IMPLEMENTED,
63-
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
62+
ERR_STREAM_PREMATURE_CLOSE,
63+
ERR_STREAM_PUSH_AFTER_EOF,
64+
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
6465
} = require('internal/errors').codes;
6566
const { validateObject } = require('internal/validators');
6667

@@ -1138,7 +1139,7 @@ async function* createAsyncIterator(stream, options) {
11381139
} else if (endEmitted) {
11391140
break;
11401141
} else if (closeEmitted) {
1141-
break;
1142+
throw new ERR_STREAM_PREMATURE_CLOSE();
11421143
} else {
11431144
await new Promise(next);
11441145
}
@@ -1152,7 +1153,6 @@ async function* createAsyncIterator(stream, options) {
11521153
} finally {
11531154
if (!errorThrown && opts.destroyOnReturn) {
11541155
if (state.autoDestroy || !endEmitted) {
1155-
// TODO(ronag): ERR_PREMATURE_CLOSE?
11561156
destroyImpl.destroyer(stream, null);
11571157
}
11581158
}

test/parallel/test-stream-readable-async-iterators.js

+60-15
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const {
1010
} = require('stream');
1111
const assert = require('assert');
1212
const http = require('http');
13+
const fs = require('fs');
1314

1415
async function tests() {
1516
{
@@ -338,11 +339,17 @@ async function tests() {
338339
process.nextTick(async () => {
339340
readable.on('close', common.mustNotCall());
340341
let received = 0;
341-
for await (const k of readable) {
342-
// Just make linting pass. This should never run.
343-
assert.strictEqual(k, 'hello');
344-
received++;
342+
let err = null;
343+
try {
344+
for await (const k of readable) {
345+
// Just make linting pass. This should never run.
346+
assert.strictEqual(k, 'hello');
347+
received++;
348+
}
349+
} catch (_err) {
350+
err = _err;
345351
}
352+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
346353
assert.strictEqual(received, 0);
347354
});
348355
}
@@ -412,8 +419,13 @@ async function tests() {
412419

413420
readable.destroy();
414421

415-
const { done } = await readable[Symbol.asyncIterator]().next();
416-
assert.strictEqual(done, true);
422+
const it = await readable[Symbol.asyncIterator]();
423+
const next = it.next();
424+
next
425+
.then(common.mustNotCall())
426+
.catch(common.mustCall((err) => {
427+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
428+
}));
417429
}
418430

419431
{
@@ -458,7 +470,7 @@ async function tests() {
458470
}
459471

460472
{
461-
console.log('destroy mid-stream does not error');
473+
console.log('destroy mid-stream errors');
462474
const r = new Readable({
463475
objectMode: true,
464476
read() {
@@ -467,10 +479,16 @@ async function tests() {
467479
}
468480
});
469481

470-
// eslint-disable-next-line no-unused-vars
471-
for await (const a of r) {
472-
r.destroy(null);
482+
let err = null;
483+
try {
484+
// eslint-disable-next-line no-unused-vars
485+
for await (const a of r) {
486+
r.destroy(null);
487+
}
488+
} catch (_err) {
489+
err = _err;
473490
}
491+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
474492
}
475493

476494
{
@@ -514,7 +532,7 @@ async function tests() {
514532
}
515533

516534
{
517-
console.log('all next promises must be resolved on destroy');
535+
console.log('all next promises must be rejected on destroy');
518536
const r = new Readable({
519537
objectMode: true,
520538
read() {
@@ -525,7 +543,11 @@ async function tests() {
525543
const c = b.next();
526544
const d = b.next();
527545
r.destroy();
528-
assert.deepStrictEqual(await c, { done: true, value: undefined });
546+
c
547+
.then(common.mustNotCall())
548+
.catch(common.mustCall((err) => {
549+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
550+
}));
529551
assert.deepStrictEqual(await d, { done: true, value: undefined });
530552
}
531553

@@ -675,7 +697,7 @@ async function tests() {
675697
}
676698

677699
{
678-
// AsyncIterator should finish correctly if destroyed.
700+
// AsyncIterator should not finish correctly if destroyed.
679701

680702
const r = new Readable({
681703
objectMode: true,
@@ -688,11 +710,34 @@ async function tests() {
688710
const it = r[Symbol.asyncIterator]();
689711
const next = it.next();
690712
next
691-
.then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
692-
.catch(common.mustNotCall());
713+
.then(common.mustNotCall())
714+
.catch(common.mustCall((err) => {
715+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
716+
}));
693717
});
694718
}
695719

720+
{
721+
// AsyncIterator should throw if prematurely closed
722+
// before end has been emitted.
723+
(async function() {
724+
const readable = fs.createReadStream(__filename);
725+
726+
try {
727+
// eslint-disable-next-line no-unused-vars
728+
for await (const chunk of readable) {
729+
readable.close();
730+
}
731+
732+
assert.fail('should have thrown');
733+
} catch (err) {
734+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
735+
}
736+
737+
assert.ok(readable.destroyed);
738+
})().then(common.mustCall());
739+
}
740+
696741
// AsyncIterator non-destroying iterator
697742
{
698743
function createReadable() {

0 commit comments

Comments
 (0)