Skip to content

Commit

Permalink
make latency recording a per-executor opt-in config knob
Browse files Browse the repository at this point in the history
Recording latency can be expensive so gate this feature behind a config
knob that's disabled by default.
  • Loading branch information
HippoBaro committed Dec 27, 2021
1 parent 18e87b2 commit 5c483ee
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 15 deletions.
35 changes: 34 additions & 1 deletion glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ pub struct LocalExecutorBuilder {
ring_depth: usize,
/// How often to yield to other task queues
preempt_timer_duration: Duration,
/// Whether to record the latencies of individual IO requests
record_io_latencies: bool,
}

impl LocalExecutorBuilder {
Expand All @@ -452,6 +454,7 @@ impl LocalExecutorBuilder {
io_memory: DEFAULT_IO_MEMORY,
ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
record_io_latencies: false,
}
}

Expand Down Expand Up @@ -512,6 +515,14 @@ impl LocalExecutorBuilder {
self
}

/// Whether to record the latencies of individual IO requests as part of the
/// IO stats. Recording latency can be expensive. Disabled by default.
#[must_use = "The builder must be built to be useful"]
pub fn record_io_latencies(mut self, enabled: bool) -> LocalExecutorBuilder {
self.record_io_latencies = enabled;
self
}

/// Make a new [`LocalExecutor`] by taking ownership of the Builder, and
/// returns a [`Result`](crate::Result) to the executor.
/// # Examples
Expand All @@ -529,6 +540,7 @@ impl LocalExecutorBuilder {
self.io_memory,
self.ring_depth,
self.preempt_timer_duration,
self.record_io_latencies,
cpu_set_gen.next().cpu_binding(),
self.spin_before_park,
)?;
Expand Down Expand Up @@ -592,6 +604,7 @@ impl LocalExecutorBuilder {
let ring_depth = self.ring_depth;
let preempt_timer_duration = self.preempt_timer_duration;
let spin_before_park = self.spin_before_park;
let record_io_latencies = self.record_io_latencies;

Builder::new()
.name(name)
Expand All @@ -601,6 +614,7 @@ impl LocalExecutorBuilder {
io_memory,
ring_depth,
preempt_timer_duration,
record_io_latencies,
cpu_set_gen.next().cpu_binding(),
spin_before_park,
)
Expand Down Expand Up @@ -663,6 +677,8 @@ pub struct LocalExecutorPoolBuilder {
preempt_timer_duration: Duration,
/// Indicates a policy by which [`LocalExecutor`]s are bound to CPUs.
placement: PoolPlacement,
/// Whether to record the latencies of individual IO requests
record_io_latencies: bool,
}

impl LocalExecutorPoolBuilder {
Expand All @@ -679,6 +695,7 @@ impl LocalExecutorPoolBuilder {
ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
placement,
record_io_latencies: false,
}
}

Expand Down Expand Up @@ -727,6 +744,14 @@ impl LocalExecutorPoolBuilder {
self
}

/// Whether to record the latencies of individual IO requests as part of the
/// IO stats. Recording latency can be expensive. Disabled by default.
#[must_use = "The builder must be built to be useful"]
pub fn record_io_latencies(mut self, enabled: bool) -> Self {
self.record_io_latencies = enabled;
self
}

/// Spawn a pool of [`LocalExecutor`]s in a new thread according to the
/// [`PoolPlacement`] policy, which is `Unbound` by default.
///
Expand Down Expand Up @@ -788,6 +813,7 @@ impl LocalExecutorPoolBuilder {
let ring_depth = self.ring_depth;
let preempt_timer_duration = self.preempt_timer_duration;
let spin_before_park = self.spin_before_park;
let record_io_latencies = self.record_io_latencies;
let latch = Latch::clone(latch);

move || {
Expand All @@ -799,6 +825,7 @@ impl LocalExecutorPoolBuilder {
io_memory,
ring_depth,
preempt_timer_duration,
record_io_latencies,
cpu_binding,
spin_before_park,
)
Expand Down Expand Up @@ -932,6 +959,7 @@ impl LocalExecutor {
io_memory: usize,
ring_depth: usize,
preempt_timer: Duration,
record_io_latencies: bool,
cpu_binding: Option<impl IntoIterator<Item = usize>>,
mut spin_before_park: Option<Duration>,
) -> Result<LocalExecutor> {
Expand All @@ -954,7 +982,12 @@ impl LocalExecutor {
queues: Rc::new(RefCell::new(queues)),
parker: p,
id: notifier.id(),
reactor: Rc::new(reactor::Reactor::new(notifier, io_memory, ring_depth)),
reactor: Rc::new(reactor::Reactor::new(
notifier,
io_memory,
ring_depth,
record_io_latencies,
)),
})
}

Expand Down
6 changes: 4 additions & 2 deletions glommio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,12 @@ macro_rules! to_io_error {
#[cfg(test)]
macro_rules! test_executor {
($( $fut:expr ),+ ) => {
use crate::executor::{LocalExecutor};
use futures::future::join_all;

let local_ex = LocalExecutor::default();
let local_ex = crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
.record_io_latencies(true)
.make()
.unwrap();
local_ex.run(async move {
let mut joins = Vec::new();
$(
Expand Down
35 changes: 23 additions & 12 deletions glommio/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub(crate) struct Reactor {
shared_channels: RefCell<SharedChannels>,

io_scheduler: Rc<IoScheduler>,
record_io_latencies: bool,

/// Whether there are events in the latency ring.
///
Expand All @@ -196,6 +197,7 @@ impl Reactor {
notifier: Arc<SleepNotifier>,
io_memory: usize,
ring_depth: usize,
record_io_latencies: bool,
) -> Reactor {
let sys = sys::Reactor::new(notifier, io_memory, ring_depth)
.expect("cannot initialize I/O event notification");
Expand All @@ -205,6 +207,7 @@ impl Reactor {
timers: RefCell::new(Timers::new()),
shared_channels: RefCell::new(SharedChannels::new()),
io_scheduler: Rc::new(IoScheduler::new()),
record_io_latencies,
preempt_ptr_head,
preempt_ptr_tail: preempt_ptr_tail as _,
}
Expand Down Expand Up @@ -500,12 +503,16 @@ impl Reactor {
stats.file_deduped_bytes_read += *result as u64 * op_count;
}
}),
latency: Some(|io_lat, sched_lat, stats| {
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
}),
latency: if self.record_io_latencies {
Some(|io_lat, sched_lat, stats| {
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
})
} else {
None
},
};

let source = self.new_source(raw, SourceType::Read(pollable, None), Some(stats));
Expand Down Expand Up @@ -540,12 +547,16 @@ impl Reactor {
}
}),
reused: None,
latency: Some(|io_lat, sched_lat, stats| {
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
}),
latency: if self.record_io_latencies {
Some(|io_lat, sched_lat, stats| {
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
})
} else {
None
},
};

let source = self.new_source(
Expand Down

0 comments on commit 5c483ee

Please sign in to comment.