Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: use synchronous error validation & validate abort signal option #41777

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const {
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const {
validateAbortSignal,
validateInteger,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');

Expand All @@ -33,10 +36,12 @@ function map(fn, options) {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}

if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

let concurrency = 1;
if (options?.concurrency != null) {
Expand Down Expand Up @@ -161,17 +166,33 @@ function map(fn, options) {
}.call(this);
}

async function* asIndexedPairs(options) {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
function asIndexedPairs(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

return async function* asIndexedPairs() {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
}
}.call(this);
}

async function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
Comment on lines +192 to +194
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified to validateAbortSignal(options?.signal, 'options.signal') (also in other places)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good to know 👍


// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
Expand Down Expand Up @@ -246,6 +267,13 @@ async function reduce(reducer, initialValue, options) {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
Expand Down Expand Up @@ -283,6 +311,13 @@ async function reduce(reducer, initialValue, options) {
}

async function toArray(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
Expand Down Expand Up @@ -316,6 +351,13 @@ function toIntegerOrInfinity(number) {
}

function drop(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
Expand All @@ -332,8 +374,14 @@ function drop(number, options) {
}.call(this);
}


function take(number, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
Expand Down
8 changes: 7 additions & 1 deletion test/parallel/test-stream-asIndexedPairs.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import '../common/index.mjs';
import { Readable } from 'stream';
import { deepStrictEqual, rejects } from 'assert';
import { deepStrictEqual, rejects, throws } from 'assert';

{
// asIndexedPairs with a synchronous stream
Expand Down Expand Up @@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
}, /AbortError/);
}

{
// Error cases
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
}
6 changes: 6 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}

throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);

throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
1 change: 1 addition & 0 deletions test/parallel/test-stream-flatMap.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ function oneTo5() {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
{
// Test result is a Readable
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-stream-reduce.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ function sum(p, c) {
// Error cases
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
Comment on lines +124 to +125
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two should probably have .then(common.mustCall()) (e.g. like you did in test-stream-some-every)

Copy link
Member

@Linkgoron Linkgoron Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also add it to the two lines above them (that you didn't add).

}

{
Expand Down
11 changes: 11 additions & 0 deletions test/parallel/test-stream-some-every.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ function oneTo5Async() {
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
signal: true
});
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-stream-toArray.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,15 @@ const assert = require('assert');
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).toArray(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

assert.rejects(async () => {
await Readable.from([1]).toArray({
signal: true
});
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}