Changes from all commits (26 commits)
- `d957541` stream: add Transform.by utility function (Jun 25, 2019)
- `9005e46` docs: stream.Transform.by typo (Jul 1, 2019)
- `944f228` stream: Transform.by SourceIterator next optimization (Jul 1, 2019)
- `84c84c9` docs: Transform.by doc tweaks (davidmarkclements, Jul 2, 2019)
- `a1937c7` docs: sort type-parser types alphabetically within their groups (davidmarkclements, Jul 2, 2019)
- `ad35382` docs: sort type-parser types alphabetically within their groups (davidmarkclements, Jul 2, 2019)
- `8d82c5b` docs: typo (davidmarkclements, Jul 2, 2019)
- `7a1ef77` docs: Transform.by clarify as async iterable (davidmarkclements, Jul 2, 2019)
- `5707445` stream: Transform.by remove unnecessary defensive code (davidmarkclements, Jul 2, 2019)
- `970ed3d` stream: Transform.by code style (davidmarkclements, Jul 2, 2019)
- `6e67a85` stream: Transform.by minor refactoring (Jul 3, 2019)
- `98aebc5` streams: Transform.by check fn return value instead of fn instance (Jul 5, 2019)
- `eee19c5` docs: emphasize Transform.by objectMode default behaviour (Jul 7, 2019)
- `26e96be` docs: Transform.by function naming convention (davidmarkclements, Jul 7, 2019)
- `185a6f4` docs: add transform content to streams <-> async generators compatibi… (Jul 7, 2019)
- `8e73c51` docs: include missing parens (Jul 7, 2019)
- `b73347f` tests: preempt conflict with #28566 (Jul 7, 2019)
- `9789a5b` tests: fix transform async iterator test (Jul 12, 2019)
- `bad0bfd` docs: add meta data to Transform.by (davidmarkclements, Jul 15, 2019)
- `f2c8b22` stream: error handling bug fix, ensure stream is destroyed after proc… (Jul 15, 2019)
- `001fe01` Update doc/api/stream.md (davidmarkclements, Aug 5, 2019)
- `9545ee0` Update doc/api/stream.md (davidmarkclements, Aug 5, 2019)
- `f0fa8b6` Update doc/api/stream.md (davidmarkclements, Aug 5, 2019)
- `b504b15` Update doc/api/stream.md (davidmarkclements, Aug 5, 2019)
- `447a895` streams: Transform.by, rm unncessary check (Sep 9, 2019)
- `2fe96e4` lint fixes (Dec 13, 2019)
6 changes: 6 additions & 0 deletions doc/api/errors.md
@@ -630,6 +630,12 @@ display if `block` does not throw.
An iterable argument (i.e. a value that works with `for...of` loops) was
required, but not provided to a Node.js API.

<a id="ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE"></a>
### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE

A function argument returning an async iterable (i.e. a value that works
with `for await...of` loops) was required, but the function provided to a
Node.js API did not return one.
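
For example, the `Transform.by()` utility introduced by this pull request
requires its function argument to return an async iterable. A minimal sketch
(based on the check in `lib/_stream_transform.js` below) of a call that would
raise this error:

```js
const { Transform } = require('stream');

// The function returns a string rather than an async iterable, so
// Transform.by() throws ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE.
Transform.by((source) => 'not an async iterable');
```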

<a id="ERR_ASSERTION"></a>
### ERR_ASSERTION

87 changes: 80 additions & 7 deletions doc/api/stream.md
@@ -43,8 +43,8 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].
[`stream.pipeline()`][], [`stream.finished()`][],
[`stream.Readable.from()`][] and [`stream.Transform.by()`][].

### Object Mode

@@ -1646,6 +1646,49 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams' semantics
for performance reasons.

### stream.Transform.by(asyncGeneratorFunction[, options])
<!-- YAML
added: REPLACEME
-->

* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function that
  accepts a `source` async iterable, which can be used to read incoming data,
  while transformed data is pushed to the stream with the `yield` keyword.
* `options` {Object} Options provided to `new stream.Transform([options])`.
  By default, `Transform.by()` sets `options.objectMode` to `true`, unless
  `options.objectMode` is explicitly set to `false`.
* Returns: {stream.Transform}

A utility method for creating `Transform` streams from async generator
functions. The async generator function is supplied a single argument,
`source`, which is used to read incoming chunks.

Yielded values become the data chunks emitted from the stream.

```js
const { Readable, Transform } = require('stream');

const readable = Readable.from(['hello', 'streams']);

async function * mapper(source) {
  for await (const chunk of source) {
    // If objectMode was set to false, the buffer would have to be converted
    // to a string here but since it is true by default for both
    // Readable.from() and Transform.by(), each chunk is already a string.
    yield chunk.toUpperCase();
  }
}

const transform = Transform.by(mapper);
readable.pipe(transform);
transform.on('data', (chunk) => {
  console.log(chunk);
});
```
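
To operate on `Buffer` chunks instead, object mode can be disabled explicitly.
A minimal sketch, assuming the writable side then receives binary data:

```js
const { Transform } = require('stream');

async function * upperCaseBuffers(source) {
  for await (const chunk of source) {
    // With objectMode: false each chunk arrives as a Buffer and must be
    // decoded before string operations can be applied.
    yield chunk.toString().toUpperCase();
  }
}

// Explicitly opt out of the Transform.by() objectMode default.
const transform = Transform.by(upperCaseBuffers, { objectMode: false });
process.stdin.pipe(transform).pipe(process.stdout);
```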

The `source` parameter has an `encoding` property which represents the encoding
of the `Writable` side of the transform stream. This is the same `encoding`
value that would be passed as the second argument to the `transform()` function
option (or `_transform()` method) supplied to `stream.Transform`.
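
A sketch of observing `source.encoding` from inside the generator function
(the logging is illustrative only):

```js
const { Transform } = require('stream');

const transform = Transform.by(async function * inspect(source) {
  for await (const chunk of source) {
    // Mirrors the encoding argument that _transform() would receive.
    console.log(`writable side encoding: ${source.encoding}`);
    yield chunk;
  }
});

transform.write('hello', 'utf8');
```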

## API for Stream Implementers

<!--type=misc-->
@@ -1689,7 +1732,7 @@ on the type of stream being created, as detailed in the chart below:
| Reading only | [`Readable`][] | [`_read()`][stream-_read] |
| Writing only | [`Writable`][] | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
| Reading and writing | [`Duplex`][] | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final] |

The implementation code for a stream should *never* call the "public" methods
of a stream that are intended for use by consumers (as described in the
@@ -2430,7 +2473,7 @@ The `stream.Transform` class is extended to implement a [`Transform`][] stream.
The `stream.Transform` class prototypically inherits from `stream.Duplex` and
implements its own versions of the `writable._write()` and `readable._read()`
methods. Custom `Transform` implementations *must* implement the
[`transform._transform()`][stream-_transform] method and *may* also implement
the [`transform._flush()`][stream-_flush] method.

Care must be taken when using `Transform` streams in that data written to the
@@ -2442,7 +2485,7 @@ output on the `Readable` side is not consumed.
* `options` {Object} Passed to both `Writable` and `Readable`
constructors. Also has the following fields:
* `transform` {Function} Implementation for the
[`stream._transform()`][stream-_transform] method.
* `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush]
method.

@@ -2489,7 +2532,7 @@ const myTransform = new Transform({
The [`'finish'`][] and [`'end'`][] events are from the `stream.Writable`
and `stream.Readable` classes, respectively. The `'finish'` event is emitted
after [`stream.end()`][stream-end] is called and all chunks have been processed
by [`stream._transform()`][stream-_transform]. The `'end'` event is emitted
after all data has been output, which occurs after the callback in
[`transform._flush()`][stream-_flush] has been called. In the case of an error,
neither `'finish'` nor `'end'` should be emitted.
@@ -2630,6 +2673,35 @@ readable.on('data', (chunk) => {
});
```

#### Creating Transform Streams with Async Generator Functions

We can construct a Node.js Transform stream with an asynchronous
generator function using the `Transform.by()` utility method.

```js
const { Readable, Transform } = require('stream');

async function * toUpperCase(source) {
  for await (const chunk of source) {
    yield chunk.toUpperCase();
  }
}

const transform = Transform.by(toUpperCase);

async function * generate() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());

readable.pipe(transform);
transform.on('data', (chunk) => {
  console.log(chunk);
});
```
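
For error propagation and cleanup, such a transform can also be wired up with
the existing `stream.pipeline()` utility (not part of this diff) instead of
`.pipe()`; a sketch, with a hypothetical output path:

```js
const { Readable, Transform, pipeline } = require('stream');
const fs = require('fs');

async function * toUpperCase(source) {
  for await (const chunk of source) {
    yield chunk.toUpperCase();
  }
}

pipeline(
  Readable.from(['a', 'b', 'c']),
  Transform.by(toUpperCase),
  fs.createWriteStream('/tmp/out.txt'), // hypothetical destination
  (err) => {
    if (err) console.error('pipeline failed', err);
  }
);
```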

#### Piping to Writable Streams from Async Iterators

In the scenario of writing to a writable stream from an async iterator, ensure
@@ -2819,6 +2891,7 @@ contain multi-byte characters.
[`readable.push('')`]: #stream_readable_push
[`readable.setEncoding()`]: #stream_readable_setencoding_encoding
[`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
[`stream.Transform.by()`]: #stream_stream_transform_by_asyncgeneratorfunction_options
[`stream.cork()`]: #stream_writable_cork
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
@@ -2853,7 +2926,7 @@ contain multi-byte characters.
[stream-_final]: #stream_writable_final_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback
122 changes: 121 additions & 1 deletion lib/_stream_transform.js
@@ -65,16 +65,29 @@

const {
  ObjectSetPrototypeOf,
  ObjectGetPrototypeOf,
  Symbol
} = primordials;

module.exports = Transform;
const {
  ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE,
  ERR_METHOD_NOT_IMPLEMENTED,
  ERR_MULTIPLE_CALLBACK,
  ERR_TRANSFORM_ALREADY_TRANSFORMING,
  ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
  ObjectGetPrototypeOf(async function* () {}).prototype);

// Symbol-keyed state shared between the SourceIterator async iterator
// and the Transform stream it feeds.
const kSourceIteratorPull = Symbol('kSourceIteratorPull');
const kSourceIteratorResolve = Symbol('kSourceIteratorResolve');
const kSourceIteratorChunk = Symbol('kSourceIteratorChunk');
const kSourceIteratorStream = Symbol('kSourceIteratorStream');
const kSourceIteratorPump = Symbol('kSourceIteratorPump');
const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve');

ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

@@ -203,7 +216,6 @@ Transform.prototype._destroy = function(err, cb) {
});
};


function done(stream, er, data) {
  if (er)
    return stream.emit('error', er);
@@ -219,3 +231,111 @@ function done(stream, er, data) {
    throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
  return stream.push(null);
}

function SourceIterator(asyncGeneratorFn, opts) {
  const source = this;
  const result = asyncGeneratorFn(this);
  if (typeof result[Symbol.asyncIterator] !== 'function') {
    throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
  }
  const iter = result[Symbol.asyncIterator]();
  if (typeof iter.next !== 'function') {
    throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
  }

  this[kSourceIteratorPull] = null;
  this[kSourceIteratorChunk] = null;
  this[kSourceIteratorResolve] = null;
  this[kSourceIteratorStream] = new Transform({
    objectMode: true,
    ...opts,
    transform(chunk, encoding, cb) {
      source.encoding = encoding;
      // If no next() call is pending, buffer the chunk and hold on to the
      // write callback until the iterator pulls it.
      if (source[kSourceIteratorResolve] === null) {
        source[kSourceIteratorChunk] = chunk;
        source[kSourceIteratorPull] = cb;
        return;
      }
      // Otherwise hand the chunk straight to the waiting next() promise.
      source[kSourceIteratorResolve]({ value: chunk, done: false });
      source[kSourceIteratorResolve] = null;
      cb(null);
    }
  });
  this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
  this[kSourceIteratorGrabResolve] = (resolve) => {
    this[kSourceIteratorResolve] = resolve;
  };
  const first = iter.next();
  this[kSourceIteratorPump](iter, first);
}

SourceIterator.prototype[Symbol.asyncIterator] = function() {
  return this;
};

ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);

SourceIterator.prototype.next = function next() {
  // No chunk is buffered yet: park the resolve function so the stream's
  // transform() can fulfill this promise when data arrives.
  if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null)
    return new Promise(this[kSourceIteratorGrabResolve]);

  // A chunk is buffered: release the held write callback (backpressure)
  // and resolve immediately with the buffered chunk.
  this[kSourceIteratorPull](null);
  const result = Promise.resolve({
    value: this[kSourceIteratorChunk],
    done: false
  });
  this[kSourceIteratorChunk] = null;
  this[kSourceIteratorPull] = null;
  return result;
};

SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
  const stream = this[kSourceIteratorStream];
  try {
    // Replace the default prefinish handler (defined earlier in this file):
    // end-of-writable should resolve a pending next() with done: true.
    stream.removeListener('prefinish', prefinish);
    stream.on('prefinish', () => {
      if (this[kSourceIteratorResolve] !== null) {
        this[kSourceIteratorResolve]({ value: undefined, done: true });
      }
    });
    let next = await p;
    while (true) {
      const { done, value } = next;
      if (done) {
        if (value !== undefined) stream.push(value);

        // In the event of an early return we explicitly
        // discard any buffered state
        if (stream._writableState.length > 0) {
          const { length } = stream._writableState;
          const { transforming } = stream._transformState;
          stream._writableState.length = 0;
          stream._transformState.transforming = false;
          prefinish.call(stream);
          stream._writableState.length = length;
          stream._transformState.transforming = transforming;
        } else {
          prefinish.call(stream);
        }
        break;
      }
      stream.push(value);
      next = await iter.next();
    }
  } catch (err) {
    process.nextTick(() => stream.destroy(err));
  } finally {
    this[kSourceIteratorPull] = null;
    this[kSourceIteratorChunk] = null;
    this[kSourceIteratorResolve] = null;
    this[kSourceIteratorStream] = null;
  }
};

Transform.by = function by(asyncGeneratorFn, opts) {
  const source = new SourceIterator(asyncGeneratorFn, opts);
  const stream = source[kSourceIteratorStream];

  return stream;
};
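
For orientation, a rough hand-written equivalent of the common mapping case
that `Transform.by()` covers; this sketch is illustrative only and omits the
backpressure handshake and early-return handling implemented above:

```js
const { Transform } = require('stream');

// Hypothetical helper: map each incoming chunk through an async function.
function mapTransform(mapChunk, opts) {
  return new Transform({
    objectMode: true,
    ...opts,
    transform(chunk, encoding, cb) {
      // Resolve the mapped value, then emit it (or the error) via cb.
      Promise.resolve(mapChunk(chunk))
        .then((value) => cb(null, value), (err) => cb(err));
    }
  });
}

// Comparable to: Transform.by(async function * (source) { ... })
const upper = mapTransform(async (chunk) => chunk.toUpperCase());
```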
2 changes: 2 additions & 0 deletions lib/internal/errors.js
@@ -720,6 +720,8 @@ module.exports = {
// Note: Node.js specific errors must begin with the prefix ERR_
E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable',
  TypeError);
E('ERR_ASSERTION', '%s', Error);
E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
24 changes: 24 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
@@ -420,6 +420,30 @@ async function tests() {
    mustReach[1]();
  }

  {
    console.log('readable side of a transform stream pushes null');
    const transform = new Transform({
      objectMode: true,
      transform: (chunk, enc, cb) => { cb(null, chunk); }
    });
    transform.push(0);
    transform.push(1);
    process.nextTick(() => {
      transform.push(null);
    });

    const mustReach = [ common.mustCall(), common.mustCall() ];

    const iter = transform[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);

    for await (const d of iter) {
      assert.strictEqual(d, 1);
      mustReach[0]();
    }
    mustReach[1]();
  }

  {
    console.log('all next promises must be resolved on end');
    const r = new Readable({