Skip to content

Commit 4b63439

Browse files
iMosesdanielleadams
authored andcommitted
stream: use synchronous error validation on iteration helpers
is no longer a generator function, instead it returns a called generator so that validation can be synchronous and not wait for the first iteration Fixes: #41648 PR-URL: #41652 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 25109a6 commit 4b63439

File tree

4 files changed

+119
-141
lines changed

4 files changed

+119
-141
lines changed

lib/internal/streams/operators.js

Lines changed: 104 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const {
2727
const kEmpty = Symbol('kEmpty');
2828
const kEof = Symbol('kEof');
2929

30-
async function * map(fn, options) {
30+
function map(fn, options) {
3131
if (typeof fn !== 'function') {
3232
throw new ERR_INVALID_ARG_TYPE(
3333
'fn', ['Function', 'AsyncFunction'], fn);
@@ -44,118 +44,120 @@ async function * map(fn, options) {
4444

4545
validateInteger(concurrency, 'concurrency', 1);
4646

47-
const ac = new AbortController();
48-
const stream = this;
49-
const queue = [];
50-
const signal = ac.signal;
51-
const signalOpt = { signal };
52-
53-
const abort = () => ac.abort();
54-
if (options?.signal?.aborted) {
55-
abort();
56-
}
57-
58-
options?.signal?.addEventListener('abort', abort);
59-
60-
let next;
61-
let resume;
62-
let done = false;
63-
64-
function onDone() {
65-
done = true;
66-
}
47+
return async function* map() {
48+
const ac = new AbortController();
49+
const stream = this;
50+
const queue = [];
51+
const signal = ac.signal;
52+
const signalOpt = { signal };
6753

68-
async function pump() {
69-
try {
70-
for await (let val of stream) {
71-
if (done) {
72-
return;
73-
}
54+
const abort = () => ac.abort();
55+
if (options?.signal?.aborted) {
56+
abort();
57+
}
7458

75-
if (signal.aborted) {
76-
throw new AbortError();
77-
}
59+
options?.signal?.addEventListener('abort', abort);
7860

79-
try {
80-
val = fn(val, signalOpt);
81-
} catch (err) {
82-
val = PromiseReject(err);
83-
}
61+
let next;
62+
let resume;
63+
let done = false;
8464

85-
if (val === kEmpty) {
86-
continue;
87-
}
65+
function onDone() {
66+
done = true;
67+
}
8868

89-
if (typeof val?.catch === 'function') {
90-
val.catch(onDone);
69+
async function pump() {
70+
try {
71+
for await (let val of stream) {
72+
if (done) {
73+
return;
74+
}
75+
76+
if (signal.aborted) {
77+
throw new AbortError();
78+
}
79+
80+
try {
81+
val = fn(val, signalOpt);
82+
} catch (err) {
83+
val = PromiseReject(err);
84+
}
85+
86+
if (val === kEmpty) {
87+
continue;
88+
}
89+
90+
if (typeof val?.catch === 'function') {
91+
val.catch(onDone);
92+
}
93+
94+
queue.push(val);
95+
if (next) {
96+
next();
97+
next = null;
98+
}
99+
100+
if (!done && queue.length && queue.length >= concurrency) {
101+
await new Promise((resolve) => {
102+
resume = resolve;
103+
});
104+
}
91105
}
92-
106+
queue.push(kEof);
107+
} catch (err) {
108+
const val = PromiseReject(err);
109+
PromisePrototypeCatch(val, onDone);
93110
queue.push(val);
111+
} finally {
112+
done = true;
94113
if (next) {
95114
next();
96115
next = null;
97116
}
98-
99-
if (!done && queue.length && queue.length >= concurrency) {
100-
await new Promise((resolve) => {
101-
resume = resolve;
102-
});
103-
}
104-
}
105-
queue.push(kEof);
106-
} catch (err) {
107-
const val = PromiseReject(err);
108-
PromisePrototypeCatch(val, onDone);
109-
queue.push(val);
110-
} finally {
111-
done = true;
112-
if (next) {
113-
next();
114-
next = null;
117+
options?.signal?.removeEventListener('abort', abort);
115118
}
116-
options?.signal?.removeEventListener('abort', abort);
117119
}
118-
}
119-
120-
pump();
121-
122-
try {
123-
while (true) {
124-
while (queue.length > 0) {
125-
const val = await queue[0];
126-
127-
if (val === kEof) {
128-
return;
129-
}
130120

131-
if (signal.aborted) {
132-
throw new AbortError();
133-
}
121+
pump();
134122

135-
if (val !== kEmpty) {
136-
yield val;
123+
try {
124+
while (true) {
125+
while (queue.length > 0) {
126+
const val = await queue[0];
127+
128+
if (val === kEof) {
129+
return;
130+
}
131+
132+
if (signal.aborted) {
133+
throw new AbortError();
134+
}
135+
136+
if (val !== kEmpty) {
137+
yield val;
138+
}
139+
140+
queue.shift();
141+
if (resume) {
142+
resume();
143+
resume = null;
144+
}
137145
}
138146

139-
queue.shift();
140-
if (resume) {
141-
resume();
142-
resume = null;
143-
}
147+
await new Promise((resolve) => {
148+
next = resolve;
149+
});
144150
}
151+
} finally {
152+
ac.abort();
145153

146-
await new Promise((resolve) => {
147-
next = resolve;
148-
});
149-
}
150-
} finally {
151-
ac.abort();
152-
153-
done = true;
154-
if (resume) {
155-
resume();
156-
resume = null;
154+
done = true;
155+
if (resume) {
156+
resume();
157+
resume = null;
158+
}
157159
}
158-
}
160+
}.call(this);
159161
}
160162

161163
async function* asIndexedPairs(options) {
@@ -215,7 +217,7 @@ async function forEach(fn, options) {
215217
for await (const unused of this.map(forEachFn, options));
216218
}
217219

218-
async function * filter(fn, options) {
220+
function filter(fn, options) {
219221
if (typeof fn !== 'function') {
220222
throw new ERR_INVALID_ARG_TYPE(
221223
'fn', ['Function', 'AsyncFunction'], fn);
@@ -226,7 +228,7 @@ async function * filter(fn, options) {
226228
}
227229
return kEmpty;
228230
}
229-
yield* this.map(filterFn, options);
231+
return this.map(filterFn, options);
230232
}
231233

232234
async function toArray(options) {
@@ -243,10 +245,13 @@ async function toArray(options) {
243245
return result;
244246
}
245247

246-
async function* flatMap(fn, options) {
247-
for await (const val of this.map(fn, options)) {
248-
yield* val;
249-
}
248+
function flatMap(fn, options) {
249+
const values = this.map(fn, options);
250+
return async function* flatMap() {
251+
for await (const val of values) {
252+
yield* val;
253+
}
254+
}.call(this);
250255
}
251256

252257
function toIntegerOrInfinity(number) {

test/parallel/test-stream-filter.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,11 @@ const { setTimeout } = require('timers/promises');
8787

8888
{
8989
// Error cases
90-
assert.rejects(async () => {
91-
// eslint-disable-next-line no-unused-vars
92-
for await (const unused of Readable.from([1]).filter(1));
93-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94-
assert.rejects(async () => {
95-
// eslint-disable-next-line no-unused-vars
96-
for await (const _ of Readable.from([1]).filter((x) => x, {
97-
concurrency: 'Foo'
98-
}));
99-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100-
assert.rejects(async () => {
101-
// eslint-disable-next-line no-unused-vars
102-
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
91+
assert.throws(() => Readable.from([1]).filter((x) => x, {
92+
concurrency: 'Foo'
93+
}), /ERR_OUT_OF_RANGE/);
94+
assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
10495
}
10596
{
10697
// Test result is a Readable

test/parallel/test-stream-flatMap.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,11 @@ function oneTo5() {
109109

110110
{
111111
// Error cases
112-
assert.rejects(async () => {
113-
// eslint-disable-next-line no-unused-vars
114-
for await (const unused of Readable.from([1]).flatMap(1));
115-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116-
assert.rejects(async () => {
117-
// eslint-disable-next-line no-unused-vars
118-
for await (const _ of Readable.from([1]).flatMap((x) => x, {
119-
concurrency: 'Foo'
120-
}));
121-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
122-
assert.rejects(async () => {
123-
// eslint-disable-next-line no-unused-vars
124-
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
125-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
112+
assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/);
113+
assert.throws(() => Readable.from([1]).flatMap((x) => x, {
114+
concurrency: 'Foo'
115+
}), /ERR_OUT_OF_RANGE/);
116+
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
126117
}
127118
{
128119
// Test result is a Readable

test/parallel/test-stream-map.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,11 @@ const { setTimeout } = require('timers/promises');
8686

8787
{
8888
// Error cases
89-
assert.rejects(async () => {
90-
// eslint-disable-next-line no-unused-vars
91-
for await (const unused of Readable.from([1]).map(1));
92-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
93-
assert.rejects(async () => {
94-
// eslint-disable-next-line no-unused-vars
95-
for await (const _ of Readable.from([1]).map((x) => x, {
96-
concurrency: 'Foo'
97-
}));
98-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
99-
assert.rejects(async () => {
100-
// eslint-disable-next-line no-unused-vars
101-
for await (const _ of Readable.from([1]).map((x) => x, 1));
102-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
89+
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
90+
assert.throws(() => Readable.from([1]).map((x) => x, {
91+
concurrency: 'Foo'
92+
}), /ERR_OUT_OF_RANGE/);
93+
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
10394
}
10495
{
10596
// Test result is a Readable

0 commit comments

Comments
 (0)