Skip to content

Commit ce00381

Browse files
committed
stream: use finished for async iteration
PR-URL: #39282 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent b2fa795 commit ce00381

File tree

3 files changed

+30
-49
lines changed

3 files changed

+30
-49
lines changed

lib/internal/errors.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ const maybeOverridePrepareStackTrace = (globalThis, error, trace) => {
150150
};
151151

152152
const aggregateTwoErrors = hideStackFrames((innerError, outerError) => {
153-
if (innerError && outerError) {
153+
if (innerError && outerError && innerError !== outerError) {
154154
if (ArrayIsArray(outerError.errors)) {
155155
// If `outerError` is already an `AggregateError`.
156156
ArrayPrototypePush(outerError.errors, innerError);

lib/internal/streams/readable.js

+28-47
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const { Buffer } = require('buffer');
4545
const {
4646
addAbortSignalNoValidate,
4747
} = require('internal/streams/add-abort-signal');
48+
const eos = require('internal/streams/end-of-stream');
4849

4950
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
5051
debug = fn;
@@ -57,12 +58,14 @@ const {
5758
} = require('internal/streams/state');
5859

5960
const {
60-
ERR_INVALID_ARG_TYPE,
61-
ERR_METHOD_NOT_IMPLEMENTED,
62-
ERR_STREAM_PREMATURE_CLOSE,
63-
ERR_STREAM_PUSH_AFTER_EOF,
64-
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
65-
} = require('internal/errors').codes;
61+
aggregateTwoErrors,
62+
codes: {
63+
ERR_INVALID_ARG_TYPE,
64+
ERR_METHOD_NOT_IMPLEMENTED,
65+
ERR_STREAM_PUSH_AFTER_EOF,
66+
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
67+
}
68+
} = require('internal/errors');
6669
const { validateObject } = require('internal/validators');
6770

6871
const kPaused = Symbol('kPaused');
@@ -1090,12 +1093,6 @@ function streamToAsyncIterator(stream, options) {
10901093
async function* createAsyncIterator(stream, options) {
10911094
let callback = nop;
10921095

1093-
const opts = {
1094-
destroyOnReturn: true,
1095-
destroyOnError: true,
1096-
...options,
1097-
};
1098-
10991096
function next(resolve) {
11001097
if (this === stream) {
11011098
callback();
@@ -1105,54 +1102,38 @@ async function* createAsyncIterator(stream, options) {
11051102
}
11061103
}
11071104

1108-
const state = stream._readableState;
1105+
stream.on('readable', next);
1106+
1107+
let error;
1108+
eos(stream, { writable: false }, (err) => {
1109+
error = err ? aggregateTwoErrors(error, err) : null;
1110+
callback();
1111+
callback = nop;
1112+
});
11091113

1110-
let error = state.errored;
1111-
let errorEmitted = state.errorEmitted;
1112-
let endEmitted = state.endEmitted;
1113-
let closeEmitted = state.closeEmitted;
1114-
1115-
stream
1116-
.on('readable', next)
1117-
.on('error', function(err) {
1118-
error = err;
1119-
errorEmitted = true;
1120-
next.call(this);
1121-
})
1122-
.on('end', function() {
1123-
endEmitted = true;
1124-
next.call(this);
1125-
})
1126-
.on('close', function() {
1127-
closeEmitted = true;
1128-
next.call(this);
1129-
});
1130-
1131-
let errorThrown = false;
11321114
try {
11331115
while (true) {
11341116
const chunk = stream.destroyed ? null : stream.read();
11351117
if (chunk !== null) {
11361118
yield chunk;
1137-
} else if (errorEmitted) {
1119+
} else if (error) {
11381120
throw error;
1139-
} else if (endEmitted) {
1140-
break;
1141-
} else if (closeEmitted) {
1142-
throw new ERR_STREAM_PREMATURE_CLOSE();
1121+
} else if (error === null) {
1122+
return;
11431123
} else {
11441124
await new Promise(next);
11451125
}
11461126
}
11471127
} catch (err) {
1148-
if (opts.destroyOnError) {
1149-
destroyImpl.destroyer(stream, err);
1150-
}
1151-
errorThrown = true;
1152-
throw err;
1128+
error = aggregateTwoErrors(error, err);
1129+
throw error;
11531130
} finally {
1154-
if (!errorThrown && opts.destroyOnReturn) {
1155-
if (state.autoDestroy || !endEmitted) {
1131+
if (error) {
1132+
if (options?.destroyOnError !== false) {
1133+
destroyImpl.destroyer(stream, error);
1134+
}
1135+
} else if (options?.destroyOnReturn !== false) {
1136+
if (error === undefined || stream._readableState.autoDestroy) {
11561137
destroyImpl.destroyer(stream, null);
11571138
}
11581139
}

test/parallel/test-stream-readable-async-iterators.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ async function tests() {
205205
const iterator = readable[Symbol.asyncIterator]();
206206

207207
const err = new Error('kaboom');
208-
readable.destroy(new Error('kaboom'));
208+
readable.destroy(err);
209209
await assert.rejects(iterator.next.bind(iterator), err);
210210
}
211211

0 commit comments

Comments
 (0)