Skip to content

Commit

Permalink
measure the pre-reactor IO latency
Browse files Browse the repository at this point in the history
We now record three kinds of source latencies:
* pre-reactor: the time the source spent queued (this commit);
* io: the time the source spent in the kernel;
* post-reactor: the time the scheduler took to get back at the consuming
  task.

Previously, the time the source was queued counted towards IO
latency (second item in the list).
  • Loading branch information
HippoBaro committed Jan 11, 2022
1 parent 1180325 commit d258184
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 21 deletions.
18 changes: 17 additions & 1 deletion glommio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ pub struct RingIoStats {
pub(crate) file_buffered_bytes_written: u64,

// Distributions
pub(crate) pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
pub(crate) io_latency_us: sketches_ddsketch::DDSketch,
pub(crate) post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
}
Expand All @@ -615,6 +616,9 @@ impl Default for RingIoStats {
file_bytes_written: 0,
file_buffered_writes: 0,
file_buffered_bytes_written: 0,
pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch::new(
sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9),
),
io_latency_us: sketches_ddsketch::DDSketch::new(sketches_ddsketch::Config::new(
0.01, 2048, 1.0e-9,
)),
Expand Down Expand Up @@ -704,6 +708,15 @@ impl RingIoStats {
(self.file_buffered_writes, self.file_buffered_bytes_written)
}

/// The pre-reactor IO scheduler latency
///
/// Returns a distribution of measures tracking the time between the moment
/// an IO operation was queued up and the moment it was submitted to the
/// kernel
pub fn pre_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
&self.pre_reactor_io_scheduler_latency_us
}

/// The IO latency
///
/// Returns a distribution of measures tracking the time sources spent in
Expand Down Expand Up @@ -737,10 +750,13 @@ impl<'a> Sum<&'a RingIoStats> for RingIoStats {
a.file_bytes_written += b.file_bytes_written;
a.file_buffered_writes += b.file_buffered_writes;
a.file_buffered_bytes_written += b.file_buffered_bytes_written;
a.pre_reactor_io_scheduler_latency_us
.merge(&b.pre_reactor_io_scheduler_latency_us)
.unwrap();
a.io_latency_us.merge(&b.io_latency_us).unwrap();
a.post_reactor_io_scheduler_latency_us
.merge(&b.post_reactor_io_scheduler_latency_us)
.unwrap();
a.io_latency_us.merge(&b.io_latency_us).unwrap();
a
})
}
Expand Down
14 changes: 10 additions & 4 deletions glommio/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,14 @@ impl Reactor {
}
}),
latency: if self.record_io_latencies {
Some(|io_lat, sched_lat, stats| {
Some(|pre_lat, io_lat, post_lat, stats| {
stats
.pre_reactor_io_scheduler_latency_us
.add(pre_lat.as_micros() as f64);
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
.add(post_lat.as_micros() as f64)
})
} else {
None
Expand Down Expand Up @@ -548,11 +551,14 @@ impl Reactor {
}),
reused: None,
latency: if self.record_io_latencies {
Some(|io_lat, sched_lat, stats| {
Some(|pre_lat, io_lat, post_lat, stats| {
stats
.pre_reactor_io_scheduler_latency_us
.add(pre_lat.as_micros() as f64);
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(sched_lat.as_micros() as f64)
.add(post_lat.as_micros() as f64)
})
} else {
None
Expand Down
35 changes: 32 additions & 3 deletions glommio/src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,30 @@ impl From<Duration> for TimeSpec64 {
}
}

pub(super) struct Latencies {
/// The timestamp at which the source was added to the IO queue of the
/// reactor
pub(super) queued_at: std::time::Instant,

/// The timestamp at which the source was submitted to the kernel
pub(super) submitted_at: std::time::Instant,

/// The timestamp at which the reactor fulfilled the source
pub(super) fulfilled_at: std::time::Instant,
}

/// Tasks interested in events on a source.
#[derive(Debug)]
pub(super) struct Wakers {
/// Raw result of the operation.
pub(super) result: Option<io::Result<usize>>,

/// The timestamp at which the reactor inserted the source in the ring
pub(super) seen_at: Option<std::time::Instant>,
/// The timestamp at which the source was added to the IO queue of the
/// reactor
pub(super) queued_at: Option<std::time::Instant>,

/// The timestamp at which the source was submitted to the kernel
pub(super) submitted_at: Option<std::time::Instant>,

/// The timestamp at which the reactor fulfilled the source
pub(super) fulfilled_at: Option<std::time::Instant>,
Expand All @@ -471,7 +487,8 @@ impl Wakers {
pub(super) fn new() -> Self {
Wakers {
result: None,
seen_at: None,
queued_at: None,
submitted_at: None,
fulfilled_at: None,
waiters: Default::default(),
}
Expand All @@ -487,6 +504,18 @@ impl Wakers {
true
}
}

fn timestamps(&mut self) -> Option<Latencies> {
if self.queued_at.is_none() || self.submitted_at.is_none() || self.fulfilled_at.is_none() {
None
} else {
Some(Latencies {
queued_at: self.queued_at.take().unwrap(),
submitted_at: self.submitted_at.take().unwrap(),
fulfilled_at: self.fulfilled_at.take().unwrap(),
})
}
}
}

/// Shuts down the requested side of a socket.
Expand Down
23 changes: 13 additions & 10 deletions glommio/src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub struct EnqueuedSource {

pub(crate) type StatsCollectionFn = fn(&io::Result<usize>, &mut RingIoStats, waiters: u64) -> ();
pub(crate) type LatencyCollectionFn =
fn(std::time::Duration, std::time::Duration, &mut RingIoStats) -> ();
fn(std::time::Duration, std::time::Duration, std::time::Duration, &mut RingIoStats) -> ();

#[derive(Copy, Clone)]
pub(crate) struct StatsCollection {
Expand Down Expand Up @@ -240,26 +240,29 @@ impl Source {
.result
.as_ref()
.map(|x| OsResult::from(x).into());
if ret.is_none() {
return ret;
}

// if there is a scheduler latency collection function present, invoke it once
if ret.is_some() && inner.wakers.fulfilled_at.is_some() && inner.wakers.seen_at.is_some() {
if let Some(Some(stat_fn)) = inner.stats_collection.as_ref().map(|x| x.latency) {
let seen_at = inner.wakers.seen_at.take().unwrap();
let fulfilled_at = inner.wakers.fulfilled_at.take().unwrap();
if let Some(Some(stat_fn)) = inner.stats_collection.as_ref().map(|x| x.latency) {
if let Some(lat) = inner.wakers.timestamps() {
drop(inner);

let io_lat = fulfilled_at - seen_at;
let sched_lat = fulfilled_at.elapsed();
let pre_lat = lat.submitted_at - lat.queued_at;
let io_lat = lat.fulfilled_at - lat.submitted_at;
let post_lat = lat.fulfilled_at.elapsed();

let reactor = &crate::executor().reactor().sys;
(stat_fn)(
pre_lat,
io_lat,
sched_lat,
post_lat,
reactor.ring_for_source(self).io_stats_mut(),
);
(stat_fn)(
pre_lat,
io_lat,
sched_lat,
post_lat,
reactor
.ring_for_source(self)
.io_stats_for_task_queue_mut(crate::executor().current_task_queue()),
Expand Down
10 changes: 7 additions & 3 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,15 @@ fn extract_one_chain(
source_map: &mut SourceMap,
queue: &mut VecDeque<UringDescriptor>,
chain: Range<usize>,
now: Instant,
) -> SmallVec<[UringDescriptor; 1]> {
queue
.drain(chain)
.filter(move |op| {
if op.user_data > 0 {
let id = from_user_data(op.user_data);
let status = source_map.peek_source_mut(from_user_data(op.user_data), |mut x| {
x.wakers.submitted_at = Some(now);
let current = x.enqueued.as_mut().expect("bug");
match current.status {
EnqueuedStatus::Enqueued => {
Expand Down Expand Up @@ -851,10 +853,11 @@ impl UringCommon for PollRing {

fn submit_one_event(&mut self, queue: &mut VecDeque<UringDescriptor>) -> Option<bool> {
let source_map = &mut *self.source_map.borrow_mut();
let now = Instant::now();

while let Some(chain) = peek_one_chain(queue, self.size) {
return if let Some(sqes) = self.ring.prepare_sqes(chain.len() as u32) {
let ops = extract_one_chain(source_map, queue, chain);
let ops = extract_one_chain(source_map, queue, chain, now);
if ops.is_empty() {
// all the sources in the ring were cancelled
continue;
Expand Down Expand Up @@ -1081,10 +1084,11 @@ impl UringCommon for SleepableRing {

fn submit_one_event(&mut self, queue: &mut VecDeque<UringDescriptor>) -> Option<bool> {
let source_map = &mut *self.source_map.borrow_mut();
let now = Instant::now();

while let Some(chain) = peek_one_chain(queue, self.size) {
return if let Some(sqes) = self.ring.prepare_sqes(chain.len() as u32) {
let ops = extract_one_chain(source_map, queue, chain);
let ops = extract_one_chain(source_map, queue, chain, now);
if ops.is_empty() {
// all the sources in the ring were cancelled
continue;
Expand Down Expand Up @@ -1796,7 +1800,7 @@ fn queue_request_into_ring(
descriptor: UringOpDescriptor,
source_map: &mut SourceMap,
) {
source.inner.borrow_mut().wakers.seen_at = Some(Instant::now());
source.inner.borrow_mut().wakers.queued_at = Some(Instant::now());
let q = ring.borrow_mut().submission_queue();
let id = source_map.add_source(source, Rc::clone(&q));

Expand Down

0 comments on commit d258184

Please sign in to comment.