Skip to content

Commit

Permalink
Adjust replay-related metrics for unified scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jun 14, 2024
1 parent 3342215 commit fce38e4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 29 deletions.
5 changes: 4 additions & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3009,18 +3009,20 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();
let mut is_stats_from_completed_scheduler = false;

if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
is_stats_from_completed_scheduler = true;
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
.accumulate(metrics);
.accumulate(metrics, is_stats_from_completed_scheduler);

if let Err(err) = result {
let root = bank_forks.read().unwrap().root();
Expand Down Expand Up @@ -3219,6 +3221,7 @@ impl ReplayStage {
r_replay_progress.num_entries,
r_replay_progress.num_shreds,
bank_complete_time.as_us(),
is_stats_from_completed_scheduler,
);
execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
} else {
Expand Down
78 changes: 61 additions & 17 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ fn rebatch_and_execute_batches(
prioritization_fee_cache,
)?;

timing.accumulate(execute_batches_internal_metrics);
// Pass false because this code-path is never touched by unified scheduler.
timing.accumulate(execute_batches_internal_metrics, false);
Ok(())
}

Expand Down Expand Up @@ -1079,11 +1080,15 @@ pub struct ConfirmationTiming {
/// and replay. As replay can run in parallel with the verification, this value can not be
/// recovered from the `replay_elapsed` and or `{poh,transaction}_verify_elapsed`. This
/// includes failed cases, when `confirm_slot_entries` exist with an error. In microseconds.
/// When unified scheduler is enabled, replay excludes the transaction execution, only
/// accounting for task creation and submission to the scheduler.
pub confirmation_elapsed: u64,

/// Wall clock time used by the entry replay code. Does not include the PoH or the transaction
/// signature/precompiles verification, but can overlap with the PoH and signature verification.
/// In microseconds.
/// When unified scheduler is enabled, replay excludes the transaction execution, only
/// accounting for task creation and submission to the scheduler.
pub replay_elapsed: u64,

/// Wall clock times, used for the PoH verification of entries. In microseconds.
Expand Down Expand Up @@ -1137,7 +1142,11 @@ pub struct BatchExecutionTiming {
}

impl BatchExecutionTiming {
pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
pub fn accumulate(
&mut self,
new_batch: ExecuteBatchesInternalMetrics,
is_unified_scheduler_enabled: bool,
) {
let Self {
totals,
wall_clock_us,
Expand All @@ -1146,9 +1155,13 @@ impl BatchExecutionTiming {

saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);

use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen};
totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
totals.saturating_add_in_place(NumExecuteBatches, 1);
use ExecuteTimingType::NumExecuteBatches;
// These metrics aren't applicable for the unified scheduler
if !is_unified_scheduler_enabled {
use ExecuteTimingType::TotalBatchesLen;
totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
totals.saturating_add_in_place(NumExecuteBatches, 1);
}

for thread_times in new_batch.execution_timings_per_thread.values() {
totals.accumulate(&thread_times.execute_timings);
Expand All @@ -1161,9 +1174,12 @@ impl BatchExecutionTiming {

if let Some(slowest) = slowest {
slowest_thread.accumulate(slowest);
slowest_thread
.execute_timings
.saturating_add_in_place(NumExecuteBatches, 1);
// This metric isn't applicable for the unified scheduler
if !is_unified_scheduler_enabled {
slowest_thread
.execute_timings
.saturating_add_in_place(NumExecuteBatches, 1);
}
};
}
}
Expand All @@ -1176,16 +1192,25 @@ pub struct ThreadExecuteTimings {
}

impl ThreadExecuteTimings {
pub fn report_stats(&self, slot: Slot) {
pub fn report_stats(&self, slot: Slot, is_unified_scheduler_enabled: bool) {
let (total_thread_us, total_transactions_executed) = if is_unified_scheduler_enabled {
(None, None)
} else {
(
Some(self.total_thread_us as i64),
Some(self.total_transactions_executed as i64),
)
};

lazy! {
datapoint_info!(
"replay-slot-end-to-end-stats",
("slot", slot as i64, i64),
("total_thread_us", self.total_thread_us as i64, i64),
("total_transactions_executed", self.total_transactions_executed as i64, i64),
("total_thread_us", total_thread_us, Option<i64>),
("total_transactions_executed", total_transactions_executed, Option<i64>),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.execute_timings)}
eager!{report_execute_timings!(self.execute_timings, is_unified_scheduler_enabled)}
);
};
}
Expand Down Expand Up @@ -1222,7 +1247,24 @@ impl ReplaySlotStats {
num_entries: usize,
num_shreds: u64,
bank_complete_time_us: u64,
is_unified_scheduler_enabled: bool,
) {
let confirmation_elapsed = if is_unified_scheduler_enabled {
"confirmation_without_replay_us"
} else {
"confirmation_time_us"
};
let replay_elapsed = if is_unified_scheduler_enabled {
"task_submission_us"
} else {
"replay_time"
};
let execute_batches_us = if is_unified_scheduler_enabled {
None
} else {
Some(self.batch_execute.wall_clock_us as i64)
};

lazy! {
datapoint_info!(
"replay-slot-stats",
Expand All @@ -1243,9 +1285,9 @@ impl ReplaySlotStats {
self.transaction_verify_elapsed as i64,
i64
),
("confirmation_time_us", self.confirmation_elapsed as i64, i64),
("replay_time", self.replay_elapsed as i64, i64),
("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64),
(confirmation_elapsed, self.confirmation_elapsed as i64, i64),
(replay_elapsed, self.replay_elapsed as i64, i64),
("execute_batches_us", execute_batches_us, Option<i64>),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
Expand All @@ -1257,11 +1299,13 @@ impl ReplaySlotStats {
("total_shreds", num_shreds as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.batch_execute.totals)}
eager!{report_execute_timings!(self.batch_execute.totals, is_unified_scheduler_enabled)}
);
};

self.batch_execute.slowest_thread.report_stats(slot);
self.batch_execute
.slowest_thread
.report_stats(slot, is_unified_scheduler_enabled);

let mut per_pubkey_timings: Vec<_> = self
.batch_execute
Expand Down
28 changes: 17 additions & 11 deletions program-runtime/src/timings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl core::fmt::Debug for Metrics {
eager_macro_rules! { $eager_1
#[macro_export]
macro_rules! report_execute_timings {
($self: expr) => {
($self: expr, $is_unified_scheduler_enabled: expr) => {
(
"validate_transactions_us",
*$self
Expand Down Expand Up @@ -149,19 +149,25 @@ eager_macro_rules! { $eager_1
),
(
"total_batches_len",
*$self

.metrics
.index(ExecuteTimingType::TotalBatchesLen),
i64
(if $is_unified_scheduler_enabled {
None
} else {
Some(*$self
.metrics
.index(ExecuteTimingType::TotalBatchesLen))
}),
Option<i64>
),
(
"num_execute_batches",
*$self

.metrics
.index(ExecuteTimingType::NumExecuteBatches),
i64
(if $is_unified_scheduler_enabled {
None
} else {
Some(*$self
.metrics
.index(ExecuteTimingType::NumExecuteBatches))
}),
Option<i64>
),
(
"update_transaction_statuses",
Expand Down

0 comments on commit fce38e4

Please sign in to comment.