diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index c272a01f418dce..aa7e031d3e48d4 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -36,6 +36,7 @@ function from(Readable, iterable, opts) { throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); } + const readable = new Readable({ objectMode: true, highWaterMark: 1, @@ -46,11 +47,19 @@ function from(Readable, iterable, opts) { // Flag to protect against _read // being called before last iteration completion. let reading = false; + let isAsyncValues = false; readable._read = function() { if (!reading) { reading = true; - next(); + + if (isAsync) { + nextAsync(); + } else if (isAsyncValues) { + nextSyncWithAsyncValues(); + } else { + nextSyncWithSyncValues(); + } } }; @@ -78,29 +87,115 @@ function from(Readable, iterable, opts) { } } - async function next() { + // There are a lot of duplication here, it's done on purpose for performance + // reasons - avoid await when not needed. + + function nextSyncWithSyncValues() { + for (;;) { + try { + const { value, done } = iterator.next(); + + if (done) { + readable.push(null); + return; + } + + if (value && + typeof value.then === 'function') { + return changeToAsyncValues(value); + } + + if (value === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); + } + + if (readable.push(value)) { + continue; + } + + reading = false; + } catch (err) { + readable.destroy(err); + } + break; + } + } + + async function changeToAsyncValues(value) { + isAsyncValues = true; + + try { + const res = await value; + + if (res === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); + } + + if (readable.push(res)) { + nextSyncWithAsyncValues(); + return; + } + + reading = false; + } catch (err) { + readable.destroy(err); + } + } + + async function nextSyncWithAsyncValues() { for (;;) { try { - const { value, done } = isAsync ? - await iterator.next() : - iterator.next(); + const { value, done } = iterator.next(); if (done) { readable.push(null); - } else { - const res = (value && - typeof value.then === 'function') ? - await value : - value; - if (res === null) { - reading = false; - throw new ERR_STREAM_NULL_VALUES(); - } else if (readable.push(res)) { - continue; - } else { - reading = false; - } + return; + } + + const res = (value && + typeof value.then === 'function') ? + await value : + value; + + if (res === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); } + + if (readable.push(res)) { + continue; + } + + reading = false; + } catch (err) { + readable.destroy(err); + } + break; + } + } + + async function nextAsync() { + for (;;) { + try { + const { value, done } = await iterator.next(); + + if (done) { + readable.push(null); + return; + } + + if (value === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); + } + + if (readable.push(value)) { + continue; + } + + reading = false; } catch (err) { readable.destroy(err); }