diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index d302b5a5f6ca8b..e9beea441348d2 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -394,18 +394,24 @@ impl Consumer { chunk_offset: usize, ) -> ProcessTransactionBatchOutput { let ( - (transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count), + (transaction_qos_cost_results, cost_model_throttled_transactions_count), cost_model_us, - ) = measure_us!(self - .qos_service - .select_and_accumulate_transaction_costs(bank, txs)); + ) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs( + bank, + txs, + std::iter::repeat(Ok(())) // no filtering before QoS + )); // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state - let (batch, lock_us) = measure_us!( - bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter()) - ); + let (batch, lock_us) = measure_us!(bank.prepare_sanitized_batch_with_results( + txs, + transaction_qos_cost_results.iter().map(|r| match r { + Ok(_cost) => Ok(()), + Err(err) => Err(err.clone()), + }) + )); // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit @@ -424,8 +430,7 @@ impl Consumer { } = execute_and_commit_transactions_output; QosService::update_or_remove_transaction_costs( - transaction_costs.iter(), - transactions_qos_results.iter(), + transaction_qos_cost_results.iter(), commit_transactions_result.as_ref().ok(), bank, ); diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index 4345e7efe68a58..2fa5631d47a581 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -84,50 +84,57 @@ impl QosService { } } - /// Calculate cost of transactions, determine which ones to include in the slot, and - /// accumulate costs in the cost tracker. - /// Returns a vector of transaction costs, a vector of results indicating which transactions - /// were selected, and the number of transactions that were *NOT* selected. + /// Calculate cost of transactions, if not already filtered out, determine which ones to + /// include in the slot, and accumulate costs in the cost tracker. + /// Returns a vector of results containing selected transaction costs, and the number of + /// transactions that were *NOT* selected. pub fn select_and_accumulate_transaction_costs( &self, bank: &Bank, transactions: &[SanitizedTransaction], - ) -> (Vec, Vec>, usize) { + pre_results: impl Iterator>, + ) -> (Vec>, usize) { let transaction_costs = - self.compute_transaction_costs(&bank.feature_set, transactions.iter()); - let (transactions_qos_results, num_included) = - self.select_transactions_per_cost(transactions.iter(), transaction_costs.iter(), bank); + self.compute_transaction_costs(&bank.feature_set, transactions.iter(), pre_results); + let (transactions_qos_cost_results, num_included) = self.select_transactions_per_cost( + transactions.iter(), + transaction_costs.into_iter(), + bank, + ); self.accumulate_estimated_transaction_costs(&Self::accumulate_batched_transaction_costs( - transaction_costs.iter(), - transactions_qos_results.iter(), + transactions_qos_cost_results.iter(), )); let cost_model_throttled_transactions_count = transactions.len().saturating_sub(num_included); ( - transaction_costs, - transactions_qos_results, + transactions_qos_cost_results, cost_model_throttled_transactions_count, ) } - // invoke cost_model to calculate cost for the given list of transactions + // invoke cost_model to calculate cost for the given list of transactions that have not + // been filtered out already. fn compute_transaction_costs<'a>( &self, feature_set: &FeatureSet, transactions: impl Iterator, - ) -> Vec { + pre_results: impl Iterator>, + ) -> Vec> { let mut compute_cost_time = Measure::start("compute_cost_time"); let txs_costs: Vec<_> = transactions - .map(|tx| { - let cost = CostModel::calculate_cost(tx, feature_set); - debug!( - "transaction {:?}, cost {:?}, cost sum {}", - tx, - cost, - cost.sum() - ); - cost + .zip(pre_results) + .map(|(tx, pre_result)| { + pre_result.map(|()| { + let cost = CostModel::calculate_cost(tx, feature_set); + debug!( + "transaction {:?}, cost {:?}, cost sum {}", + tx, + cost, + cost.sum() + ); + cost + }) }) .collect(); compute_cost_time.stop(); @@ -148,27 +155,34 @@ impl QosService { fn select_transactions_per_cost<'a>( &self, transactions: impl Iterator, - transactions_costs: impl Iterator, + transactions_costs: impl Iterator>, bank: &Bank, - ) -> (Vec>, usize) { + ) -> (Vec>, usize) { let mut cost_tracking_time = Measure::start("cost_tracking_time"); let mut cost_tracker = bank.write_cost_tracker().unwrap(); let mut num_included = 0; - let select_results = transactions - .zip(transactions_costs) - .map(|(tx, cost)| match cost_tracker.try_add(cost) { - Ok(current_block_cost) => { - debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost); - self.metrics.stats.selected_txs_count.fetch_add(1, Ordering::Relaxed); - num_included += 1; - Ok(()) - }, - Err(e) => { - debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e); - Err(TransactionError::from(e)) + let select_results = transactions.zip(transactions_costs) + .map(|(tx, cost)| { + match cost { + Ok(cost) => { + match cost_tracker.try_add(&cost) { + Ok(current_block_cost) => { + debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost); + self.metrics.stats.selected_txs_count.fetch_add(1, Ordering::Relaxed); + num_included += 1; + Ok(cost) + }, + Err(e) => { + debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e); + Err(TransactionError::from(e)) + } + } + }, + Err(e) => Err(e), } }) .collect(); + cost_tracking_time.stop(); self.metrics .stats @@ -182,65 +196,54 @@ impl QosService { /// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker /// being inflated with unsuccessfully executed transactions. pub fn update_or_remove_transaction_costs<'a>( - transaction_costs: impl Iterator, - transaction_qos_results: impl Iterator>, + transaction_cost_results: impl Iterator>, transaction_committed_status: Option<&Vec>, bank: &Arc, ) { match transaction_committed_status { Some(transaction_committed_status) => Self::update_transaction_costs( - transaction_costs, - transaction_qos_results, + transaction_cost_results, transaction_committed_status, bank, ), - None => { - Self::remove_transaction_costs(transaction_costs, transaction_qos_results, bank) - } + None => Self::remove_transaction_costs(transaction_cost_results, bank), } } fn update_transaction_costs<'a>( - transaction_costs: impl Iterator, - transaction_qos_results: impl Iterator>, + transaction_cost_results: impl Iterator>, transaction_committed_status: &Vec, bank: &Arc, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); - transaction_costs - .zip(transaction_qos_results) + transaction_cost_results .zip(transaction_committed_status) - .for_each( - |((tx_cost, qos_inclusion_result), transaction_committed_details)| { - // Only transactions that the qos service included have to be - // checked for update - if qos_inclusion_result.is_ok() { - match transaction_committed_details { - CommitTransactionDetails::Committed { compute_units } => { - cost_tracker.update_execution_cost(tx_cost, *compute_units) - } - CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost), + .for_each(|(tx_cost, transaction_committed_details)| { + // Only transactions that the qos service included have to be + // checked for update + if let Ok(tx_cost) = tx_cost { + match transaction_committed_details { + CommitTransactionDetails::Committed { compute_units } => { + cost_tracker.update_execution_cost(tx_cost, *compute_units) } + CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost), } - }, - ); + } + }); } fn remove_transaction_costs<'a>( - transaction_costs: impl Iterator, - transaction_qos_results: impl Iterator>, + transaction_cost_results: impl Iterator>, bank: &Arc, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); - transaction_costs.zip(transaction_qos_results).for_each( - |(tx_cost, qos_inclusion_result)| { - // Only transactions that the qos service included have to be - // removed - if qos_inclusion_result.is_ok() { - cost_tracker.remove(tx_cost); - } - }, - ); + transaction_cost_results.for_each(|tx_cost| { + // Only transactions that the qos service included have to be + // removed + if let Ok(tx_cost) = tx_cost { + cost_tracker.remove(tx_cost); + } + }); } // metrics are reported by bank slot @@ -341,81 +344,78 @@ impl QosService { // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and // execution_cost from the batch of transactions selected for block. fn accumulate_batched_transaction_costs<'a>( - transactions_costs: impl Iterator, - transaction_results: impl Iterator>, + transactions_costs: impl Iterator>, ) -> BatchedTransactionDetails { let mut batched_transaction_details = BatchedTransactionDetails::default(); - transactions_costs - .zip(transaction_results) - .for_each(|(cost, result)| match result { - Ok(_) => { + transactions_costs.for_each(|cost| match cost { + Ok(cost) => { + saturating_add_assign!( + batched_transaction_details.costs.batched_signature_cost, + cost.signature_cost + ); + saturating_add_assign!( + batched_transaction_details.costs.batched_write_lock_cost, + cost.write_lock_cost + ); + saturating_add_assign!( + batched_transaction_details.costs.batched_data_bytes_cost, + cost.data_bytes_cost + ); + saturating_add_assign!( + batched_transaction_details + .costs + .batched_builtins_execute_cost, + cost.builtins_execution_cost + ); + saturating_add_assign!( + batched_transaction_details.costs.batched_bpf_execute_cost, + cost.bpf_execution_cost + ); + } + Err(transaction_error) => match transaction_error { + TransactionError::WouldExceedMaxBlockCostLimit => { saturating_add_assign!( - batched_transaction_details.costs.batched_signature_cost, - cost.signature_cost + batched_transaction_details + .errors + .batched_retried_txs_per_block_limit_count, + 1 ); + } + TransactionError::WouldExceedMaxVoteCostLimit => { saturating_add_assign!( - batched_transaction_details.costs.batched_write_lock_cost, - cost.write_lock_cost + batched_transaction_details + .errors + .batched_retried_txs_per_vote_limit_count, + 1 ); + } + TransactionError::WouldExceedMaxAccountCostLimit => { saturating_add_assign!( - batched_transaction_details.costs.batched_data_bytes_cost, - cost.data_bytes_cost + batched_transaction_details + .errors + .batched_retried_txs_per_account_limit_count, + 1 ); + } + TransactionError::WouldExceedAccountDataBlockLimit => { saturating_add_assign!( batched_transaction_details - .costs - .batched_builtins_execute_cost, - cost.builtins_execution_cost + .errors + .batched_retried_txs_per_account_data_block_limit_count, + 1 ); + } + TransactionError::WouldExceedAccountDataTotalLimit => { saturating_add_assign!( - batched_transaction_details.costs.batched_bpf_execute_cost, - cost.bpf_execution_cost + batched_transaction_details + .errors + .batched_dropped_txs_per_account_data_total_limit_count, + 1 ); } - Err(transaction_error) => match transaction_error { - TransactionError::WouldExceedMaxBlockCostLimit => { - saturating_add_assign!( - batched_transaction_details - .errors - .batched_retried_txs_per_block_limit_count, - 1 - ); - } - TransactionError::WouldExceedMaxVoteCostLimit => { - saturating_add_assign!( - batched_transaction_details - .errors - .batched_retried_txs_per_vote_limit_count, - 1 - ); - } - TransactionError::WouldExceedMaxAccountCostLimit => { - saturating_add_assign!( - batched_transaction_details - .errors - .batched_retried_txs_per_account_limit_count, - 1 - ); - } - TransactionError::WouldExceedAccountDataBlockLimit => { - saturating_add_assign!( - batched_transaction_details - .errors - .batched_retried_txs_per_account_data_block_limit_count, - 1 - ); - } - TransactionError::WouldExceedAccountDataTotalLimit => { - saturating_add_assign!( - batched_transaction_details - .errors - .batched_dropped_txs_per_account_data_total_limit_count, - 1 - ); - } - _ => {} - }, - }); + _ => {} + }, + }); batched_transaction_details } @@ -671,8 +671,11 @@ mod tests { let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx]; let qos_service = QosService::new(1); - let txs_costs = - qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter()); + let txs_costs = qos_service.compute_transaction_costs( + &FeatureSet::all_enabled(), + txs.iter(), + std::iter::repeat(Ok(())), + ); // verify the size of txs_costs and its contents assert_eq!(txs_costs.len(), txs.len()); @@ -681,7 +684,7 @@ mod tests { .enumerate() .map(|(index, cost)| { assert_eq!( - cost.sum(), + cost.as_ref().unwrap().sum(), CostModel::calculate_cost(&txs[index], &FeatureSet::all_enabled()).sum() ); }) @@ -717,8 +720,11 @@ mod tests { let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx]; let qos_service = QosService::new(1); - let txs_costs = - qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter()); + let txs_costs = qos_service.compute_transaction_costs( + &FeatureSet::all_enabled(), + txs.iter(), + std::iter::repeat(Ok(())), + ); // set cost tracker limit to fit 1 transfer tx and 1 vote tx let cost_limit = transfer_tx_cost + vote_tx_cost; @@ -726,7 +732,7 @@ mod tests { .unwrap() .set_limits(cost_limit, cost_limit, cost_limit); let (results, num_selected) = - qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank); assert_eq!(num_selected, 2); // verify that first transfer tx and first vote are allowed @@ -758,26 +764,32 @@ mod tests { // assert all tx_costs should be applied to cost_tracker if all execution_results are all committed { let qos_service = QosService::new(1); - let txs_costs = - qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter()); - let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum(); - let (qos_results, _num_included) = - qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + let txs_costs = qos_service.compute_transaction_costs( + &FeatureSet::all_enabled(), + txs.iter(), + std::iter::repeat(Ok(())), + ); + let total_txs_cost: u64 = txs_costs + .iter() + .map(|cost| cost.as_ref().unwrap().sum()) + .sum(); + let (qos_cost_results, _num_included) = + qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank); assert_eq!( total_txs_cost, bank.read_cost_tracker().unwrap().block_cost() ); // all transactions are committed with actual units more than estimated - let commited_status: Vec = txs_costs + let commited_status: Vec = qos_cost_results .iter() .map(|tx_cost| CommitTransactionDetails::Committed { - compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment, + compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost + + execute_units_adjustment, }) .collect(); let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count; QosService::update_or_remove_transaction_costs( - txs_costs.iter(), - qos_results.iter(), + qos_cost_results.iter(), Some(&commited_status), &bank, ); @@ -812,21 +824,22 @@ mod tests { // assert all tx_costs should be removed from cost_tracker if all execution_results are all Not Committed { let qos_service = QosService::new(1); - let txs_costs = - qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter()); - let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum(); - let (qos_results, _num_included) = - qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + let txs_costs = qos_service.compute_transaction_costs( + &FeatureSet::all_enabled(), + txs.iter(), + std::iter::repeat(Ok(())), + ); + let total_txs_cost: u64 = txs_costs + .iter() + .map(|cost| cost.as_ref().unwrap().sum()) + .sum(); + let (qos_cost_results, _num_included) = + qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank); assert_eq!( total_txs_cost, bank.read_cost_tracker().unwrap().block_cost() ); - QosService::update_or_remove_transaction_costs( - txs_costs.iter(), - qos_results.iter(), - None, - &bank, - ); + QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank); assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost()); assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count()); } @@ -853,17 +866,23 @@ mod tests { // assert only commited tx_costs are applied cost_tracker { let qos_service = QosService::new(1); - let txs_costs = - qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter()); - let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum(); - let (qos_results, _num_included) = - qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + let txs_costs = qos_service.compute_transaction_costs( + &FeatureSet::all_enabled(), + txs.iter(), + std::iter::repeat(Ok(())), + ); + let total_txs_cost: u64 = txs_costs + .iter() + .map(|cost| cost.as_ref().unwrap().sum()) + .sum(); + let (qos_cost_results, _num_included) = + qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank); assert_eq!( total_txs_cost, bank.read_cost_tracker().unwrap().block_cost() ); // Half of transactions are not committed, the rest with cost adjustment - let commited_status: Vec = txs_costs + let commited_status: Vec = qos_cost_results .iter() .enumerate() .map(|(n, tx_cost)| { @@ -871,14 +890,14 @@ mod tests { CommitTransactionDetails::NotCommitted } else { CommitTransactionDetails::Committed { - compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment, + compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost + + execute_units_adjustment, } } }) .collect(); QosService::update_or_remove_transaction_costs( - txs_costs.iter(), - qos_results.iter(), + qos_cost_results.iter(), Some(&commited_status), &bank, ); @@ -886,10 +905,11 @@ mod tests { // assert the final block cost let mut expected_final_txs_count = 0u64; let mut expected_final_block_cost = 0u64; - txs_costs.iter().enumerate().for_each(|(n, cost)| { + qos_cost_results.iter().enumerate().for_each(|(n, cost)| { if n % 2 != 0 { expected_final_txs_count += 1; - expected_final_block_cost += cost.sum() + execute_units_adjustment; + expected_final_block_cost += + cost.as_ref().unwrap().sum() + execute_units_adjustment; } }); assert_eq!( @@ -912,20 +932,17 @@ mod tests { let bpf_execution_cost = 10; let num_txs = 4; - let tx_costs: Vec<_> = (0..num_txs) - .map(|_| TransactionCost { - signature_cost, - write_lock_cost, - data_bytes_cost, - builtins_execution_cost, - bpf_execution_cost, - ..TransactionCost::default() - }) - .collect(); - let tx_results: Vec<_> = (0..num_txs) + let tx_cost_results: Vec<_> = (0..num_txs) .map(|n| { if n % 2 == 0 { - Ok(()) + Ok(TransactionCost { + signature_cost, + write_lock_cost, + data_bytes_cost, + builtins_execution_cost, + bpf_execution_cost, + ..TransactionCost::default() + }) } else { Err(TransactionError::WouldExceedMaxBlockCostLimit) } @@ -938,7 +955,7 @@ mod tests { let expected_builtins_execution_costs = builtins_execution_cost * (num_txs / 2); let expected_bpf_execution_costs = bpf_execution_cost * (num_txs / 2); let batched_transaction_details = - QosService::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter()); + QosService::accumulate_batched_transaction_costs(tx_cost_results.iter()); assert_eq!( expected_signatures, batched_transaction_details.costs.batched_signature_cost diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index d630c899c58708..27341be8d9ac14 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1233,14 +1233,14 @@ impl Accounts { pub fn lock_accounts_with_results<'a>( &self, txs: impl Iterator, - results: impl Iterator>, + results: impl Iterator>, tx_account_lock_limit: usize, ) -> Vec> { let tx_account_locks_results: Vec> = txs .zip(results) .map(|(tx, result)| match result { Ok(()) => tx.get_account_locks(tx_account_lock_limit), - Err(err) => Err(err.clone()), + Err(err) => Err(err), }) .collect(); self.lock_accounts_inner(tx_account_locks_results) @@ -3162,7 +3162,7 @@ mod tests { let results = accounts.lock_accounts_with_results( txs.iter(), - qos_results.iter(), + qos_results.into_iter(), MAX_TX_ACCOUNT_LOCKS, ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 3707ff8950872b..5f076348851392 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3765,7 +3765,7 @@ impl Bank { pub fn prepare_sanitized_batch_with_results<'a, 'b>( &'a self, transactions: &'b [SanitizedTransaction], - transaction_results: impl Iterator>, + transaction_results: impl Iterator>, ) -> TransactionBatch<'a, 'b> { // this lock_results could be: Ok, AccountInUse, WouldExceedBlockMaxLimit or WouldExceedAccountMaxLimit let tx_account_lock_limit = self.get_transaction_account_lock_limit();