
Commit

Refactor and cleanup metrics accumulation code
carllin committed Jul 1, 2022
1 parent 4418417 commit 85d3de0
Showing 1 changed file with 76 additions and 48 deletions.
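At a high level, the refactor inverts the direction of metrics accumulation: execute_batches_internal no longer takes &mut ConfirmationTiming and folds per-thread timings in place; it now returns an ExecuteBatchesInternalMetrics value, and the caller (execute_batches) hands that to a new ConfirmationTiming::process_execute_batches_internal_metrics method. Below is a minimal, self-contained sketch of that pattern; the field types are simplified to plain integers and some field names are hypothetical stand-ins, so treat it as an illustration rather than the actual Solana implementation.

use std::collections::HashMap;

// Simplified stand-ins for the real metrics types (the actual code uses
// ExecuteTimings, ThreadExecuteTimings, RwLock, etc.); illustrative only.
#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
    execution_timings_per_thread: HashMap<usize, u64>, // thread index -> microseconds
    total_batches_len: u64,
    execute_batches_us: u64,
}

#[derive(Default)]
struct ConfirmationTiming {
    execute_batches_us: u64,
    total_batches_len: u64,
    slowest_thread_us: u64, // hypothetical stand-in for end_to_end_execute_timings
}

impl ConfirmationTiming {
    // Accumulation now lives on ConfirmationTiming: the caller folds the
    // returned metrics in, instead of the worker mutating the timings directly.
    fn process_execute_batches_internal_metrics(&mut self, metrics: ExecuteBatchesInternalMetrics) {
        self.execute_batches_us += metrics.execute_batches_us;
        self.total_batches_len += metrics.total_batches_len;
        // Keep only the slowest thread, as the end-to-end timings do in the diff.
        if let Some(max_us) = metrics.execution_timings_per_thread.values().copied().max() {
            self.slowest_thread_us = self.slowest_thread_us.max(max_us);
        }
    }
}

// The worker only measures and returns; it no longer takes &mut ConfirmationTiming.
fn execute_batches_internal(batches: &[&str]) -> ExecuteBatchesInternalMetrics {
    let mut execution_timings_per_thread = HashMap::new();
    execution_timings_per_thread.insert(0, 10_u64); // pretend thread 0 spent 10us
    ExecuteBatchesInternalMetrics {
        execution_timings_per_thread,
        total_batches_len: batches.len() as u64,
        execute_batches_us: 12,
    }
}

fn main() {
    let mut confirmation_timing = ConfirmationTiming::default();
    let metrics = execute_batches_internal(&["batch0", "batch1"]);
    confirmation_timing.process_execute_batches_internal_metrics(metrics);
    println!(
        "{} batches in {}us",
        confirmation_timing.total_batches_len, confirmation_timing.execute_batches_us
    );
}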
124 changes: 76 additions & 48 deletions ledger/src/blockstore_processor.rs
@@ -266,22 +266,23 @@ fn execute_batch(
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
}

#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
execution_timings_per_thread: RwLock<HashMap<usize, ThreadExecuteTimings>>,
total_batches_len: u64,
execute_batches_us: u64,
}

fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
confirmation_timing: &mut ConfirmationTiming,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_costs: &[u64],
) -> Result<()> {
) -> Result<ExecuteBatchesInternalMetrics> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let ConfirmationTiming {
execute_timings: ref mut cumulative_execute_timings,
ref mut end_to_end_execute_timings,
..
} = confirmation_timing;
let execution_timings_per_thread: RwLock<HashMap<usize, ThreadExecuteTimings>> =
RwLock::new(HashMap::new());

@@ -296,8 +297,8 @@ fn execute_batches_internal(
.sanitized_transactions()
.len() as u64;
let mut timings = ExecuteTimings::default();
let (result, execute_batches_time) = measure!(
|_| {
let (result, execute_batches_time): (Result<()>, Measure) = measure!(
{
let result = execute_batch(
transaction_batch_with_indexes,
bank,
@@ -343,43 +344,13 @@ fn execute_batches_internal(
});
execute_batches_elapsed.stop();

cumulative_execute_timings
.saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, batches.len() as u64);
cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
saturating_add_assign!(
confirmation_timing.execute_batches_us,
execute_batches_elapsed.as_us()
);

let mut current_max_thread_execution_time: Option<ThreadExecuteTimings> = None;
for (_, thread_execution_time) in execution_timings_per_thread
.into_inner()
.unwrap()
.into_iter()
{
let ThreadExecuteTimings {
total_thread_us,
execute_timings,
..
} = &thread_execution_time;
cumulative_execute_timings.accumulate(execute_timings);
if *total_thread_us
> current_max_thread_execution_time
.as_ref()
.map(|thread_execution_time| thread_execution_time.total_thread_us)
.unwrap_or(0)
{
current_max_thread_execution_time = Some(thread_execution_time);
}
}
first_err(&results)?;

if let Some(current_max_thread_execution_time) = current_max_thread_execution_time {
end_to_end_execute_timings.accumulate(&current_max_thread_execution_time);
end_to_end_execute_timings
.execute_timings
.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
};
first_err(&results)
Ok(ExecuteBatchesInternalMetrics {
execution_timings_per_thread,
total_batches_len: batches.len() as u64,
execute_batches_us: execute_batches_elapsed.as_us(),
})
}

fn rebatch_transactions<'a>(
@@ -487,16 +458,18 @@ fn execute_batches(
batches
};

execute_batches_internal(
let execute_batches_internal_metrics = execute_batches_internal(
bank,
rebatched_txs,
entry_callback,
transaction_status_sender,
replay_vote_sender,
confirmation_timing,
cost_capacity_meter,
&tx_batch_costs,
)
)?;

confirmation_timing.process_execute_batches_internal_metrics(execute_batches_internal_metrics);
Ok(())
}

/// Process an ordered list of entries in parallel
@@ -1014,6 +987,61 @@ pub struct ConfirmationTiming {
pub end_to_end_execute_timings: ThreadExecuteTimings,
}

impl ConfirmationTiming {
fn process_execute_batches_internal_metrics(
&mut self,
execute_batches_internal_metrics: ExecuteBatchesInternalMetrics,
) {
let ConfirmationTiming {
execute_timings: ref mut cumulative_execute_timings,
execute_batches_us: ref mut cumulative_execute_batches_us,
ref mut end_to_end_execute_timings,
..
} = self;

saturating_add_assign!(
*cumulative_execute_batches_us,
execute_batches_internal_metrics.execute_batches_us
);

cumulative_execute_timings.saturating_add_in_place(
ExecuteTimingType::TotalBatchesLen,
execute_batches_internal_metrics.total_batches_len,
);
cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);

let mut current_max_thread_execution_time: Option<ThreadExecuteTimings> = None;
for (_, thread_execution_time) in execute_batches_internal_metrics
.execution_timings_per_thread
.into_inner()
.unwrap()
.into_iter()
{
let ThreadExecuteTimings {
total_thread_us,
execute_timings,
..
} = &thread_execution_time;
cumulative_execute_timings.accumulate(execute_timings);
if *total_thread_us
> current_max_thread_execution_time
.as_ref()
.map(|thread_execution_time| thread_execution_time.total_thread_us)
.unwrap_or(0)
{
current_max_thread_execution_time = Some(thread_execution_time);
}
}

if let Some(current_max_thread_execution_time) = current_max_thread_execution_time {
end_to_end_execute_timings.accumulate(&current_max_thread_execution_time);
end_to_end_execute_timings
.execute_timings
.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
};
}
}

impl Default for ConfirmationTiming {
fn default() -> Self {
Self {
