Skip to content

Commit

Permalink
Add support for TypedArray stream chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Aug 9, 2023
1 parent 5afd75a commit d8591cd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
21 changes: 21 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,30 @@ const getChunkType = chunk => {
return 'string';
}

if (typeof chunk !== 'object' || chunk === null) {
return 'others';
}

if (Buffer.isBuffer(chunk)) {
return 'buffer';
}

const prototypeName = objectToString.call(chunk);

if (
prototypeName !== '[object DataView]'
&& Number.isInteger(chunk.byteLength)
&& Number.isInteger(chunk.byteOffset)
&& objectToString.call(chunk.buffer) === '[object ArrayBuffer]'
) {
return 'typedArray';
}

return 'others';
};

const {toString: objectToString} = Object.prototype;

const getBufferedData = (chunks, getContents, textDecoder, length) => {
try {
return getContents(chunks, textDecoder, length);
Expand Down Expand Up @@ -90,6 +107,8 @@ const throwObjectStream = chunk => {

const useBufferFrom = chunk => Buffer.from(chunk);

const useBufferFromWithOffset = chunk => Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);

const getContentsAsBuffer = (chunks, textDecoder, length) => Buffer.concat(chunks, length);

const useTextDecoder = (chunk, textDecoder) => textDecoder.decode(chunk, {stream: true});
Expand All @@ -101,6 +120,7 @@ const chunkTypes = {
convertChunk: {
string: useBufferFrom,
buffer: identity,
typedArray: useBufferFromWithOffset,
others: throwObjectStream,
},
getContents: getContentsAsBuffer,
Expand All @@ -109,6 +129,7 @@ const chunkTypes = {
convertChunk: {
string: identity,
buffer: useTextDecoder,
typedArray: useTextDecoder,
others: throwObjectStream,
},
getContents: getContentsAsString,
Expand Down
22 changes: 22 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,21 @@ import getStream, {getStreamAsBuffer, MaxBufferError} from './index.js';

const fixtureString = 'unicorn\n';
const fixtureBuffer = Buffer.from(fixtureString);
const fixtureTypedArray = new TextEncoder().encode(fixtureString);
const fixtureArrayBuffer = fixtureTypedArray.buffer;
const fixtureUint16Array = new Uint16Array(fixtureArrayBuffer);

const fixtureStringWide = ` ${fixtureString} `;
const fixtureTypedArrayWide = new TextEncoder().encode(fixtureStringWide);
const fixtureArrayBufferWide = fixtureTypedArrayWide.buffer;
const fixtureTypedArrayWithOffset = new Uint8Array(fixtureArrayBufferWide, 2, fixtureString.length);
const fixtureUint16ArrayWithOffset = new Uint16Array(fixtureArrayBufferWide, 2, fixtureString.length / 2);

const longString = `${fixtureString}..`;
const longBuffer = Buffer.from(longString);
const longTypedArray = new TextEncoder().encode(longString);
const longArrayBuffer = longTypedArray.buffer;
const longUint16Array = new Uint16Array(longArrayBuffer);
const maxBuffer = fixtureString.length;

const setup = (streamDef, options) => getStream(createStream(streamDef), options);
Expand All @@ -34,6 +46,10 @@ const getStreamToString = async (t, inputStream) => {

test('get stream from string to string', getStreamToString, fixtureString);
test('get stream from buffer to string', getStreamToString, fixtureBuffer);
test('get stream from typedArray to string', getStreamToString, fixtureTypedArray);
test('get stream from typedArray with offset to string', getStreamToString, fixtureTypedArrayWithOffset);
test('get stream from uint16Array to string', getStreamToString, fixtureUint16Array);
test('get stream from uint16Array with offset to string', getStreamToString, fixtureUint16ArrayWithOffset);

const getStreamToBuffer = async (t, inputStream) => {
const result = await setupBuffer([inputStream]);
Expand All @@ -43,6 +59,10 @@ const getStreamToBuffer = async (t, inputStream) => {

test('get stream from string to buffer', getStreamToBuffer, fixtureString);
test('get stream from buffer to buffer', getStreamToBuffer, fixtureBuffer);
test('get stream from typedArray to buffer', getStreamToBuffer, fixtureTypedArray);
test('get stream from typedArray with offset to buffer', getStreamToBuffer, fixtureTypedArrayWithOffset);
test('get stream from uint16Array to buffer', getStreamToBuffer, fixtureUint16Array);
test('get stream from uint16Array with offset to buffer', getStreamToBuffer, fixtureUint16ArrayWithOffset);

const throwOnInvalidChunkType = async (t, setupFunction, inputStream) => {
await t.throwsAsync(setupFunction([inputStream]), {message: /not supported/});
Expand Down Expand Up @@ -97,6 +117,8 @@ const checkMaxBuffer = async (t, setupFunction, longValue, shortValue) => {

test('maxBuffer throws when size is exceeded with a string', checkMaxBuffer, setup, longString, fixtureString);
test('maxBuffer throws when size is exceeded with a buffer', checkMaxBuffer, setupBuffer, longBuffer, fixtureBuffer);
test('maxBuffer throws when size is exceeded with a typedArray', checkMaxBuffer, setupBuffer, longTypedArray, fixtureTypedArray);
test('maxBuffer throws when size is exceeded with an uint16Array', checkMaxBuffer, setupBuffer, longUint16Array, fixtureUint16Array);

test('set error.bufferedData when `maxBuffer` is hit', async t => {
const error = await t.throwsAsync(setup([longString], {maxBuffer}), {instanceOf: MaxBufferError});
Expand Down

0 comments on commit d8591cd

Please sign in to comment.