Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a poll variant to DmaStreamReader::get_buffer_aligned #449

Merged
merged 2 commits into from
Nov 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions glommio/src/io/dma_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use core::task::Waker;
use futures_lite::{
future::poll_fn,
io::{AsyncRead, AsyncWrite},
ready,
stream::{self, StreamExt},
};
use std::{
Expand Down Expand Up @@ -519,7 +520,7 @@ impl DmaStreamReader {

/// Allows access to the buffer that holds the current position with no
/// extra copy
//_ In order to use this API, one must guarantee that reading the specified
/// In order to use this API, one must guarantee that reading the specified
/// length may not cross into a different buffer. Users of this API are
/// expected to be aware of their buffer size (selectable in the
/// [`DmaStreamReaderBuilder`]) and act accordingly.
Expand Down Expand Up @@ -562,8 +563,45 @@ impl DmaStreamReader {
/// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html
/// [`ReadResult`]: struct.ReadResult.html
pub async fn get_buffer_aligned(&mut self, len: u64) -> Result<ReadResult> {
poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await
}

/// A variant of [`get_buffer_aligned`] that can be called from a poll
/// function context.
///
/// Allows access to the buffer that holds the current position with no
/// extra copy
/// In order to use this API, one must guarantee that reading the specified
/// length may not cross into a different buffer. Users of this API are
/// expected to be aware of their buffer size (selectable in the
/// [`DmaStreamReaderBuilder`]) and act accordingly.
///
/// The buffer is also not released until the returned [`ReadResult`] goes
/// out of scope. So if you plan to keep this alive for a long time this
/// is probably the wrong API.
///
/// If EOF is hit while reading with this method, the number of bytes in the
/// returned buffer will be less than number requested.
///
/// Let's say you want to open a file and check if its header is sane: this
/// is a good API for that.
///
/// But if after such header there is an index that you want to keep in
/// memory, then you are probably better off with one of the methods
/// from [`AsyncReadExt`].
///
/// [`get_buffer_aligned`]: Self::get_buffer_aligned
/// [`DmaStreamReader`]: struct.DmaStreamReader.html
/// [`DmaStreamReaderBuilder`]: struct.DmaStreamReaderBuilder.html
/// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html
/// [`ReadResult`]: struct.ReadResult.html
pub fn poll_get_buffer_aligned(
&mut self,
cx: &mut Context<'_>,
len: u64,
) -> Poll<Result<ReadResult>> {
if len == 0 {
return Ok(ReadResult::empty_buffer());
return Poll::Ready(Ok(ReadResult::empty_buffer()));
}

let (start_id, end_id, buffer_size) = {
Expand All @@ -574,18 +612,21 @@ impl DmaStreamReader {
};

if start_id != end_id {
return Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})",
len, self.current_pos, buffer_size
return Poll::Ready(Err(GlommioError::<()>::WouldBlock(ResourceType::File(
format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size \
{})",
len, self.current_pos, buffer_size
),
))));
}

let x = poll_fn(|cx| self.get_buffer(cx, len, start_id)).await?;
let x = ready!(self.poll_get_buffer(cx, len, start_id))?;
self.skip(len);
Ok(x)
Poll::Ready(Ok(x))
}

fn get_buffer(
fn poll_get_buffer(
&mut self,
cx: &mut Context<'_>,
len: u64,
Expand Down