-
Notifications
You must be signed in to change notification settings - Fork 29.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PR-URL: #44937 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
- Loading branch information
1 parent
ba0e7ae
commit 5f7d2b5
Showing
4 changed files
with
210 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { | ||
Readable, Transform, | ||
} = require('stream'); | ||
const assert = require('assert'); | ||
|
||
{ | ||
// with async generator | ||
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) { | ||
let str = ''; | ||
for await (const chunk of stream) { | ||
str += chunk; | ||
|
||
if (str.length === 2) { | ||
yield str; | ||
str = ''; | ||
} | ||
} | ||
}); | ||
const result = ['ab', 'cd']; | ||
(async () => { | ||
for await (const item of stream) { | ||
assert.strictEqual(item, result.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// With Transformer | ||
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({ | ||
objectMode: true, | ||
transform: common.mustCall((chunk, encoding, callback) => { | ||
callback(null, chunk); | ||
}, 4) | ||
})); | ||
const result = ['a', 'b', 'c', 'd']; | ||
(async () => { | ||
for await (const item of stream) { | ||
assert.strictEqual(item, result.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Throwing an error during `compose` (before waiting for data) | ||
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield | ||
|
||
throw new Error('boom'); | ||
}); | ||
|
||
assert.rejects(async () => { | ||
for await (const item of stream) { | ||
assert.fail('should not reach here, got ' + item); | ||
} | ||
}, /boom/).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Throwing an error during `compose` (when waiting for data) | ||
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { | ||
for await (const chunk of stream) { | ||
if (chunk === 3) { | ||
throw new Error('boom'); | ||
} | ||
yield chunk; | ||
} | ||
}); | ||
|
||
assert.rejects( | ||
stream.toArray(), | ||
/boom/, | ||
).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Throwing an error during `compose` (after finishing all readable data) | ||
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield | ||
|
||
// eslint-disable-next-line no-unused-vars,no-empty | ||
for await (const chunk of stream) { | ||
} | ||
|
||
throw new Error('boom'); | ||
}); | ||
assert.rejects( | ||
stream.toArray(), | ||
/boom/, | ||
).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// AbortSignal | ||
const ac = new AbortController(); | ||
const stream = Readable.from([1, 2, 3, 4, 5]) | ||
.compose(async function *(source) { | ||
// Should not reach here | ||
for await (const chunk of source) { | ||
yield chunk; | ||
} | ||
}, { signal: ac.signal }); | ||
|
||
ac.abort(); | ||
|
||
assert.rejects(async () => { | ||
for await (const item of stream) { | ||
assert.fail('should not reach here, got ' + item); | ||
} | ||
}, { | ||
name: 'AbortError', | ||
}).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
assert.throws( | ||
() => Readable.from(['a']).compose(Readable.from(['b'])), | ||
{ code: 'ERR_INVALID_ARG_VALUE' } | ||
); | ||
} | ||
|
||
{ | ||
assert.throws( | ||
() => Readable.from(['a']).compose(), | ||
{ code: 'ERR_INVALID_ARG_TYPE' } | ||
); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters