Adjust replay-related metrics for unified scheduler #1741

Merged · 7 commits · Jun 20, 2024
7 changes: 6 additions & 1 deletion core/src/replay_stage.rs
@@ -3009,18 +3009,22 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();
+let mut is_unified_scheduler_enabled = false;

if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
+// It's guaranteed that wait_for_completed_scheduler() returns Some(_) iff the
+// unified scheduler is enabled for the bank.
+is_unified_scheduler_enabled = true;
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
-.accumulate(metrics);
+.accumulate(metrics, is_unified_scheduler_enabled);

if let Err(err) = result {
let root = bank_forks.read().unwrap().root();
@@ -3219,6 +3223,7 @@ impl ReplayStage {
r_replay_progress.num_entries,
r_replay_progress.num_shreds,
bank_complete_time.as_us(),
+is_unified_scheduler_enabled,
);
execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
} else {
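The replay_stage.rs change hinges on one invariant: wait_for_completed_scheduler() returns Some(_) exactly when the unified scheduler is enabled for the bank, so its Some-ness doubles as the flag. A minimal runnable sketch of that control flow, where MockBank and its return types are illustrative stand-ins, not the real agave Bank API:

// Sketch only: MockBank is a stand-in for the real Bank type.
struct MockBank {
    unified_scheduler: bool,
}

impl MockBank {
    // Mirrors the invariant noted in the diff: returns Some(_) iff the
    // unified scheduler is enabled for this bank.
    fn wait_for_completed_scheduler(&self) -> Option<(Result<(), String>, u64)> {
        self.unified_scheduler
            .then(|| (Ok(()), 42 /* placeholder timings */))
    }
}

fn main() {
    for unified in [false, true] {
        let bank = MockBank { unified_scheduler: unified };
        let mut is_unified_scheduler_enabled = false;
        if let Some((_result, _timings)) = bank.wait_for_completed_scheduler() {
            // Reaching this branch proves the unified scheduler is on.
            is_unified_scheduler_enabled = true;
        }
        println!("unified scheduler enabled: {is_unified_scheduler_enabled}");
    }
}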
99 changes: 73 additions & 26 deletions ledger/src/blockstore_processor.rs
@@ -80,6 +80,7 @@ use {
time::{Duration, Instant},
},
thiserror::Error,
+ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen},
};

pub struct TransactionBatchWithIndexes<'a, 'b> {
@@ -513,7 +514,8 @@ fn rebatch_and_execute_batches(
prioritization_fee_cache,
)?;

-timing.accumulate(execute_batches_internal_metrics);
+// Pass false because this code path is never touched by the unified scheduler.
+timing.accumulate(execute_batches_internal_metrics, false);
Ok(())
}

@@ -1079,11 +1081,15 @@ pub struct ConfirmationTiming {
/// and replay. As replay can run in parallel with the verification, this value cannot be
/// recovered from the `replay_elapsed` and/or `{poh,transaction}_verify_elapsed` values. This
/// includes failed cases, when `confirm_slot_entries` exits with an error. In microseconds.
+/// When the unified scheduler is enabled, replay excludes transaction execution, accounting
+/// only for task creation and submission to the scheduler.
pub confirmation_elapsed: u64,

/// Wall clock time used by the entry replay code. Does not include the PoH or the transaction
/// signature/precompiles verification, but can overlap with the PoH and signature verification.
/// In microseconds.
+/// When the unified scheduler is enabled, replay excludes transaction execution, accounting
+/// only for task creation and submission to the scheduler.
pub replay_elapsed: u64,

/// Wall clock times, used for the PoH verification of entries. In microseconds.
@@ -1129,42 +1135,59 @@ pub struct BatchExecutionTiming {

/// Wall clock time used by the transaction execution part of the pipeline.
/// [`ConfirmationTiming::replay_elapsed`] includes this time. In microseconds.
-pub wall_clock_us: u64,
+wall_clock_us: u64,

/// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the
-/// most time.
-pub slowest_thread: ThreadExecuteTimings,
+/// most time (in terms of total_thread_us) among rayon threads. Note that the slowest thread
+/// is re-determined each time a given group of batches is processed, so this is only a coarse
+/// approximation of single-threaded linearized wall time: for each rayon processing session,
+/// it keeps metrics only from whichever thread's arbitrary mix of rebatched transactions
+/// (after blockstore_processor's rebatching) happened to replay slowest as a whole.
+///
+/// When the unified scheduler is enabled, this field isn't maintained, because execution
+/// isn't batched at all.
+slowest_thread: ThreadExecuteTimings,
}

impl BatchExecutionTiming {
-pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
+pub fn accumulate(
+    &mut self,
+    new_batch: ExecuteBatchesInternalMetrics,
+    is_unified_scheduler_enabled: bool,
+) {
let Self {
totals,
wall_clock_us,
slowest_thread,
} = self;

-saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);
+// These metric fields aren't applicable for the unified scheduler.
+if !is_unified_scheduler_enabled {
+    saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);

-use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen};
-totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
-totals.saturating_add_in_place(NumExecuteBatches, 1);
+    totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
+    totals.saturating_add_in_place(NumExecuteBatches, 1);
+}

for thread_times in new_batch.execution_timings_per_thread.values() {
totals.accumulate(&thread_times.execute_timings);
}
-let slowest = new_batch
-    .execution_timings_per_thread
-    .values()
-    .max_by_key(|thread_times| thread_times.total_thread_us);
-
-if let Some(slowest) = slowest {
-    slowest_thread.accumulate(slowest);
-    slowest_thread
-        .execute_timings
-        .saturating_add_in_place(NumExecuteBatches, 1);
-};
+// This whole metric (replay-slot-end-to-end-stats) isn't applicable for the unified
+// scheduler.
+if !is_unified_scheduler_enabled {
+    let slowest = new_batch
+        .execution_timings_per_thread
+        .values()
+        .max_by_key(|thread_times| thread_times.total_thread_us);
+
+    if let Some(slowest) = slowest {
+        slowest_thread.accumulate(slowest);
+        slowest_thread
+            .execute_timings
+            .saturating_add_in_place(NumExecuteBatches, 1);
+    };
+}
}
}
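To make the conditional accumulation easier to follow, here is a condensed, hypothetical model of the accumulate() logic above; the simplified Timing struct and its flat fields are stand-ins for the real BatchExecutionTiming, ExecuteTimings, and ExecuteBatchesInternalMetrics types:

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real metrics types.
#[derive(Default)]
struct Timing {
    wall_clock_us: u64,       // skipped under the unified scheduler
    total_batches_len: u64,   // skipped under the unified scheduler
    num_execute_batches: u64, // skipped under the unified scheduler
    total_execute_us: u64,    // always accumulated
    slowest_thread_us: u64,   // skipped under the unified scheduler
}

impl Timing {
    fn accumulate(
        &mut self,
        execute_batches_us: u64,
        batches_len: u64,
        per_thread_us: &HashMap<usize, u64>,
        is_unified_scheduler_enabled: bool,
    ) {
        // Batch-level counters are meaningless when execution isn't batched.
        if !is_unified_scheduler_enabled {
            self.wall_clock_us += execute_batches_us;
            self.total_batches_len += batches_len;
            self.num_execute_batches += 1;
        }
        // Per-thread execution timings are meaningful either way.
        for us in per_thread_us.values() {
            self.total_execute_us += us;
        }
        // The slowest-thread approximation only makes sense for rayon batches.
        if !is_unified_scheduler_enabled {
            if let Some(&slowest) = per_thread_us.values().max() {
                self.slowest_thread_us += slowest;
            }
        }
    }
}

fn main() {
    let mut timing = Timing::default();
    let per_thread = HashMap::from([(0usize, 250u64), (1, 400)]);
    timing.accumulate(500, 3, &per_thread, true); // unified: only totals move
    assert_eq!(timing.num_execute_batches, 0);
    assert_eq!(timing.total_execute_us, 650);
}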

@@ -1185,7 +1208,8 @@ impl ThreadExecuteTimings {
("total_transactions_executed", self.total_transactions_executed as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
-eager!{report_execute_timings!(self.execute_timings)}
+// Pass false because this code path is never touched by the unified scheduler.
+eager!{report_execute_timings!(self.execute_timings, false)}
);
};
}
@@ -1222,7 +1246,24 @@ impl ReplaySlotStats {
num_entries: usize,
num_shreds: u64,
bank_complete_time_us: u64,
+is_unified_scheduler_enabled: bool,
) {
+let confirmation_elapsed = if is_unified_scheduler_enabled {
+    "confirmation_without_replay_us"
+} else {
+    "confirmation_time_us"
+};
+let replay_elapsed = if is_unified_scheduler_enabled {
+    "task_submission_us"
+} else {
+    "replay_time"
+};
+let execute_batches_us = if is_unified_scheduler_enabled {
+    None
+} else {
+    Some(self.batch_execute.wall_clock_us as i64)
+};
+
lazy! {
datapoint_info!(
"replay-slot-stats",
@@ -1243,9 +1284,9 @@
self.transaction_verify_elapsed as i64,
i64
),
("confirmation_time_us", self.confirmation_elapsed as i64, i64),
("replay_time", self.replay_elapsed as i64, i64),
("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64),
(confirmation_elapsed, self.confirmation_elapsed as i64, i64),
(replay_elapsed, self.replay_elapsed as i64, i64),
("execute_batches_us", execute_batches_us, Option<i64>),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
@@ -1257,11 +1298,17 @@
("total_shreds", num_shreds as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
-eager!{report_execute_timings!(self.batch_execute.totals)}
+eager!{report_execute_timings!(self.batch_execute.totals, is_unified_scheduler_enabled)}
);
};

-self.batch_execute.slowest_thread.report_stats(slot);
+// Skip reporting replay-slot-end-to-end-stats entirely if the unified scheduler is
+// enabled, because the whole metric is only meaningful for rayon-based worker threads.
+//
+// See the slowest_thread doc comment for details.
+if !is_unified_scheduler_enabled {
+    self.batch_execute.slowest_thread.report_stats(slot);
+}

let mut per_pubkey_timings: Vec<_> = self
.batch_execute
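The report() change boils down to a pure name-selection step: under the unified scheduler, the old metric names would be misleading, because transaction execution no longer happens inside replay. A sketch under that reading, with a plain function standing in for the elided datapoint_info! machinery:

// Sketch: selects the datapoint field names used in the diff above.
fn metric_names(is_unified_scheduler_enabled: bool) -> (&'static str, &'static str) {
    let confirmation = if is_unified_scheduler_enabled {
        // Confirmation no longer includes transaction execution, which
        // happens inside the scheduler instead.
        "confirmation_without_replay_us"
    } else {
        "confirmation_time_us"
    };
    let replay = if is_unified_scheduler_enabled {
        // Replay merely creates tasks and submits them to the scheduler.
        "task_submission_us"
    } else {
        "replay_time"
    };
    (confirmation, replay)
}

fn main() {
    assert_eq!(
        metric_names(true),
        ("confirmation_without_replay_us", "task_submission_us")
    );
    assert_eq!(metric_names(false), ("confirmation_time_us", "replay_time"));
}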
28 changes: 17 additions & 11 deletions program-runtime/src/timings.rs
@@ -88,7 +88,7 @@ impl core::fmt::Debug for Metrics {
eager_macro_rules! { $eager_1
#[macro_export]
macro_rules! report_execute_timings {
-($self: expr) => {
+($self: expr, $is_unified_scheduler_enabled: expr) => {
(
"validate_transactions_us",
*$self
@@ -149,19 +149,25 @@
),
(
"total_batches_len",
-*$self
-    .metrics
-    .index(ExecuteTimingType::TotalBatchesLen),
-i64
+(if $is_unified_scheduler_enabled {
+    None
+} else {
+    Some(*$self
+        .metrics
+        .index(ExecuteTimingType::TotalBatchesLen))
+}),
+Option<i64>
),
(
"num_execute_batches",
-*$self
-    .metrics
-    .index(ExecuteTimingType::NumExecuteBatches),
-i64
+(if $is_unified_scheduler_enabled {
+    None
+} else {
+    Some(*$self
+        .metrics
+        .index(ExecuteTimingType::NumExecuteBatches))
+}),
+Option<i64>
),
(
"update_transaction_statuses",
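The macro change switches the two batch counters from i64 to Option<i64>. Assuming the metrics machinery skips None-valued fields (which the Option<i64> type annotations in the diff suggest), a toy reporter shows the intended effect: omit the field entirely rather than logging a misleading zero.

// Toy reporter illustrating the Option<i64> convention above: None means
// "don't emit this field at all". This is an assumption about how the real
// datapoint machinery treats Option fields, not a copy of its implementation.
fn report_field(name: &str, value: Option<i64>) -> Option<String> {
    value.map(|v| format!("{name}={v}i"))
}

fn main() {
    // Unified scheduler: batch counters don't apply, so the field vanishes.
    assert_eq!(report_field("total_batches_len", None), None);
    // Rayon path: counters are reported as usual.
    assert_eq!(
        report_field("total_batches_len", Some(128)),
        Some("total_batches_len=128i".to_string())
    );
}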