@@ -27,7 +27,7 @@ const {
27
27
const kEmpty = Symbol ( 'kEmpty' ) ;
28
28
const kEof = Symbol ( 'kEof' ) ;
29
29
30
- async function * map ( fn , options ) {
30
+ function map ( fn , options ) {
31
31
if ( typeof fn !== 'function' ) {
32
32
throw new ERR_INVALID_ARG_TYPE (
33
33
'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -44,118 +44,120 @@ async function * map(fn, options) {
44
44
45
45
validateInteger ( concurrency , 'concurrency' , 1 ) ;
46
46
47
- const ac = new AbortController ( ) ;
48
- const stream = this ;
49
- const queue = [ ] ;
50
- const signal = ac . signal ;
51
- const signalOpt = { signal } ;
52
-
53
- const abort = ( ) => ac . abort ( ) ;
54
- if ( options ?. signal ?. aborted ) {
55
- abort ( ) ;
56
- }
57
-
58
- options ?. signal ?. addEventListener ( 'abort' , abort ) ;
59
-
60
- let next ;
61
- let resume ;
62
- let done = false ;
63
-
64
- function onDone ( ) {
65
- done = true ;
66
- }
47
+ return async function * map ( ) {
48
+ const ac = new AbortController ( ) ;
49
+ const stream = this ;
50
+ const queue = [ ] ;
51
+ const signal = ac . signal ;
52
+ const signalOpt = { signal } ;
67
53
68
- async function pump ( ) {
69
- try {
70
- for await ( let val of stream ) {
71
- if ( done ) {
72
- return ;
73
- }
54
+ const abort = ( ) => ac . abort ( ) ;
55
+ if ( options ?. signal ?. aborted ) {
56
+ abort ( ) ;
57
+ }
74
58
75
- if ( signal . aborted ) {
76
- throw new AbortError ( ) ;
77
- }
59
+ options ?. signal ?. addEventListener ( 'abort' , abort ) ;
78
60
79
- try {
80
- val = fn ( val , signalOpt ) ;
81
- } catch ( err ) {
82
- val = PromiseReject ( err ) ;
83
- }
61
+ let next ;
62
+ let resume ;
63
+ let done = false ;
84
64
85
- if ( val === kEmpty ) {
86
- continue ;
87
- }
65
+ function onDone ( ) {
66
+ done = true ;
67
+ }
88
68
89
- if ( typeof val ?. catch === 'function' ) {
90
- val . catch ( onDone ) ;
69
+ async function pump ( ) {
70
+ try {
71
+ for await ( let val of stream ) {
72
+ if ( done ) {
73
+ return ;
74
+ }
75
+
76
+ if ( signal . aborted ) {
77
+ throw new AbortError ( ) ;
78
+ }
79
+
80
+ try {
81
+ val = fn ( val , signalOpt ) ;
82
+ } catch ( err ) {
83
+ val = PromiseReject ( err ) ;
84
+ }
85
+
86
+ if ( val === kEmpty ) {
87
+ continue ;
88
+ }
89
+
90
+ if ( typeof val ?. catch === 'function' ) {
91
+ val . catch ( onDone ) ;
92
+ }
93
+
94
+ queue . push ( val ) ;
95
+ if ( next ) {
96
+ next ( ) ;
97
+ next = null ;
98
+ }
99
+
100
+ if ( ! done && queue . length && queue . length >= concurrency ) {
101
+ await new Promise ( ( resolve ) => {
102
+ resume = resolve ;
103
+ } ) ;
104
+ }
91
105
}
92
-
106
+ queue . push ( kEof ) ;
107
+ } catch ( err ) {
108
+ const val = PromiseReject ( err ) ;
109
+ PromisePrototypeCatch ( val , onDone ) ;
93
110
queue . push ( val ) ;
111
+ } finally {
112
+ done = true ;
94
113
if ( next ) {
95
114
next ( ) ;
96
115
next = null ;
97
116
}
98
-
99
- if ( ! done && queue . length && queue . length >= concurrency ) {
100
- await new Promise ( ( resolve ) => {
101
- resume = resolve ;
102
- } ) ;
103
- }
104
- }
105
- queue . push ( kEof ) ;
106
- } catch ( err ) {
107
- const val = PromiseReject ( err ) ;
108
- PromisePrototypeCatch ( val , onDone ) ;
109
- queue . push ( val ) ;
110
- } finally {
111
- done = true ;
112
- if ( next ) {
113
- next ( ) ;
114
- next = null ;
117
+ options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
115
118
}
116
- options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
117
119
}
118
- }
119
-
120
- pump ( ) ;
121
-
122
- try {
123
- while ( true ) {
124
- while ( queue . length > 0 ) {
125
- const val = await queue [ 0 ] ;
126
-
127
- if ( val === kEof ) {
128
- return ;
129
- }
130
120
131
- if ( signal . aborted ) {
132
- throw new AbortError ( ) ;
133
- }
121
+ pump ( ) ;
134
122
135
- if ( val !== kEmpty ) {
136
- yield val ;
123
+ try {
124
+ while ( true ) {
125
+ while ( queue . length > 0 ) {
126
+ const val = await queue [ 0 ] ;
127
+
128
+ if ( val === kEof ) {
129
+ return ;
130
+ }
131
+
132
+ if ( signal . aborted ) {
133
+ throw new AbortError ( ) ;
134
+ }
135
+
136
+ if ( val !== kEmpty ) {
137
+ yield val ;
138
+ }
139
+
140
+ queue . shift ( ) ;
141
+ if ( resume ) {
142
+ resume ( ) ;
143
+ resume = null ;
144
+ }
137
145
}
138
146
139
- queue . shift ( ) ;
140
- if ( resume ) {
141
- resume ( ) ;
142
- resume = null ;
143
- }
147
+ await new Promise ( ( resolve ) => {
148
+ next = resolve ;
149
+ } ) ;
144
150
}
151
+ } finally {
152
+ ac . abort ( ) ;
145
153
146
- await new Promise ( ( resolve ) => {
147
- next = resolve ;
148
- } ) ;
149
- }
150
- } finally {
151
- ac . abort ( ) ;
152
-
153
- done = true ;
154
- if ( resume ) {
155
- resume ( ) ;
156
- resume = null ;
154
+ done = true ;
155
+ if ( resume ) {
156
+ resume ( ) ;
157
+ resume = null ;
158
+ }
157
159
}
158
- }
160
+ } . call ( this ) ;
159
161
}
160
162
161
163
async function * asIndexedPairs ( options ) {
@@ -215,7 +217,7 @@ async function forEach(fn, options) {
215
217
for await ( const unused of this . map ( forEachFn , options ) ) ;
216
218
}
217
219
218
- async function * filter ( fn , options ) {
220
+ function filter ( fn , options ) {
219
221
if ( typeof fn !== 'function' ) {
220
222
throw new ERR_INVALID_ARG_TYPE (
221
223
'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -226,7 +228,7 @@ async function * filter(fn, options) {
226
228
}
227
229
return kEmpty ;
228
230
}
229
- yield * this . map ( filterFn , options ) ;
231
+ return this . map ( filterFn , options ) ;
230
232
}
231
233
232
234
async function toArray ( options ) {
@@ -243,10 +245,13 @@ async function toArray(options) {
243
245
return result ;
244
246
}
245
247
246
- async function * flatMap ( fn , options ) {
247
- for await ( const val of this . map ( fn , options ) ) {
248
- yield * val ;
249
- }
248
+ function flatMap ( fn , options ) {
249
+ const values = this . map ( fn , options ) ;
250
+ return async function * flatMap ( ) {
251
+ for await ( const val of values ) {
252
+ yield * val ;
253
+ }
254
+ } . call ( this ) ;
250
255
}
251
256
252
257
function toIntegerOrInfinity ( number ) {
0 commit comments