Skip to content

Commit

Permalink
fix(ext/web/streams): fix ReadableStream asyncIterator (denoland#16276)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosc90 authored Jan 9, 2023
1 parent e6c49d1 commit 45768f0
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 38 deletions.
117 changes: 80 additions & 37 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ObjectDefineProperties,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectPrototype,
ObjectPrototypeIsPrototypeOf,
ObjectSetPrototypeOf,
Promise,
Expand Down Expand Up @@ -4424,7 +4425,7 @@
* @returns {IteratorResult<T>}
*/
function createIteratorResult(value, done) {
const result = ObjectCreate(null);
const result = ObjectCreate(ObjectPrototype);
ObjectDefineProperties(result, {
value: { value, writable: true, enumerable: true, configurable: true },
done: {
Expand All @@ -4442,57 +4443,99 @@
ObjectGetPrototypeOf(async function* () {}).prototype,
);

const _iteratorNext = Symbol("[[iteratorNext]]");
const _iteratorFinished = Symbol("[[iteratorFinished]]");

/** @type {AsyncIterator<unknown>} */
const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
/** @returns {Promise<IteratorResult<unknown>>} */
next() {
/** @type {ReadableStreamDefaultReader} */
const reader = this[_reader];
if (reader[_stream] === undefined) {
return PromiseReject(
new TypeError(
"Cannot get the next iteration result once the reader has been released.",
),
);
function nextSteps() {
if (reader[_iteratorFinished]) {
return PromiseResolve(createIteratorResult(undefined, true));
}

if (reader[_stream] === undefined) {
return PromiseReject(
new TypeError(
"Cannot get the next iteration result once the reader has been released.",
),
);
}

/** @type {Deferred<IteratorResult<any>>} */
const promise = new Deferred();
/** @type {ReadRequest} */
const readRequest = {
chunkSteps(chunk) {
promise.resolve(createIteratorResult(chunk, false));
},
closeSteps() {
readableStreamDefaultReaderRelease(reader);
promise.resolve(createIteratorResult(undefined, true));
},
errorSteps(e) {
readableStreamDefaultReaderRelease(reader);
promise.reject(e);
},
};

readableStreamDefaultReaderRead(reader, readRequest);
return PromisePrototypeThen(promise.promise, (result) => {
reader[_iteratorNext] = null;
if (result.done === true) {
reader[_iteratorFinished] = true;
return createIteratorResult(undefined, true);
}
return result;
}, (reason) => {
reader[_iteratorNext] = null;
reader[_iteratorFinished] = true;
throw reason;
});
}
/** @type {Deferred<IteratorResult<any>>} */
const promise = new Deferred();
/** @type {ReadRequest} */
const readRequest = {
chunkSteps(chunk) {
promise.resolve(createIteratorResult(chunk, false));
},
closeSteps() {
readableStreamDefaultReaderRelease(reader);
promise.resolve(createIteratorResult(undefined, true));
},
errorSteps(e) {
readableStreamDefaultReaderRelease(reader);
promise.reject(e);
},
};
readableStreamDefaultReaderRead(reader, readRequest);
return promise.promise;

reader[_iteratorNext] = reader[_iteratorNext]
? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps)
: nextSteps();

return reader[_iteratorNext];
},
/**
* @param {unknown} arg
* @returns {Promise<IteratorResult<unknown>>}
*/
async return(arg) {
return(arg) {
/** @type {ReadableStreamDefaultReader} */
const reader = this[_reader];
if (reader[_stream] === undefined) {
return createIteratorResult(undefined, true);
}
assert(reader[_readRequests].length === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
const returnSteps = () => {
if (reader[_iteratorFinished]) {
return PromiseResolve(createIteratorResult(arg, true));
}
reader[_iteratorFinished] = true;

if (reader[_stream] === undefined) {
return PromiseResolve(createIteratorResult(undefined, true));
}
assert(reader[_readRequests].length === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
readableStreamDefaultReaderRelease(reader);
return result;
}
readableStreamDefaultReaderRelease(reader);
await result;
return createIteratorResult(arg, true);
}
readableStreamDefaultReaderRelease(reader);
return createIteratorResult(undefined, true);
return PromiseResolve(createIteratorResult(undefined, true));
};

const returnPromise = reader[_iteratorNext]
? PromisePrototypeThen(reader[_iteratorNext], returnSteps, returnSteps)
: returnSteps();
return PromisePrototypeThen(
returnPromise,
() => createIteratorResult(arg, true),
);
},
}, asyncIteratorPrototype);

Expand Down
5 changes: 4 additions & 1 deletion tools/wpt/expectation.json
Original file line number Diff line number Diff line change
Expand Up @@ -1311,7 +1311,10 @@
"respond-after-enqueue.any.worker.html": true
},
"readable-streams": {
"async-iterator.any.html": false,
"async-iterator.any.html": [
"next() that succeeds; return()",
"next() that succeeds; return() [no awaiting]"
],
"bad-strategies.any.html": true,
"bad-strategies.any.worker.html": true,
"bad-underlying-sources.any.html": true,
Expand Down

0 comments on commit 45768f0

Please sign in to comment.