Skip to content

Commit

Permalink
Change the model for ReadableStream to have async read()
Browse files Browse the repository at this point in the history
This replaces the dual ready + read() approach previously, which was derived from the epoll(7) + read(2) paradigm. In #253 we did a deep dive on real-world implementation strategies for byte streams, and discovered that the ready + read() model causes a conflict with the semantics we want for byte streams. Briefly, because some byte streams will demand to know the size of the buffer they must fill before doing any I/O (the fread(3) model), the byte stream method that reads into a given typed array must be asynchronous. If such byte streams are then to conform to the readable stream interface, with a no-argument read() method that is an auto-allocating version of the read-into method, then read() must also be async, across all readable streams.

This is a usability upgrade for consumers, in many cases. However, for non-byte streams, it potentially costs more microtasks when multiple chunks of data would be available synchronously. We are hopeful that even if current JS engine microtask queues are not as fast as they could be, they will improve over time until this overhead is negligible (which in theory it should be).

Alongside this change, we moved the read() method from the readable stream to the reader (now called "readable stream reader" instead of "exclusive stream reader"). This drastically simplifies the stream/reader interaction, and also allows the possibility of different types of readers which have different reading behavior---again, very helpful for byte streams. The usability impact is also positive, as it guides consumers toward using piping instead of directly reading chunks from the stream.

Note that read() promises fulfill with { value, done } instead of using an EOS sentinel value. This avoids a number of problems (see extensive discussion in #253), and also provides a mechanism by which readable byte streams can smuggle out "unused" buffers given to them, by using { value: zeroLengthViewOntoBuffer, done: true }.

Finally, the state property is now removed (from readable stream), since there is no longer a "waiting" vs. "readable" distinction.

This commit also adds some new infrastructure for _templated tests_, and ports some portion of the existing tests there. This is our solution for #217 and #264.

Note that we re-merged all related code into a single readable-stream.js file, as the setup with the three separate files (readable-stream.js, exclusive-stream-reader.js, and readable-stream-abstract-ops.js) was problematic in causing circular dependencies.

- Closes #253!
- Closes #266 by simplifying reader-related stuff, removing the problematic `ready` promise, and ensuring that cancel() always returns a new promise instead of reusing [[closedPromise]].
- Closes #264 by introducing templated tests.
  • Loading branch information
domenic committed Mar 17, 2015
1 parent a4ffbfd commit fce682e
Show file tree
Hide file tree
Showing 36 changed files with 2,950 additions and 3,213 deletions.
63 changes: 23 additions & 40 deletions Examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,6 @@ Many examples of using and creating streams are given in-line in the specificati

## Readable Streams

### Getting the Next Piece of Available Data

As another example, this helper function will return a promise for the next available piece of data from a given readable stream. This introduces an artificial delay if there is already data queued, but can provide a convenient interface for simple chunk-by-chunk consumption, as one might do e.g. when streaming database records. It uses an EOF sentinel to signal the end of the stream, and behaves poorly if called twice in parallel without waiting for the previously-returned promise to fulfill.

```js
const EOF = Symbol("ReadableStream getNext EOF");

function getNext(stream) {
if (stream.state === "closed") {
return Promise.resolve(EOF);
}

return stream.ready.then(() => {
if (stream.state === "closed") {
return EOF;
}

// If stream is "errored", this will throw, causing the promise to be rejected.
return stream.read();
});
}

// Usage with proposed ES2016 async/await keywords:
async function processStream(stream) {
while ((const chunk = await getNext(stream)) !== EOF) {
// do something with `chunk`.
}
}
```

### Buffering the Entire Stream Into Memory

This function uses the reading APIs to buffer the entire stream in memory and give a promise for the results, defeating the purpose of streams but educating us while doing so:
Expand All @@ -42,19 +12,17 @@ This function uses the reading APIs to buffer the entire stream in memory and gi
function readableStreamToArray(readable) {
const chunks = [];

pump();
return readable.closed.then(() => chunks);
return pump();

function pump() {
while (readable.state === "readable") {
chunks.push(readable.read());
}

if (readable.state === "waiting") {
readable.ready.then(pump);
}
return readable.read().then(({ value, done }) => {
if (done) {
return chunks;
}

// Otherwise the stream is "closed" or "errored", which will be handled above.
chunks.push(value);
return pump();
});
}
}

Expand All @@ -65,6 +33,21 @@ readableStreamToArray(myStream).then(chunks => {
})
```

We can also write this using the [async function syntax](https://github.com/lukehoban/ecmascript-asyncawait/) proposed for ES2016:

```js
async function readableStreamToArray(readable) {
const chunks = [];

let result;
while (!(result = await readable.read()).done) {
chunks.push(result.value);
}

return chunks;
}
```

## Writable Streams

### Reporting Incremental Progress
Expand Down
92 changes: 0 additions & 92 deletions Locking Design Doc.md

This file was deleted.

Loading

0 comments on commit fce682e

Please sign in to comment.