Skip to content

Commit

Permalink
better read_many interface and defaults
Browse files Browse the repository at this point in the history
* Provide a better, less confusing interface using named enums. This
  makes the code a lot more readable;
* Read sysfs to give good device-specific options for request merging
  (if we merge too much, we may end up with a request so large the kernel
  has to split it underneath our feet, which defeats the purpose).
  • Loading branch information
HippoBaro committed Jan 14, 2022
1 parent ba5b01f commit 867ac71
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 35 deletions.
12 changes: 9 additions & 3 deletions examples/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use glommio::{
DmaStreamReader,
DmaStreamReaderBuilder,
DmaStreamWriterBuilder,
MergedBufferLimit,
ReadAmplificationLimit,
StreamReaderBuilder,
StreamWriterBuilder,
},
Expand Down Expand Up @@ -167,9 +169,13 @@ impl Reader {
) {
match &self {
Reader::Direct(file) => {
file.read_many(futures_lite::stream::iter(iovs), max_buffer_size, None)
.for_each(|_| {})
.await;
file.read_many(
futures_lite::stream::iter(iovs),
MergedBufferLimit::Custom(max_buffer_size),
ReadAmplificationLimit::NoAmplification,
)
.for_each(|_| {})
.await;
}
Reader::Buffered(_) => {
panic!("bulk io is not available for buffered files")
Expand Down
6 changes: 3 additions & 3 deletions glommio/benches/competing_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::future::join;
use futures_lite::{stream, AsyncWriteExt, StreamExt};
use glommio::{
enclose,
io::{ImmutableFile, ImmutableFileBuilder},
io::{ImmutableFile, ImmutableFileBuilder, MergedBufferLimit, ReadAmplificationLimit},
Latency,
LocalExecutorBuilder,
Placement,
Expand Down Expand Up @@ -100,8 +100,8 @@ async fn run_io(name: &str, file: &ImmutableFile, count: usize, size: usize) {
at: Instant::now(),
})
.take(count),
0,
Some(0),
MergedBufferLimit::NoMerging,
ReadAmplificationLimit::NoAmplification,
)
.for_each(|res| {
let (io, _) = res.unwrap();
Expand Down
53 changes: 53 additions & 0 deletions glommio/src/io/bulk_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,59 @@ use std::{
rc::Rc,
};

/// Set a limit to the size of merged IO requests.
///
/// The limit is a plain value type: it is cheap to copy, so the same limit
/// can be stored once and reused across several `read_many` calls, and it can
/// be compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergedBufferLimit {
    /// Disables request coalescing
    NoMerging,

    /// Sets the limit to the maximum the kernel allows for the underlying
    /// device without breaking down the request into smaller ones
    /// (/sys/block/.../queue/max_sectors_kb)
    DeviceMaxSingleRequest,

    /// Sets a custom limit.
    /// This value should be a multiple of the file's alignment. If it is not,
    /// it'll be aligned down. `DmaFile::read_many` also caps the value at the
    /// file's `max_segment_size` before aligning it.
    Custom(usize),
}

impl Default for MergedBufferLimit {
    /// Defaults to [`MergedBufferLimit::DeviceMaxSingleRequest`].
    fn default() -> Self {
        Self::DeviceMaxSingleRequest
    }
}

/// Set a limit to the amount of read amplification in-between two mergeable IO
/// requests.
///
/// The limit is a plain value type: it is cheap to copy, so the same limit
/// can be stored once and reused across several `read_many` calls, and it can
/// be compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadAmplificationLimit {
    /// Deny read amplification.
    ///
    /// Note that, because IO request coalescing is done _before_ buffers are
    /// aligned, requests may be merged if they are smaller than the minimum IO
    /// size. For instance, if the minimum IO size is 4KiB and the user reads
    /// [0..256] and [2048..2560] then the two will be merged into [0..4096] to
    /// accommodate the 4KiB minimum IO size.
    NoAmplification,

    /// Merge two consecutive IO requests if the read amplification is below a
    /// limit, i.e. if the distance separating the two requests is smaller
    /// than this value.
    ///
    /// This value doesn't have any alignment constraint.
    Custom(usize),

    /// Always merge successive IO requests if possible, no matter the distance
    /// between them. This is likely not what you want.
    NoLimit,
}

impl Default for ReadAmplificationLimit {
    /// Defaults to [`ReadAmplificationLimit::NoAmplification`].
    fn default() -> Self {
        Self::NoAmplification
    }
}

/// An interface to an IO vector.
pub trait IoVec {
/// The read position (the offset) in the file
Expand Down
58 changes: 44 additions & 14 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
//
use crate::{
io::{
bulk_io::{CoalescedReads, IoVec, OrderedBulkIo, ReadManyArgs, ReadManyResult},
bulk_io::{
CoalescedReads,
IoVec,
MergedBufferLimit,
OrderedBulkIo,
ReadAmplificationLimit,
ReadManyArgs,
ReadManyResult,
},
glommio_file::GlommioFile,
open_options::OpenOptions,
read_result::ReadResult,
Expand Down Expand Up @@ -297,29 +305,39 @@ impl DmaFile {
/// This API will optimistically coalesce and deduplicate IO requests such
/// that two overlapping or adjacent reads will result in a single IO
/// request. This is transparent for the consumer, you will still
/// receive individual ReadResults corresponding to what you asked for.
/// receive individual [`ReadResult`]s corresponding to what you asked for.
///
/// The first argument is a stream of [`IoVec`]. The last two
/// arguments control how aggressive the IO coalescing should be:
/// * `max_merged_buffer_size` controls how large a merged IO request can
/// be. A value of 0 disables merging completely.
/// * `max_read_amp` is optional and defines the maximum read amplification
/// you are comfortable with. If two read requests are separated by a
/// distance less than this value, they will be merged. A value `None`
/// disables all read amplification limitation.
/// * `buffer_limit` controls how large a merged IO request can get;
/// * `read_amp_limit` controls how much read amplification is acceptable.
///
/// It is not necessary to respect the `O_DIRECT` alignment of the file, and
/// this API will internally convert the positions and sizes to match.
/// this API will internally align the reads appropriately.
pub fn read_many<V, S>(
self: &Rc<DmaFile>,
iovs: S,
max_merged_buffer_size: usize,
max_read_amp: Option<usize>,
buffer_limit: MergedBufferLimit,
read_amp_limit: ReadAmplificationLimit,
) -> ReadManyResult<V, impl Stream<Item = (ScheduledSource, ReadManyArgs<V>)>>
where
V: IoVec + Unpin,
S: Stream<Item = V> + Unpin,
{
let max_merged_buffer_size = match buffer_limit {
MergedBufferLimit::NoMerging => 0,
MergedBufferLimit::DeviceMaxSingleRequest => self.max_sectors_size,
MergedBufferLimit::Custom(limit) => {
self.align_down(limit.min(self.max_segment_size) as u64) as usize
}
};

let max_read_amp = match read_amp_limit {
ReadAmplificationLimit::NoAmplification => Some(0),
ReadAmplificationLimit::Custom(limit) => Some(limit),
ReadAmplificationLimit::NoLimit => None,
};

let file = self.clone();
let reactor = file.file.reactor.upgrade().unwrap();
let it = CoalescedReads::new(
Expand Down Expand Up @@ -846,7 +864,11 @@ pub(crate) mod test {
let mut iovs: Vec<(u64, usize)> = (0..512).map(|x| (x * 8, 8)).collect();
iovs.shuffle(&mut thread_rng());
new_file
.read_many(stream::iter(iovs.into_iter()), 4096, None)
.read_many(
stream::iter(iovs.into_iter()),
MergedBufferLimit::Custom(4096),
ReadAmplificationLimit::NoAmplification,
)
.enumerate()
.for_each(enclose! {(total_reads, last_read) |x| {
*total_reads.borrow_mut() += 1;
Expand Down Expand Up @@ -878,7 +900,11 @@ pub(crate) mod test {
let mut iovs: Vec<(u64, usize)> = (0..511).map(|x| (x * 8 + 1, 7)).collect();
iovs.shuffle(&mut thread_rng());
new_file
.read_many(stream::iter(iovs.into_iter()), 4096, None)
.read_many(
stream::iter(iovs.into_iter()),
MergedBufferLimit::Custom(4096),
ReadAmplificationLimit::NoAmplification,
)
.enumerate()
.for_each(enclose! {(total_reads, last_read) |x| {
*total_reads.borrow_mut() += 1;
Expand Down Expand Up @@ -907,7 +933,11 @@ pub(crate) mod test {
let last_read = Rc::new(RefCell::new(-1));

new_file
.read_many(stream::iter((0..511).map(|x| (x * 8 + 1, 7))), 0, Some(0))
.read_many(
stream::iter((0..511).map(|x| (x * 8 + 1, 7))),
MergedBufferLimit::NoMerging,
ReadAmplificationLimit::NoAmplification,
)
.enumerate()
.for_each(enclose! {(total_reads, last_read) |x| {
*total_reads.borrow_mut() += 1;
Expand Down
28 changes: 14 additions & 14 deletions glommio/src/io/immutable_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use crate::io::{
bulk_io::ReadManyArgs,
bulk_io::{MergedBufferLimit, ReadAmplificationLimit, ReadManyArgs},
open_options::OpenOptions,
DmaStreamReaderBuilder,
DmaStreamWriter,
Expand Down Expand Up @@ -388,32 +388,28 @@ impl ImmutableFile {
/// This API will optimistically coalesce and deduplicate IO requests such
/// that two overlapping or adjacent reads will result in a single IO
/// request. This is transparent for the consumer, you will still
/// receive individual ReadResults corresponding to what you asked for.
/// receive individual [`ReadResult`]s corresponding to what you asked for.
///
/// The first argument is an iterator of [`IoVec`]. The last two
/// The first argument is a stream of [`IoVec`]. The last two
/// arguments control how aggressive the IO coalescing should be:
/// * `max_merged_buffer_size` controls how large a merged IO request can
/// be. A value of 0 disables merging completely.
/// * `max_read_amp` is optional and defines the maximum read amplification
/// you are comfortable with. If two read requests are separated by a
/// distance less than this value, they will be merged. A value `None`
/// disables all read amplification limitation.
/// * `buffer_limit` controls how large a merged IO request can get;
/// * `read_amp_limit` controls how much read amplification is acceptable.
///
/// It is not necessary to respect the `O_DIRECT` alignment of the file, and
/// this API will internally convert the positions and sizes to match.
/// this API will internally align the reads appropriately.
pub fn read_many<V, S>(
&self,
iovs: S,
max_merged_buffer_size: usize,
max_read_amp: Option<usize>,
buffer_limit: MergedBufferLimit,
read_amp_limit: ReadAmplificationLimit,
) -> ReadManyResult<V, impl Stream<Item = (ScheduledSource, ReadManyArgs<V>)>>
where
V: IoVec + Unpin,
S: Stream<Item = V> + Unpin,
{
self.stream_builder
.file
.read_many(iovs, max_merged_buffer_size, max_read_amp)
.read_many(iovs, buffer_limit, read_amp_limit)
}

/// Rename this file.
Expand Down Expand Up @@ -566,7 +562,11 @@ mod test {

{
let iovs = vec![(0, 1), (3, 1)];
let mut bufs = stream.read_many(stream::iter(iovs.into_iter()), 0, None);
let mut bufs = stream.read_many(
stream::iter(iovs.into_iter()),
MergedBufferLimit::NoMerging,
ReadAmplificationLimit::NoAmplification,
);
let next_buffer = bufs.next().await.unwrap();
assert_eq!(next_buffer.unwrap().1.len(), 1);
let next_buffer = bufs.next().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub use self::{
StreamWriter,
StreamWriterBuilder,
},
bulk_io::{IoVec, ReadManyResult},
bulk_io::{IoVec, MergedBufferLimit, ReadAmplificationLimit, ReadManyResult},
directory::Directory,
dma_file::{CloseResult, DmaFile},
dma_file_stream::{
Expand Down

0 comments on commit 867ac71

Please sign in to comment.