diff --git a/package.json b/package.json
index 59d1b4d..954d59c 100644
--- a/package.json
+++ b/package.json
@@ -42,6 +42,9 @@
 		"object",
 		"concat"
 	],
+	"dependencies": {
+		"is-stream": "^4.0.1"
+	},
 	"devDependencies": {
 		"@types/node": "^20.8.9",
 		"ava": "^5.3.1",
diff --git a/source/contents.js b/source/contents.js
index 20cb92e..e41bdb5 100644
--- a/source/contents.js
+++ b/source/contents.js
@@ -1,13 +1,13 @@
+import {getAsyncIterable} from './stream.js';
+
 export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
-	if (!isAsyncIterable(stream)) {
-		throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
-	}
+	const asyncIterable = getAsyncIterable(stream);
 
 	const state = init();
 	state.length = 0;
 
 	try {
-		for await (const chunk of stream) {
+		for await (const chunk of asyncIterable) {
 			const chunkType = getChunkType(chunk);
 			const convertedChunk = convertChunk[chunkType](chunk, state);
 			appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
@@ -52,8 +52,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
 	state.length = newLength;
 };
 
-const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';
-
 const getChunkType = chunk => {
 	const typeOfChunk = typeof chunk;
 
diff --git a/source/stream.js b/source/stream.js
new file mode 100644
index 0000000..e9858b1
--- /dev/null
+++ b/source/stream.js
@@ -0,0 +1,64 @@
+import {isReadableStream} from 'is-stream';
+
+/**
+ * Turn the first argument of `getStreamContents()` into a value usable with `for await`.
+ *
+ * Values recognized by `isReadableStream()` are wrapped with `getStreamIterable()`.
+ * Anything else is returned unchanged, provided it has a `Symbol.asyncIterator` method.
+ *
+ * @param {unknown} stream - Value to read chunks from.
+ * @returns {AsyncIterable<unknown>} Async iterable yielding the chunks.
+ * @throws {Error} When `stream` is neither a readable stream nor an async iterable.
+ */
+export const getAsyncIterable = stream => {
+	if (isReadableStream(stream, {checkOpen: false})) {
+		return getStreamIterable(stream);
+	}
+
+	// Require an actual method: a non-callable `Symbol.asyncIterator` property (e.g. `null`)
+	// would otherwise only fail later, inside `for await`, with a less helpful error.
+	if (typeof stream?.[Symbol.asyncIterator] !== 'function') {
+		throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
+	}
+
+	return stream;
+};
+
+// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
+// Chunks are read from `data` events, and the stream is destroyed once iteration stops for any reason.
+const getStreamIterable = async function * (stream) {
+	const {on, finished} = await getNodeImports();
+	// NOTE(review): Node.js docs appear to spell this option `highWaterMark` in newer releases;
+	// confirm that `highWatermark` is honored by every supported Node.js version.
+	const onStreamData = on(stream, 'data', {highWatermark: stream.readableHighWaterMark});
+	// Not awaited on purpose: it ends or fails `onStreamData` once the stream itself has finished
+	handleStreamEnd(stream, onStreamData, finished);
+	try {
+		for await (const [chunk] of onStreamData) {
+			yield chunk;
+		}
+	} finally {
+		stream.destroy();
+	}
+};
+
+// Mirror the stream's completion onto the `data` iterator: `return()` once `finished()` resolves,
+// `throw()` if either step rejects. Non-`Error` rejection values are wrapped in an `Error` first.
+const handleStreamEnd = async (stream, onStreamData, finished) => {
+	try {
+		await finished(stream, {cleanup: true, readable: true, writable: false});
+		await onStreamData.return();
+	} catch (error) {
+		const normalizedError = error instanceof Error ? error : new Error(String(error));
+		await onStreamData.throw(normalizedError);
+	}
+};
+
+// Use dynamic imports to support browsers
+const getNodeImports = async () => {
+	const [{on}, {finished}] = await Promise.all([
+		import('node:events'),
+		import('node:stream/promises'),
+	]);
+	return {on, finished};
+};
diff --git a/test/fixtures/index.js b/test/fixtures/index.js
index 65feb79..be5e690 100644
--- a/test/fixtures/index.js
+++ b/test/fixtures/index.js
@@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000';
 export const longMultibyteString = `${fixtureMultibyteString}\u1000`;
 
 export const bigArray = Array.from({length: 1e6}, () => Math.floor(Math.random() * (2 ** 8)));
+
+export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'};
diff --git a/test/stream.js b/test/stream.js
new file mode 100644
index 0000000..eff9b71
--- /dev/null
+++ b/test/stream.js
@@ -0,0 +1,268 @@
+import {once} from 'node:events';
+import {Readable, Duplex} from 'node:stream';
+import {finished} from 'node:stream/promises';
+import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises';
+import test from 'ava';
+import getStream, {getStreamAsArray, MaxBufferError} from '../source/index.js';
+import {fixtureString, fixtureMultiString, prematureClose} from './fixtures/index.js';
+
+// Promise returned by `finished()` from `node:stream/promises` for `stream`
+const onFinishedStream = stream => finished(stream, {cleanup: true});
+// Stream implementation whose `read()` and `write()` do nothing
+const noopMethods = {read() {}, write() {}};
+
+// Assert the final state of `stream`: destroyed, not readable, with the expected `errored` value and
+// `readableEnded` flag. For `Duplex`, also assert it is not writable and check `writableEnded`.
+// eslint-disable-next-line max-params
+const assertStream = ({readableEnded = false, writableEnded = false}, t, stream, StreamClass, error = null) => {
+	t.is(stream.errored, error);
+	t.true(stream.destroyed);
+	t.false(stream.readable);
+	t.is(stream.readableEnded, readableEnded);
+
+	if (StreamClass === Duplex) {
+		t.false(stream.writable);
+		t.is(stream.writableEnded, writableEnded);
+	}
+};
+
+// Presets of `assertStream()`, one per expected `readableEnded`/`writableEnded` combination
+const assertSuccess = assertStream.bind(undefined, {readableEnded: true, writableEnded: true});
+const assertReadFail = assertStream.bind(undefined, {writableEnded: true});
+const assertWriteFail = assertStream.bind(undefined, {readableEnded: true});
+const assertBothFail = assertStream.bind(undefined, {});
+
+const testSuccess = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	t.true(stream instanceof StreamClass);
+
+	t.deepEqual(await getStreamAsArray(stream), fixtureMultiString);
+	assertSuccess(t, stream, StreamClass);
+};
+
+test('Can use Readable stream', testSuccess, Readable);
+test('Can use Duplex stream', testSuccess, Duplex);
+
+const testAlreadyEnded = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	await stream.toArray();
+	assertSuccess(t, stream, StreamClass);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+};
+
+test('Can use already ended Readable', testAlreadyEnded, Readable);
+test('Can use already ended Duplex', testAlreadyEnded, Duplex);
+
+const testAlreadyAborted = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	stream.destroy();
+	await t.throwsAsync(onFinishedStream(stream), prematureClose);
+	assertReadFail(t, stream, StreamClass);
+
+	const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
+	t.deepEqual(error.bufferedData, []);
+};
+
+test('Throw if already aborted Readable', testAlreadyAborted, Readable);
+test('Throw if already aborted Duplex', testAlreadyAborted, Duplex);
+
+const testAlreadyErrored = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	const error = new Error('test');
+	stream.destroy(error);
+	t.is(await t.throwsAsync(onFinishedStream(stream)), error);
+	assertReadFail(t, stream, StreamClass, error);
+
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+};
+
+test('Throw if already errored Readable', testAlreadyErrored, Readable);
+test('Throw if already errored Duplex', testAlreadyErrored, Duplex);
+
+const testAbort = async (t, StreamClass) => {
+	const stream = new StreamClass(noopMethods);
+	setTimeout(() => {
+		stream.destroy();
+	}, 0);
+	const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass);
+};
+
+test('Throw when aborting Readable', testAbort, Readable);
+test('Throw when aborting Duplex', testAbort, Duplex);
+
+const testError = async (t, StreamClass) => {
+	const stream = new StreamClass(noopMethods);
+	const error = new Error('test');
+	setTimeout(() => {
+		stream.destroy(error);
+	}, 0);
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass, error);
+};
+
+test('Throw when erroring Readable', testError, Readable);
+test('Throw when erroring Duplex', testError, Duplex);
+
+const testErrorEvent = async (t, StreamClass) => {
+	const stream = new StreamClass(noopMethods);
+	const error = new Error('test');
+	setTimeout(() => {
+		stream.emit('error', error);
+	}, 0);
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass);
+};
+
+test('Throw when emitting "error" event with Readable', testErrorEvent, Readable);
+test('Throw when emitting "error" event with Duplex', testErrorEvent, Duplex);
+
+const testThrowRead = async (t, StreamClass) => {
+	const error = new Error('test');
+	const stream = new StreamClass({
+		read() {
+			throw error;
+		},
+	});
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass, error);
+};
+
+test('Throw when throwing error in Readable read()', testThrowRead, Readable);
+test('Throw when throwing error in Duplex read()', testThrowRead, Duplex);
+
+test('Handle non-error instances', async t => {
+	const stream = Readable.from(fixtureMultiString);
+	const errorMessage = `< ${fixtureString} >`;
+	stream.destroy(errorMessage);
+	const [{reason}] = await Promise.allSettled([onFinishedStream(stream)]);
+	t.is(reason, errorMessage);
+	assertReadFail(t, stream, Readable, errorMessage);
+
+	await t.throwsAsync(getStreamAsArray(stream), {message: errorMessage});
+});
+
+test('Handles objectMode errors', async t => {
+	const stream = new Readable({
+		read() {
+			this.push(fixtureString);
+			this.push({});
+		},
+		objectMode: true,
+	});
+
+	const error = await t.throwsAsync(getStream(stream), {message: /in object mode/});
+	t.is(error.bufferedData, fixtureString);
+	assertReadFail(t, stream, Readable);
+});
+
+test('Handles maxBuffer errors', async t => {
+	const stream = new Readable({
+		read() {
+			this.push(fixtureString);
+			this.push(fixtureString);
+		},
+	});
+
+	const error = await t.throwsAsync(
+		getStream(stream, {maxBuffer: fixtureString.length}),
+		{instanceOf: MaxBufferError},
+	);
+	t.is(error.bufferedData, fixtureString);
+	assertReadFail(t, stream, Readable);
+});
+
+test('Works if Duplex readable side ends before its writable side', async t => {
+	const stream = new Duplex(noopMethods);
+	stream.push(null);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+	assertWriteFail(t, stream, Duplex);
+});
+
+test('Cleans up event listeners', async t => {
+	const stream = Readable.from([]);
+	t.is(stream.listenerCount('error'), 0);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+
+	t.is(stream.listenerCount('error'), 0);
+});
+
+const testMultipleReads = async (t, wait) => {
+	// Only the first `read()` call pushes data; later calls return right away
+	let hasStarted = false;
+	const size = 10;
+	const stream = new Readable({
+		async read() {
+			if (hasStarted) {
+				return;
+			}
+
+			hasStarted = true;
+			for (let index = 0; index < size; index += 1) {
+				for (let innerIndex = 0; innerIndex < size; innerIndex += 1) {
+					this.push(fixtureString);
+				}
+
+				// eslint-disable-next-line no-await-in-loop
+				await wait();
+			}
+
+			this.push(null);
+		},
+	});
+
+	t.is(await getStream(stream), fixtureString.repeat(size * size));
+	assertSuccess(t, stream, Readable);
+};
+
+test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield());
+test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100));
+
+test('Pause stream when too much data at once', async t => {
+	const stream = new Readable({
+		read() {
+			this.push('.');
+			this.push('.');
+			this.push('.');
+			this.push('.');
+			this.push(null);
+		},
+		highWaterMark: 2,
+	});
+	const [result] = await Promise.all([
+		getStream(stream),
+		once(stream, 'pause'),
+	]);
+	t.is(result, '....');
+	assertSuccess(t, stream, Readable);
+});
+
+test('Can call twice at the same time', async t => {
+	const stream = Readable.from(fixtureMultiString);
+	const [result, secondResult] = await Promise.all([
+		getStream(stream),
+		getStream(stream),
+	]);
+	t.deepEqual(result, fixtureString);
+	t.deepEqual(secondResult, fixtureString);
+	assertSuccess(t, stream, Readable);
+});
+
+test('Can call and listen to "data" event at the same time', async t => {
+	const stream = Readable.from([fixtureString]);
+	const [result, secondResult] = await Promise.all([
+		getStream(stream),
+		once(stream, 'data'),
+	]);
+	t.deepEqual(result, fixtureString);
+	t.deepEqual(secondResult.toString(), fixtureString);
+	assertSuccess(t, stream, Readable);
+});
+