diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index dc5012b679ccd7..8ee95d8be70ab4 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -266,22 +266,23 @@ fn execute_batch( first_err.map(|(result, _)| result).unwrap_or(Ok(())) } +#[derive(Default)] +struct ExecuteBatchesInternalMetrics { + execution_timings_per_thread: RwLock>, + total_batches_len: u64, + execute_batches_us: u64, +} + fn execute_batches_internal( bank: &Arc, batches: &[TransactionBatchWithIndexes], entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, - confirmation_timing: &mut ConfirmationTiming, cost_capacity_meter: Arc>, tx_costs: &[u64], -) -> Result<()> { +) -> Result { inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); - let ConfirmationTiming { - execute_timings: ref mut cumulative_execute_timings, - ref mut end_to_end_execute_timings, - .. - } = confirmation_timing; let execution_timings_per_thread: RwLock> = RwLock::new(HashMap::new()); @@ -296,8 +297,8 @@ fn execute_batches_internal( .sanitized_transactions() .len() as u64; let mut timings = ExecuteTimings::default(); - let (result, execute_batches_time) = measure!( - |_| { + let (result, execute_batches_time): (Result<()>, Measure) = measure!( + { let result = execute_batch( transaction_batch_with_indexes, bank, @@ -343,43 +344,13 @@ fn execute_batches_internal( }); execute_batches_elapsed.stop(); - cumulative_execute_timings - .saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, batches.len() as u64); - cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1); - saturating_add_assign!( - confirmation_timing.execute_batches_us, - execute_batches_elapsed.as_us() - ); - - let mut current_max_thread_execution_time: Option = None; - for (_, thread_execution_time) in execution_timings_per_thread - .into_inner() - .unwrap() - .into_iter() - { - let ThreadExecuteTimings { - total_thread_us, - execute_timings, - .. - } = &thread_execution_time; - cumulative_execute_timings.accumulate(execute_timings); - if *total_thread_us - > current_max_thread_execution_time - .as_ref() - .map(|thread_execution_time| thread_execution_time.total_thread_us) - .unwrap_or(0) - { - current_max_thread_execution_time = Some(thread_execution_time); - } - } + first_err(&results)?; - if let Some(current_max_thread_execution_time) = current_max_thread_execution_time { - end_to_end_execute_timings.accumulate(¤t_max_thread_execution_time); - end_to_end_execute_timings - .execute_timings - .saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1); - }; - first_err(&results) + Ok(ExecuteBatchesInternalMetrics { + execution_timings_per_thread, + total_batches_len: batches.len() as u64, + execute_batches_us: execute_batches_elapsed.as_us(), + }) } fn rebatch_transactions<'a>( @@ -487,16 +458,18 @@ fn execute_batches( batches }; - execute_batches_internal( + let execute_batches_internal_metrics = execute_batches_internal( bank, rebatched_txs, entry_callback, transaction_status_sender, replay_vote_sender, - confirmation_timing, cost_capacity_meter, &tx_batch_costs, - ) + )?; + + confirmation_timing.process_execute_batches_internal_metrics(execute_batches_internal_metrics); + Ok(()) } /// Process an ordered list of entries in parallel @@ -1014,6 +987,61 @@ pub struct ConfirmationTiming { pub end_to_end_execute_timings: ThreadExecuteTimings, } +impl ConfirmationTiming { + fn process_execute_batches_internal_metrics( + &mut self, + execute_batches_internal_metrics: ExecuteBatchesInternalMetrics, + ) { + let ConfirmationTiming { + execute_timings: ref mut cumulative_execute_timings, + execute_batches_us: ref mut cumulative_execute_batches_us, + ref mut end_to_end_execute_timings, + .. + } = self; + + saturating_add_assign!( + *cumulative_execute_batches_us, + execute_batches_internal_metrics.execute_batches_us + ); + + cumulative_execute_timings.saturating_add_in_place( + ExecuteTimingType::TotalBatchesLen, + execute_batches_internal_metrics.total_batches_len, + ); + cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1); + + let mut current_max_thread_execution_time: Option = None; + for (_, thread_execution_time) in execute_batches_internal_metrics + .execution_timings_per_thread + .into_inner() + .unwrap() + .into_iter() + { + let ThreadExecuteTimings { + total_thread_us, + execute_timings, + .. + } = &thread_execution_time; + cumulative_execute_timings.accumulate(execute_timings); + if *total_thread_us + > current_max_thread_execution_time + .as_ref() + .map(|thread_execution_time| thread_execution_time.total_thread_us) + .unwrap_or(0) + { + current_max_thread_execution_time = Some(thread_execution_time); + } + } + + if let Some(current_max_thread_execution_time) = current_max_thread_execution_time { + end_to_end_execute_timings.accumulate(¤t_max_thread_execution_time); + end_to_end_execute_timings + .execute_timings + .saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1); + }; + } +} + impl Default for ConfirmationTiming { fn default() -> Self { Self {