Reading binary into an existing buffer #111

Closed
domenic opened this issue May 20, 2014 · 15 comments

domenic commented May 20, 2014

@dherman brought this up originally, and then it came up in a conversation with @willchan and @slightlyoff.

The idea would be something like:

var ab = rs.read();

// OR
var bytesWritten = rs.readInto(ab, offset);

There's lots of bikeshedding that could be done here (overload instead of separate method? What does the prior art in other languages and libraries do? How to handle overflows? etc.). But the argument is quite sound in principle: it would allow you to allocate a single contiguous array buffer, then fill it with multiple reads from the stream.

The alternative now would be concatenating array buffers, which (a) probably involves a copy in all existing browsers; (b) even if it were optimized (in a similar manner to string concatenation), would cause memory fragmentation/cache locality problems compared to the single contiguous buffer.

This would be for binary (and string?) streams only. It may be a good use case for a BinaryReadableStream subclass.
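
As a rough sketch of the motivation (readInto and its return value here are hypothetical, per the proposal above; nothing is settled):

    // One contiguous allocation filled by repeated reads, instead of
    // concatenating per-read ArrayBuffers afterwards.
    var ab = new ArrayBuffer(64 * 1024);
    var offset = 0;
    var n;
    while (offset < ab.byteLength && (n = rs.readInto(ab, offset)) > 0) {
      offset += n;
    }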

domenic commented Jun 27, 2014

Just had a quick in-person conversation with @willchan. Tentatively it seems like the right approach is a BinaryReadableStream subclass with a readInto(dest, offset = 0, maxDesired = dest.byteLength - offset) method that synchronously writes up to maxDesired bytes at position offset into the dest ArrayBuffer.

Just like the generic read(), this will only work when state is "readable".

It is up to the stream implementation / underlying source to decide how much of dest to fill up, as long as it is at most maxDesired.

Still to figure out: how/whether this ties into the writable side and piping between them.

This is straightforward enough, and important enough, that I will try to get it written up pretty soon. Although probably not before transforms.

domenic commented Jun 27, 2014

Augh, it doesn't quite work. Channeling @willchan again:

When you ask the kernel to fill up a buffer for you, it might say "error, not ready: try again later." In that case you wait until it's ready and try again.

This is kind of the intention behind our async wait() + sync readInto() structure, but it doesn't quite match up. The kernel API requires that you provide the buffer up front, even if in the end it tells you "not ready." So we can't know that the stream can transition from "waiting" to "readable", i.e. that readInto() will succeed, without first providing a buffer. (The workaround is to provide a dummy buffer, and then when the user calls readInto, copy from the dummy buffer into the user's buffer. But buffer copying sucks!)

The most straightforward fix is to combine them into async waitAndReadInto(dest, offset, maxDesired). But now we are no longer getting data as fast as possible if it is synchronously available, which was one of the major design goals of the more-naive async wait() + sync read() in the current spec.
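
For concreteness, a minimal sketch of how that combined call might be consumed (waitAndReadInto is the hypothetical method named above; the 0-means-EOF convention is an assumption):

    // Hypothetical combined async API: resolves once bytes have been
    // written into dest, with the count written (0 taken as EOF here).
    async function fill(brs, dest) {
      var offset = 0;
      while (offset < dest.byteLength) {
        var n = await brs.waitAndReadInto(dest, offset);
        if (n === 0) break;
        offset += n;
      }
      return offset;
    }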

Note that Node (which essentially also uses async wait() + sync read()) does do the dummy buffer copy, I believe. (Will confirm.)

Time to go re-read @isaacs's original message...

@willchan

My understanding of what Node is doing is that it's always calling read() from the kernel to populate its internal buffer. Therefore, it's easy for it to tell if the stream is "waiting". But if you don't use this internal buffer (which is undesirable for certain apps; let me know if you need more explanation), you need to determine readability from the kernel.

The normal way of doing that is calling read() and providing a buffer. Alternatively, to minimize buffer copies, you could mimic Node's approach with internal buffers and do a dummy read() with MSG_PEEK and a 1-byte buffer to determine the read/wait state, but doing this at all is wasteful. For high-performance systems, you want to minimize the number of system calls you invoke, because they're relatively expensive when you're doing it for each connection and you've got a bunch of connections. So you don't want to issue "dummy" read() calls. You would call read() with the buffer you want to read into, and if the kernel doesn't have data, you use epoll_ctl() to add the socket to the list of sockets to monitor.

Note that writable streams have the analogous issue. If you use an internal buffer, you can always tell whether or not the stream is in a "waiting" state. But if you don't, and you go straight to the kernel, it can always return EAGAIN to tell you to try writing again later, in which case you use select or epoll to determine when to do so. And unlike read(), where we could use dummy read()s with MSG_PEEK, you can't do a "dummy" write to determine write vs wait state.
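
Schematically (sys.read and pollReadable below are made-up names standing in for nonblocking read(2) and epoll(7) registration; this is an illustration of the flow, not an API):

    // Try a direct read into the caller's buffer; on EAGAIN, register
    // interest via epoll and retry once the fd becomes readable.
    function tryRead(fd, dest, offset) {
      var r = sys.read(fd, dest, offset);
      if (r === "EAGAIN") {
        return pollReadable(fd).then(function () {
          return tryRead(fd, dest, offset);
        });
      }
      return Promise.resolve(r); // bytes read; 0 means EOF
    }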

@tyoshino

So, we use epoll(7) to wait until the fd is ready. We could provide an internal buffer for the initial shot of read(2), accepting the cost of a copy:

  • If it's not ready, use epoll(7). When epoll(7) returns (the stream knows it can read something from the kernel), accept a buffer from the user and call read(2) with that buffer.
  • If ready, keep the internal buffer, and on the next rs.read() call, copy its contents to the given ArrayBuffer.

But this still doesn't work. To determine the state after an rs.read(), we don't want to call epoll(7) every time.

Another problem is determining whether the result of read(2) is data or EOF. Depending on that, we need to choose which state to enter ("readable" or "closed"), but epoll(7) doesn't tell us. We can work around this by not filling the given ArrayBuffer, returning bytesWritten = 0, and then moving the stream into the "closed" state.

Hmm, can we use this to give the "readable" state the meaning of "possibly readable"? I.e., a readInto() call can be made when the state is "readable", but it can result in not filling the given ArrayBuffer and transitioning back into the "waiting" state. Hacky?

you can't do a "dummy" write to determine write vs wait state.

The WritableStream class has a [[queue]] and is designed to accept incoming data synchronously and keep it in the [[queue]] until it's done.

What you pointed out here will be important if we want to implement some class with the same interface as WritableStream but without a queue, and either:

  • give its "writable" state the meaning "a write() call of a certain amount of data (or at least 1 byte? Currently WritableStream has no API to tell the amount of data it can accept) can be done synchronously", or
  • equip it with a method that tells us how much data (or whether at least 1 byte) can be written synchronously.

"done" may mean writing all data to the kernel or some other operation in general. Different from the readable stream's problem we're discussing, there's no issue of wasteful memory copy. We can just have a queue to hold pending ArrayBuffers, I think.

@tyoshino

readInto(dest, offset, maxDesired = dest.byteLength - offset)

Can't offset also be given a default value of 0, with undefined taken as 0?

domenic commented Jul 22, 2014

If ready, keep the internal buffer, and on the next rs.read() call, copy its contents to the given ArrayBuffer.

Right, this is the copy we want to avoid. I think the cost here will be much worse than the cost of having to move work from sync to async. So at this point I am feeling that the best API is a promise-returning readInto. The alternative would be some kind of crazy inside-out or C-style API that preserves maximum efficiency. E.g. Node.js's libuv has an allocator callback (which allows you to allocate ahead of time or just-in-time). I don't think that is worthwhile.
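
(For flavor, that inside-out style would look roughly like libuv's alloc callback transplanted into JS; rs.onRead and consume below are entirely hypothetical:)

    // Hypothetical allocator-callback API, loosely modeled on libuv:
    // the stream asks the consumer for a buffer exactly when data
    // arrives, so no buffer is parked with the kernel speculatively.
    rs.onRead({
      alloc: function (suggestedSize) {
        return new Uint8Array(suggestedSize); // just-in-time allocation
      },
      read: function (view, bytesRead) {
        consume(view.subarray(0, bytesRead)); // consume is a stand-in
      }
    });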

This raises the question of whether promise-returning read(), and giving up on wait() + sync read(), might work. I am really hesitant but am willing to be proven wrong by data per #120. (BTW I am at TAG meetings through Wednesday so that is why work there is slightly stalled.) This could also solve many of the other issues you mention about state transitions, I suppose? I'd want some solid data first as this is a big change and we'd have to be sure before making it.

equip it with a method that tells us how much data (or whether at least 1 byte) can be written synchronously

I think the idea we had was that instead of having the writable stream signal how much data it wants, and pipeTo pushing that much data into it, we should use the pipeFrom idea from #146 to allow the writable stream to pull the appropriate amount of data. What do you think? Again, I haven't had time to read through your reply in #146, so I am probably missing something.

"done" may mean writing all data to the kernel or some other operation in general

I agree, definitely.

Can't offset also be given a default value of 0, with undefined taken as 0?

Good catch; I will update that post.

@willchan

Yes, it's the copy to the internal buffer that we want to avoid if we want to:

  1. avoid buffer copies
  2. allow for splice()'ing between a readable stream and a writable stream, since that operates on file descriptors, not buffers

It's true that a WritableStream could have a queue of pending ArrayBuffers. The problem with this arises if we want full control over memory consumption. If we're implementing a highly scalable server with tight memory constraints, we don't want an interface that tries to be simple and just queues up ArrayBuffers internally, since that just consumes more memory. We want it to return an EAGAIN equivalent (with a promise to notify when it becomes writable again), or a partial write.
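
(Sketching the shape being asked for: tryWrite below, a possibly-partial synchronous write plus a readiness promise, is purely illustrative, not a proposal:)

    // Hypothetical non-queueing writable: write straight through; on an
    // EAGAIN equivalent you get the bytes accepted so far plus a promise
    // that resolves when the stream becomes writable again.
    function writeAll(ws, ab) {
      var result = ws.tryWrite(ab); // { bytesWritten, writable }
      if (result.bytesWritten < ab.byteLength) {
        return result.writable.then(function () {
          return writeAll(ws, ab.slice(result.bytesWritten));
        });
      }
      return Promise.resolve();
    }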

@tyoshino

Hmm, can we use this to give the "readable" state the meaning of "possibly readable"? I.e., a readInto() call can be made when the state is "readable", but it can result in not filling the given ArrayBuffer and transitioning back into the "waiting" state. Hacky?

I'd like to extend this proposal by applying it also to the initial read(). This approach makes "readable" mean "possibly readable", but it completely eliminates the memory copy between the internal buffer and the given ArrayBuffer.

readInto(arraybuffer, offset = 0, maxDesired = arraybuffer.byteLength - offset)

(fd is set to nonblocking mode)

  1. If this.[[state]] is "waiting" or "closed", throw a TypeError exception.
  2. If this.[[state]] is "errored", throw this.[[storedError]].
  3. Assert: this.[[state]] is "readable".
  4. Let buf be the start address of arraybuffer.
  5. If offset + maxDesired > arraybuffer.byteLength or maxDesired < 0, throw a TypeError exception.
  6. Let bytesRead be the result of the syscall read(fd, buf + offset, maxDesired).
  7. If bytesRead is -1,
    1. If errno is EAGAIN or errno is EWOULDBLOCK,
      1. Set this.[[state]] to "waiting".
      2. Let this.[[waitPromise]] be a new promise.
      3. Run epoll_wait(2) or something similar to get notified when fd becomes readable. For example, we may have some message loop implementation that watches for both new tasks and events on file descriptors.
        1. When fd becomes readable,
          1. Set this.[[state]] to "readable".
          2. Resolve this.[[waitPromise]] with undefined.
      4. Return 0.
    2. Otherwise,
      1. Call this.[[error]](new TypeError("read(2) failed")).
      2. Return 0.
  8. If bytesRead is 0,
    1. Set this.[[state]] to "closed".
    2. Let this.[[waitPromise]] be a new promise resolved with undefined.
    3. Resolve this.[[closedPromise]] with undefined.
    4. Return 0.
  9. Return bytesRead.

@tyoshino

@willchan

We want it to return an EAGAIN equivalent (with a promise to notify when it becomes writable again), or a partial write.

In this sentence, you're talking about your bullet point 2, i.e. cases where we want to use splice(2) or sendfile(2), right? OK, then yes, we cannot go through ArrayBuffers in JS land. Such extreme zero-copy use cases should be discussed as a special pipe operation; is this related to #97? For use cases where we tolerate the creation of an ArrayBuffer, I guess approaches like what I proposed above would work.

Sorry, but I want to sort out the problems at the different levels of optimization we're trying to solve.

@tyoshino

Revised version of #111 (comment)

The underlying source implements [[onReadInto]]

ReadableByteStream

class ReadableByteStream {
    any readInto(arraybuffer, offset, size)
    [[onReadInto]]
}

Properties of the ReadableByteStream prototype

readInto(arraybuffer, offset, size)
  1. If this.[[state]] is "waiting" or "closed", throw a TypeError exception.
  2. If this.[[state]] is "errored", throw this.[[storedError]].
  3. Assert: this.[[state]] is "readable".
  4. If offset is undefined, let offset be 0.
  5. Otherwise,
    1. Let offset be ToInteger(offset).
    2. ReturnIfAbrupt(offset).
    3. If offset < 0, throw a TypeError exception.
  6. If size is undefined, let size be arraybuffer.[[byteLength]] - offset.
  7. Otherwise,
    1. Let size be ToInteger(size).
    2. ReturnIfAbrupt(size).
  8. If size < 0 or offset + size > arraybuffer.[[byteLength]], throw a TypeError exception.
  9. Let bytesRead be the result of this.[[onReadInto]](arraybuffer, offset, size).
  10. If bytesRead is an abrupt completion,
    1. Call this.[[error]](bytesRead.[[value]]).
    2. Throw bytesRead.[[value]].
  11. Let bytesRead be ToNumber(bytesRead).
  12. If bytesRead is -2,
    1. Set this.[[state]] to "waiting".
    2. Let this.[[waitPromise]] be a new promise.
    3. Return 0.
  13. If bytesRead is -1,
    1. Set this.[[state]] to "closed".
    2. Let this.[[waitPromise]] be a new promise resolved with undefined.
    3. Resolve this.[[closedPromise]] with undefined.
    4. Return 0.
  14. If bytesRead < 0 or bytesRead > arraybuffer.[[byteLength]],
    1. Let error be a TypeError exception.
    2. Call this.[[error]](error).
    3. Throw error.
  15. Return bytesRead.
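
(Read back as a consumer loop: under the semantics above, a return of 0 means the stream just transitioned to "waiting" or "closed". Assuming the wait()/state machinery of the main spec, usage might look like this sketch:)

    // A 0 return from readInto() means the state flipped to "waiting"
    // or "closed", so check the state before each retry.
    async function fill(rbs, ab) {
      var offset = 0;
      while (offset < ab.byteLength) {
        if (rbs.state === "waiting") await rbs.wait();
        if (rbs.state !== "readable") break; // "closed" or "errored"
        offset += rbs.readInto(ab, offset);  // may return 0
      }
      return offset;
    }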

@tyoshino

Moved to https://github.com/tyoshino/streams/blob/bytestream/BinaryExtension.md
I'm writing a reference implementation to be placed under the experimental/ directory. Once it's ready, I'll send a pull request.

domenic commented Aug 14, 2014

<3

@tyoshino

Added a reference implementation. Test coverage is not very high yet, but maybe enough for an initial commit.


tyoshino changed the title from "Reading into an existing buffer" to "Reading binary into an existing buffer" on Oct 1, 2014
domenic commented Oct 6, 2014

Closing as progress on this is happening and is tracked by other issues.

domenic closed this as completed Oct 6, 2014