Skip to content

Commit

Permalink
stream: implement ReadableStream.from
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Jun 8, 2023
1 parent 8c8e7e9 commit f2ba924
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 0 deletions.
74 changes: 74 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
ArrayBufferPrototypeSlice,
ArrayPrototypePush,
ArrayPrototypeShift,
Boolean,
DataView,
FunctionPrototypeBind,
FunctionPrototypeCall,
Expand Down Expand Up @@ -110,6 +111,8 @@ const {
nonOpCancel,
nonOpPull,
nonOpStart,
getIterator,
iteratorNext,
kType,
kState,
} = require('internal/webstreams/util');
Expand Down Expand Up @@ -314,6 +317,10 @@ class ReadableStream {
return isReadableStreamLocked(this);
}

static from(iterable) {
return readableStreamFromIterable(iterable);
}

/**
* @param {any} [reason]
* @returns { Promise<void> }
Expand Down Expand Up @@ -1249,6 +1256,73 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');

const startAlgorithm = nonOpStart;

function pullAlgorithm() {
let nextResult;
try {
nextResult = iteratorNext(iteratorRecord);
} catch (error) {
return PromiseReject(error);
}
const nextPromise = PromiseResolve(nextResult);
return PromisePrototypeThen(nextPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_ARG_VALUE.TypeError('iterResult', iterResult);
}
const done = Boolean(iterResult.done);
if (done) {
readableStreamDefaultControllerClose(stream[kState].controller);
} else {
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
}
});
}

function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
let returnMethod;
try {
returnMethod = iterator.return;
} catch (error) {
return PromiseReject(error);
}
if (returnMethod === undefined) {
return PromiseResolve();
}
let returnResult;
try {
returnResult = FunctionPrototypeCall(returnMethod, iterator, [ reason ]);
} catch (error) {
return PromiseReject(error);
}
const returnPromise = PromiseResolve(returnResult);
return PromisePrototypeThen(returnPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_ARG_VALUE.TypeError('iterResult', iterResult);
}
return undefined;
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});

return stream;
}

function readableStreamPipeTo(
source,
dest,
Expand Down
52 changes: 52 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const {
PromiseReject,
ReflectGet,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
Uint8Array,
} = primordials;

Expand Down Expand Up @@ -217,6 +219,54 @@ function lazyTransfer() {
return transfer;
}

function createAsyncFromSyncIterator(syncIteratorRecord) {
const syncIterable = {
[SymbolIterator]: () => syncIteratorRecord.iterator,
};

const asyncIterator = (async function* () {
return yield* syncIterable;
}());

const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
}

function getIterator(obj, kind = 'sync', method) {
if (method === undefined) {
if (kind === 'async') {
method = obj[SymbolAsyncIterator];
if (method === undefined) {
const syncMethod = obj[SymbolIterator];
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
return createAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = obj[SymbolIterator];
}
}

const iterator = FunctionPrototypeCall(method, obj);
if (typeof iterator !== 'object' || iterator === null) {
throw new ERR_INVALID_ARG_VALUE.TypeError('iterator', iterator);
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
}

function iteratorNext(iteratorRecord, value) {
let result;
if (value === undefined) {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
if (typeof result !== 'object' || result === null) {
throw new ERR_INVALID_ARG_VALUE.TypeError('iterator.next', result);
}
return result;
}

module.exports = {
ArrayBufferViewGetBuffer,
ArrayBufferViewGetByteLength,
Expand All @@ -243,6 +293,8 @@ module.exports = {
nonOpPull,
nonOpStart,
nonOpWrite,
getIterator,
iteratorNext,
kType,
kState,
};

0 comments on commit f2ba924

Please sign in to comment.