Skip to content

Commit 09d8c0c

Browse files
committed
stream: destroy readable on read error
PR-URL: #39342 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent a5dec3a commit 09d8c0c

File tree

4 files changed

+29
-85
lines changed

4 files changed

+29
-85
lines changed

doc/api/stream.md

-3
Original file line numberDiff line numberDiff line change
@@ -1525,9 +1525,6 @@ added: v16.3.0
15251525
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
15261526
async iterator, or exiting a `for await...of` iteration using a `break`,
15271527
`return`, or `throw` will not destroy the stream. **Default:** `true`.
1528-
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
1529-
error while it's being iterated, the iterator will not destroy the stream.
1530-
**Default:** `true`.
15311528
* Returns: {AsyncIterator} to consume the stream.
15321529

15331530
The iterator created by this method gives users the option to cancel the

lib/internal/streams/readable.js

+21-9
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,22 @@ Readable.prototype.read = function(n) {
486486
state.needReadable = true;
487487

488488
// Call internal read method
489-
this._read(state.highWaterMark);
489+
try {
490+
const result = this._read(state.highWaterMark);
491+
if (result != null) {
492+
const then = result.then;
493+
if (typeof then === 'function') {
494+
then.call(
495+
result,
496+
nop,
497+
function(err) {
498+
errorOrDestroy(this, err);
499+
});
500+
}
501+
}
502+
} catch (err) {
503+
errorOrDestroy(this, err);
504+
}
490505

491506
state.sync = false;
492507
// If _read pushed data synchronously, then `reading` will be false,
@@ -1131,14 +1146,11 @@ async function* createAsyncIterator(stream, options) {
11311146
error = aggregateTwoErrors(error, err);
11321147
throw error;
11331148
} finally {
1134-
if (error) {
1135-
if (options?.destroyOnError !== false) {
1136-
destroyImpl.destroyer(stream, error);
1137-
}
1138-
} else if (options?.destroyOnReturn !== false) {
1139-
if (error === undefined || stream._readableState.autoDestroy) {
1140-
destroyImpl.destroyer(stream, null);
1141-
}
1149+
if (
1150+
(error || options?.destroyOnReturn !== false) &&
1151+
(error === undefined || stream._readableState.autoDestroy)
1152+
) {
1153+
destroyImpl.destroyer(stream, null);
11421154
}
11431155
}
11441156
}

test/parallel/test-stream-readable-async-iterators.js

-60
Original file line numberDiff line numberDiff line change
@@ -750,22 +750,6 @@ async function tests() {
750750
})());
751751
}
752752

753-
function createErrorReadable() {
754-
const opts = { read() { throw new Error('inner'); } };
755-
return new Readable(opts);
756-
}
757-
758-
// Check default destroys on return
759-
(async function() {
760-
const readable = createReadable();
761-
for await (const chunk of readable.iterator()) {
762-
assert.strictEqual(chunk, 5);
763-
break;
764-
}
765-
766-
assert.ok(readable.destroyed);
767-
})().then(common.mustCall());
768-
769753
// Check explicit destroying on return
770754
(async function() {
771755
const readable = createReadable();
@@ -777,50 +761,6 @@ async function tests() {
777761
assert.ok(readable.destroyed);
778762
})().then(common.mustCall());
779763

780-
// Check default destroys on error
781-
(async function() {
782-
const readable = createErrorReadable();
783-
try {
784-
// eslint-disable-next-line no-unused-vars
785-
for await (const chunk of readable) { }
786-
assert.fail('should have thrown');
787-
} catch (err) {
788-
assert.strictEqual(err.message, 'inner');
789-
}
790-
791-
assert.ok(readable.destroyed);
792-
})().then(common.mustCall());
793-
794-
// Check explicit destroys on error
795-
(async function() {
796-
const readable = createErrorReadable();
797-
const opts = { destroyOnError: true, destroyOnReturn: false };
798-
try {
799-
// eslint-disable-next-line no-unused-vars
800-
for await (const chunk of readable.iterator(opts)) { }
801-
assert.fail('should have thrown');
802-
} catch (err) {
803-
assert.strictEqual(err.message, 'inner');
804-
}
805-
806-
assert.ok(readable.destroyed);
807-
})().then(common.mustCall());
808-
809-
// Check explicit non-destroy with return true
810-
(async function() {
811-
const readable = createErrorReadable();
812-
const opts = { destroyOnError: false, destroyOnReturn: true };
813-
try {
814-
// eslint-disable-next-line no-unused-vars
815-
for await (const chunk of readable.iterator(opts)) { }
816-
assert.fail('should have thrown');
817-
} catch (err) {
818-
assert.strictEqual(err.message, 'inner');
819-
}
820-
821-
assert.ok(!readable.destroyed);
822-
})().then(common.mustCall());
823-
824764
// Check explicit non-destroy with return true
825765
(async function() {
826766
const readable = createReadable();
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
'use strict';
2-
require('../common');
3-
4-
const assert = require('assert');
2+
const common = require('../common');
53
const { Readable } = require('stream');
64

75
const readable = new Readable();
86

9-
assert.throws(
10-
() => {
11-
readable.read();
12-
},
13-
{
14-
code: 'ERR_METHOD_NOT_IMPLEMENTED',
15-
name: 'Error',
16-
message: 'The _read() method is not implemented'
17-
}
18-
);
7+
readable.read();
8+
readable.on('error', common.expectsError({
9+
code: 'ERR_METHOD_NOT_IMPLEMENTED',
10+
name: 'Error',
11+
message: 'The _read() method is not implemented'
12+
}));
13+
readable.on('close', common.mustCall());

0 commit comments

Comments
 (0)