@@ -45,6 +45,7 @@ const { Buffer } = require('buffer');
45
45
const {
46
46
addAbortSignalNoValidate,
47
47
} = require ( 'internal/streams/add-abort-signal' ) ;
48
+ const eos = require ( 'internal/streams/end-of-stream' ) ;
48
49
49
50
let debug = require ( 'internal/util/debuglog' ) . debuglog ( 'stream' , ( fn ) => {
50
51
debug = fn ;
@@ -57,12 +58,14 @@ const {
57
58
} = require ( 'internal/streams/state' ) ;
58
59
59
60
const {
60
- ERR_INVALID_ARG_TYPE ,
61
- ERR_METHOD_NOT_IMPLEMENTED ,
62
- ERR_STREAM_PREMATURE_CLOSE ,
63
- ERR_STREAM_PUSH_AFTER_EOF ,
64
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT ,
65
- } = require ( 'internal/errors' ) . codes ;
61
+ aggregateTwoErrors,
62
+ codes : {
63
+ ERR_INVALID_ARG_TYPE ,
64
+ ERR_METHOD_NOT_IMPLEMENTED ,
65
+ ERR_STREAM_PUSH_AFTER_EOF ,
66
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT ,
67
+ }
68
+ } = require ( 'internal/errors' ) ;
66
69
const { validateObject } = require ( 'internal/validators' ) ;
67
70
68
71
const kPaused = Symbol ( 'kPaused' ) ;
@@ -1090,12 +1093,6 @@ function streamToAsyncIterator(stream, options) {
1090
1093
async function * createAsyncIterator ( stream , options ) {
1091
1094
let callback = nop ;
1092
1095
1093
- const opts = {
1094
- destroyOnReturn : true ,
1095
- destroyOnError : true ,
1096
- ...options ,
1097
- } ;
1098
-
1099
1096
function next ( resolve ) {
1100
1097
if ( this === stream ) {
1101
1098
callback ( ) ;
@@ -1105,54 +1102,38 @@ async function* createAsyncIterator(stream, options) {
1105
1102
}
1106
1103
}
1107
1104
1108
- const state = stream . _readableState ;
1105
+ stream . on ( 'readable' , next ) ;
1106
+
1107
+ let error ;
1108
+ eos ( stream , { writable : false } , ( err ) => {
1109
+ error = err ? aggregateTwoErrors ( error , err ) : null ;
1110
+ callback ( ) ;
1111
+ callback = nop ;
1112
+ } ) ;
1109
1113
1110
- let error = state . errored ;
1111
- let errorEmitted = state . errorEmitted ;
1112
- let endEmitted = state . endEmitted ;
1113
- let closeEmitted = state . closeEmitted ;
1114
-
1115
- stream
1116
- . on ( 'readable' , next )
1117
- . on ( 'error' , function ( err ) {
1118
- error = err ;
1119
- errorEmitted = true ;
1120
- next . call ( this ) ;
1121
- } )
1122
- . on ( 'end' , function ( ) {
1123
- endEmitted = true ;
1124
- next . call ( this ) ;
1125
- } )
1126
- . on ( 'close' , function ( ) {
1127
- closeEmitted = true ;
1128
- next . call ( this ) ;
1129
- } ) ;
1130
-
1131
- let errorThrown = false ;
1132
1114
try {
1133
1115
while ( true ) {
1134
1116
const chunk = stream . destroyed ? null : stream . read ( ) ;
1135
1117
if ( chunk !== null ) {
1136
1118
yield chunk ;
1137
- } else if ( errorEmitted ) {
1119
+ } else if ( error ) {
1138
1120
throw error ;
1139
- } else if ( endEmitted ) {
1140
- break ;
1141
- } else if ( closeEmitted ) {
1142
- throw new ERR_STREAM_PREMATURE_CLOSE ( ) ;
1121
+ } else if ( error === null ) {
1122
+ return ;
1143
1123
} else {
1144
1124
await new Promise ( next ) ;
1145
1125
}
1146
1126
}
1147
1127
} catch ( err ) {
1148
- if ( opts . destroyOnError ) {
1149
- destroyImpl . destroyer ( stream , err ) ;
1150
- }
1151
- errorThrown = true ;
1152
- throw err ;
1128
+ error = aggregateTwoErrors ( error , err ) ;
1129
+ throw error ;
1153
1130
} finally {
1154
- if ( ! errorThrown && opts . destroyOnReturn ) {
1155
- if ( state . autoDestroy || ! endEmitted ) {
1131
+ if ( error ) {
1132
+ if ( options ?. destroyOnError !== false ) {
1133
+ destroyImpl . destroyer ( stream , error ) ;
1134
+ }
1135
+ } else if ( options ?. destroyOnReturn !== false ) {
1136
+ if ( error === undefined || stream . _readableState . autoDestroy ) {
1156
1137
destroyImpl . destroyer ( stream , null ) ;
1157
1138
}
1158
1139
}
0 commit comments