diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5f828d86cbeb1c..956ebc8c94f9a0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -5,18 +5,17 @@ use { self::{ decision_maker::{BufferedPacketsDecision, DecisionMaker}, + forwarder::Forwarder, packet_receiver::PacketReceiver, }, crate::{ banking_trace::BankingPacketReceiver, - forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_timing_metrics::{ LeaderExecuteAndCommitTimings, RecordTransactionsTimings, }, - next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote}, packet_deserializer::PacketDeserializer, qos_service::QosService, tracer_packet_stats::TracerPacketStats, @@ -25,11 +24,10 @@ use { ConsumeScannerPayload, ThreadType, UnprocessedTransactionStorage, }, }, - core::iter::repeat, crossbeam_channel::RecvTimeoutError, histogram::Histogram, itertools::Itertools, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_entry::entry::hash_transactions, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -37,10 +35,7 @@ use { }, solana_measure::{measure, measure::Measure}, solana_metrics::inc_new_counter_info, - solana_perf::{ - data_budget::DataBudget, - packet::{Packet, PACKETS_PER_BATCH}, - }, + solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ @@ -63,9 +58,7 @@ use { saturating_add_assign, timing::{timestamp, AtomicInterval}, transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, - transport::TransportError, }, - solana_streamer::sendmmsg::batch_send, solana_transaction_status::{ token_balances::TransactionTokenBalancesSet, TransactionTokenBalance, }, @@ -84,6 +77,7 @@ use { }; mod decision_maker; +mod forwarder; mod packet_receiver; // Fixed thread size seems to be fastest on GCP setup @@ -503,101 +497,6 @@ impl BankingStage { Self { bank_thread_hdls } } - /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns - /// the number of successfully forwarded packets in second part of tuple - fn forward_buffered_packets<'a>( - connection_cache: &ConnectionCache, - forward_option: &ForwardOption, - cluster_info: &ClusterInfo, - poh_recorder: &Arc>, - socket: &UdpSocket, - forwardable_packets: impl Iterator, - data_budget: &DataBudget, - banking_stage_stats: &BankingStageStats, - ) -> ( - std::result::Result<(), TransportError>, - usize, - Option, - ) { - let leader_and_addr = match forward_option { - ForwardOption::NotForward => return (Ok(()), 0, None), - ForwardOption::ForwardTransaction => { - next_leader_tpu_forwards(cluster_info, poh_recorder) - } - - ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), - }; - let (leader_pubkey, addr) = match leader_and_addr { - Some(leader_and_addr) => leader_and_addr, - None => return (Ok(()), 0, None), - }; - - const INTERVAL_MS: u64 = 100; - // 12 MB outbound limit per second - const MAX_BYTES_PER_SECOND: usize = 12_000_000; - const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; - const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; - data_budget.update(INTERVAL_MS, |bytes| { - std::cmp::min( - bytes.saturating_add(MAX_BYTES_PER_INTERVAL), - MAX_BYTES_BUDGET, - ) - }); - - let packet_vec: Vec<_> = forwardable_packets - .filter_map(|p| { - if !p.meta().forwarded() && data_budget.take(p.meta().size) { - Some(p.data(..)?.to_vec()) - } else { - None - } - }) - .collect(); - - let packet_vec_len = packet_vec.len(); - // TODO: see https://github.com/solana-labs/solana/issues/23819 - // fix this so returns the correct number of succeeded packets - // when there's an error sending the batch. This was left as-is for now - // in favor of shipping Quic support, which was considered higher-priority - if !packet_vec.is_empty() { - inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len); - - let mut measure = Measure::start("banking_stage-forward-us"); - - let res = if let ForwardOption::ForwardTpuVote = forward_option { - // The vote must be forwarded using only UDP. - banking_stage_stats - .forwarded_vote_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); - batch_send(socket, &pkts).map_err(|err| err.into()) - } else { - // All other transactions can be forwarded using QUIC, get_connection() will use - // system wide setting to pick the correct connection object. - banking_stage_stats - .forwarded_transaction_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let conn = connection_cache.get_connection(&addr); - conn.send_wire_transaction_batch_async(packet_vec) - }; - - measure.stop(); - inc_new_counter_info!( - "banking_stage-forward-us", - measure.as_us() as usize, - 1000, - 1000 - ); - - if let Err(err) = res { - inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); - return (Err(err), 0, Some(leader_pubkey)); - } - } - - (Ok(()), packet_vec_len, Some(leader_pubkey)) - } - #[allow(clippy::too_many_arguments)] fn do_process_packets( bank_start: &BankStart, @@ -799,7 +698,7 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { let (_, forward_time) = measure!( - Self::handle_forwarding( + Forwarder::handle_forwarding( cluster_info, unprocessed_transaction_storage, poh_recorder, @@ -821,7 +720,7 @@ impl BankingStage { } BufferedPacketsDecision::ForwardAndHold => { let (_, forward_and_hold_time) = measure!( - Self::handle_forwarding( + Forwarder::handle_forwarding( cluster_info, unprocessed_transaction_storage, poh_recorder, @@ -844,103 +743,6 @@ impl BankingStage { } } - #[allow(clippy::too_many_arguments)] - fn handle_forwarding( - cluster_info: &ClusterInfo, - unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, - poh_recorder: &Arc>, - socket: &UdpSocket, - hold: bool, - data_budget: &DataBudget, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - banking_stage_stats: &BankingStageStats, - connection_cache: &ConnectionCache, - tracer_packet_stats: &mut TracerPacketStats, - bank_forks: &Arc>, - ) { - let forward_option = unprocessed_transaction_storage.forward_option(); - - // get current root bank from bank_forks, use it to sanitize transaction and - // load all accounts from address loader; - let current_bank = bank_forks.read().unwrap().root_bank(); - - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - - // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something - // already processed), then add to forwarding buffer. - let filter_forwarding_result = unprocessed_transaction_storage - .filter_forwardable_packets_and_add_batches( - current_bank, - &mut forward_packet_batches_by_accounts, - ); - slot_metrics_tracker.increment_transactions_from_packets_us( - filter_forwarding_result.total_packet_conversion_us, - ); - banking_stage_stats.packet_conversion_elapsed.fetch_add( - filter_forwarding_result.total_packet_conversion_us, - Ordering::Relaxed, - ); - banking_stage_stats - .filter_pending_packets_elapsed - .fetch_add( - filter_forwarding_result.total_filter_packets_us, - Ordering::Relaxed, - ); - - forward_packet_batches_by_accounts - .iter_batches() - .filter(|&batch| !batch.is_empty()) - .for_each(|forward_batch| { - slot_metrics_tracker.increment_forwardable_batches_count(1); - - let batched_forwardable_packets_count = forward_batch.len(); - let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = - Self::forward_buffered_packets( - connection_cache, - &forward_option, - cluster_info, - poh_recorder, - socket, - forward_batch.get_forwardable_packets(), - data_budget, - banking_stage_stats, - ); - - if let Some(leader_pubkey) = leader_pubkey { - tracer_packet_stats.increment_total_forwardable_tracer_packets( - filter_forwarding_result.total_forwardable_tracer_packets, - leader_pubkey, - ); - } - let failed_forwarded_packets_count = batched_forwardable_packets_count - .saturating_sub(sucessful_forwarded_packets_count); - - if failed_forwarded_packets_count > 0 { - slot_metrics_tracker.increment_failed_forwarded_packets_count( - failed_forwarded_packets_count as u64, - ); - slot_metrics_tracker.increment_packet_batch_forward_failure_count(1); - } - - if sucessful_forwarded_packets_count > 0 { - slot_metrics_tracker.increment_successful_forwarded_packets_count( - sucessful_forwarded_packets_count as u64, - ); - } - }); - - if !hold { - slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( - filter_forwarding_result.total_forwardable_packets as u64, - ); - tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( - filter_forwarding_result.total_tracer_packets_in_buffer, - ); - unprocessed_transaction_storage.clear_forwarded_packets(); - } - } - #[allow(clippy::too_many_arguments)] fn process_loop( packet_deserializer: &mut PacketDeserializer, @@ -1759,7 +1561,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::{to_packet_batches, PacketBatch, PacketFlags}, + solana_perf::packet::{to_packet_batches, PacketBatch}, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, @@ -1783,7 +1585,7 @@ mod tests { system_transaction, transaction::{MessageHash, Transaction, TransactionError, VersionedTransaction}, }, - solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, + solana_streamer::socket::SocketAddrSpace, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, solana_vote_program::{ vote_state::VoteStateUpdate, vote_transaction::new_vote_state_update_transaction, @@ -1796,7 +1598,7 @@ mod tests { }, }; - fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { + pub(crate) fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new())); let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let cluster_info = @@ -2296,11 +2098,11 @@ mod tests { ); } - fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo { + pub(crate) fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo { create_slow_genesis_config_with_leader(lamports, &solana_sdk::pubkey::new_rand()) } - fn create_slow_genesis_config_with_leader( + pub(crate) fn create_slow_genesis_config_with_leader( lamports: u64, validator_pubkey: &Pubkey, ) -> GenesisConfigInfo { @@ -2310,6 +2112,7 @@ mod tests { // See solana_ledger::genesis_utils::create_genesis_config. bootstrap_validator_stake_lamports(), ); + // For these tests there's only 1 slot, don't want to run out of ticks config_info.genesis_config.ticks_per_slot *= 8; config_info @@ -3550,203 +3353,6 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - #[ignore] - fn test_forwarder_budget() { - solana_logger::setup(); - // Create `PacketBatch` with 1 unprocessed packet - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_sdk::pubkey::new_rand(), - 1, - Hash::new_unique(), - ); - let packet = Packet::from_data(None, tx).unwrap(); - let deserialized_packet = DeserializedPacket::new(packet).unwrap(); - - let validator_keypair = Arc::new(Keypair::new()); - let genesis_config_info = - create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; - - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), - ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); - let recv_socket = &local_node.sockets.tpu_forwards[0]; - - let test_cases = vec![ - ("budget-restricted", DataBudget::restricted(), 0), - ("budget-available", DataBudget::default(), 1), - ]; - - let connection_cache = ConnectionCache::default(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, data_budget, expected_num_forwarded) in test_cases { - let unprocessed_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - vec![deserialized_packet.clone()].into_iter(), - 1, - ); - let stats = BankingStageStats::default(); - BankingStage::handle_forwarding( - &cluster_info, - &mut UnprocessedTransactionStorage::new_transaction_storage( - unprocessed_packet_batches, - ThreadType::Transactions, - ), - &poh_recorder, - &socket, - true, - &data_budget, - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &connection_cache, - &mut TracerPacketStats::new(0), - &bank_forks, - ); - - recv_socket - .set_nonblocking(expected_num_forwarded == 0) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_num_forwarded, "{name}"); - } - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - } - Blockstore::destroy(ledger_path.path()).unwrap(); - } - - #[test] - #[ignore] - fn test_handle_forwarding() { - solana_logger::setup(); - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore need to create real packets here. - let keypair = Keypair::new(); - let pubkey = solana_sdk::pubkey::new_rand(); - - let fwd_block_hash = Hash::new_unique(); - let forwarded_packet = { - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); - let mut packet = Packet::from_data(None, transaction).unwrap(); - packet.meta_mut().flags |= PacketFlags::FORWARDED; - DeserializedPacket::new(packet).unwrap() - }; - - let normal_block_hash = Hash::new_unique(); - let normal_packet = { - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); - let packet = Packet::from_data(None, transaction).unwrap(); - DeserializedPacket::new(packet).unwrap() - }; - - let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter( - vec![forwarded_packet, normal_packet].into_iter(), - 2, - ), - ThreadType::Transactions, - ); - - let validator_keypair = Arc::new(Keypair::new()); - let genesis_config_info = - create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), - ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); - let recv_socket = &local_node.sockets.tpu_forwards[0]; - let connection_cache = ConnectionCache::default(); - - let test_cases = vec![ - ("fwd-normal", true, vec![normal_block_hash], 2), - ("fwd-no-op", true, vec![], 2), - ("fwd-no-hold", false, vec![], 0), - ]; - - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { - let stats = BankingStageStats::default(); - BankingStage::handle_forwarding( - &cluster_info, - &mut unprocessed_packet_batches, - &poh_recorder, - &socket, - hold, - &DataBudget::default(), - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &connection_cache, - &mut TracerPacketStats::new(0), - &bank_forks, - ); - - recv_socket - .set_nonblocking(expected_ids.is_empty()) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_ids.len(), "{name}"); - for (i, expected_id) in expected_ids.iter().enumerate() { - assert_eq!(packets[i].meta().size, 215); - let recv_transaction: VersionedTransaction = - packets[i].deserialize_slice(..).unwrap(); - assert_eq!( - recv_transaction.message.recent_blockhash(), - expected_id, - "{name}" - ); - } - - let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); - assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}"); - } - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - } - Blockstore::destroy(ledger_path.path()).unwrap(); - } - #[test] fn test_accumulate_execute_units_and_time() { let mut execute_timings = ExecuteTimings::default(); diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs new file mode 100644 index 00000000000000..aad0af93712704 --- /dev/null +++ b/core/src/banking_stage/forwarder.rs @@ -0,0 +1,440 @@ +use { + super::{BankingStageStats, ForwardOption}, + crate::{ + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, + next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote}, + tracer_packet_stats::TracerPacketStats, + unprocessed_transaction_storage::UnprocessedTransactionStorage, + }, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_gossip::cluster_info::ClusterInfo, + solana_measure::measure::Measure, + solana_perf::{data_budget::DataBudget, packet::Packet}, + solana_poh::poh_recorder::PohRecorder, + solana_runtime::bank_forks::BankForks, + solana_sdk::{pubkey::Pubkey, transport::TransportError}, + solana_streamer::sendmmsg::batch_send, + std::{ + iter::repeat, + net::UdpSocket, + sync::{atomic::Ordering, Arc, RwLock}, + }, +}; + +pub struct Forwarder; + +impl Forwarder { + #[allow(clippy::too_many_arguments)] + pub fn handle_forwarding( + cluster_info: &ClusterInfo, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + poh_recorder: &Arc>, + socket: &UdpSocket, + hold: bool, + data_budget: &DataBudget, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + banking_stage_stats: &BankingStageStats, + connection_cache: &ConnectionCache, + tracer_packet_stats: &mut TracerPacketStats, + bank_forks: &Arc>, + ) { + let forward_option = unprocessed_transaction_storage.forward_option(); + + // get current root bank from bank_forks, use it to sanitize transaction and + // load all accounts from address loader; + let current_bank = bank_forks.read().unwrap().root_bank(); + + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + + // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something + // already processed), then add to forwarding buffer. + let filter_forwarding_result = unprocessed_transaction_storage + .filter_forwardable_packets_and_add_batches( + current_bank, + &mut forward_packet_batches_by_accounts, + ); + slot_metrics_tracker.increment_transactions_from_packets_us( + filter_forwarding_result.total_packet_conversion_us, + ); + banking_stage_stats.packet_conversion_elapsed.fetch_add( + filter_forwarding_result.total_packet_conversion_us, + Ordering::Relaxed, + ); + banking_stage_stats + .filter_pending_packets_elapsed + .fetch_add( + filter_forwarding_result.total_filter_packets_us, + Ordering::Relaxed, + ); + + forward_packet_batches_by_accounts + .iter_batches() + .filter(|&batch| !batch.is_empty()) + .for_each(|forward_batch| { + slot_metrics_tracker.increment_forwardable_batches_count(1); + + let batched_forwardable_packets_count = forward_batch.len(); + let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = + Self::forward_buffered_packets( + connection_cache, + &forward_option, + cluster_info, + poh_recorder, + socket, + forward_batch.get_forwardable_packets(), + data_budget, + banking_stage_stats, + ); + + if let Some(leader_pubkey) = leader_pubkey { + tracer_packet_stats.increment_total_forwardable_tracer_packets( + filter_forwarding_result.total_forwardable_tracer_packets, + leader_pubkey, + ); + } + let failed_forwarded_packets_count = batched_forwardable_packets_count + .saturating_sub(sucessful_forwarded_packets_count); + + if failed_forwarded_packets_count > 0 { + slot_metrics_tracker.increment_failed_forwarded_packets_count( + failed_forwarded_packets_count as u64, + ); + slot_metrics_tracker.increment_packet_batch_forward_failure_count(1); + } + + if sucessful_forwarded_packets_count > 0 { + slot_metrics_tracker.increment_successful_forwarded_packets_count( + sucessful_forwarded_packets_count as u64, + ); + } + }); + + if !hold { + slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( + filter_forwarding_result.total_forwardable_packets as u64, + ); + tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( + filter_forwarding_result.total_tracer_packets_in_buffer, + ); + unprocessed_transaction_storage.clear_forwarded_packets(); + } + } + + /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns + /// the number of successfully forwarded packets in second part of tuple + fn forward_buffered_packets<'a>( + connection_cache: &ConnectionCache, + forward_option: &ForwardOption, + cluster_info: &ClusterInfo, + poh_recorder: &Arc>, + socket: &UdpSocket, + forwardable_packets: impl Iterator, + data_budget: &DataBudget, + banking_stage_stats: &BankingStageStats, + ) -> ( + std::result::Result<(), TransportError>, + usize, + Option, + ) { + let leader_and_addr = match forward_option { + ForwardOption::NotForward => return (Ok(()), 0, None), + ForwardOption::ForwardTransaction => { + next_leader_tpu_forwards(cluster_info, poh_recorder) + } + + ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + }; + let (leader_pubkey, addr) = match leader_and_addr { + Some(leader_and_addr) => leader_and_addr, + None => return (Ok(()), 0, None), + }; + + const INTERVAL_MS: u64 = 100; + // 12 MB outbound limit per second + const MAX_BYTES_PER_SECOND: usize = 12_000_000; + const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; + const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; + data_budget.update(INTERVAL_MS, |bytes| { + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) + }); + + let packet_vec: Vec<_> = forwardable_packets + .filter_map(|p| { + if !p.meta().forwarded() && data_budget.take(p.meta().size) { + Some(p.data(..)?.to_vec()) + } else { + None + } + }) + .collect(); + + let packet_vec_len = packet_vec.len(); + // TODO: see https://github.com/solana-labs/solana/issues/23819 + // fix this so returns the correct number of succeeded packets + // when there's an error sending the batch. This was left as-is for now + // in favor of shipping Quic support, which was considered higher-priority + if !packet_vec.is_empty() { + inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len); + + let mut measure = Measure::start("banking_stage-forward-us"); + + let res = if let ForwardOption::ForwardTpuVote = forward_option { + // The vote must be forwarded using only UDP. + banking_stage_stats + .forwarded_vote_count + .fetch_add(packet_vec_len, Ordering::Relaxed); + let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); + batch_send(socket, &pkts).map_err(|err| err.into()) + } else { + // All other transactions can be forwarded using QUIC, get_connection() will use + // system wide setting to pick the correct connection object. + banking_stage_stats + .forwarded_transaction_count + .fetch_add(packet_vec_len, Ordering::Relaxed); + let conn = connection_cache.get_connection(&addr); + conn.send_wire_transaction_batch_async(packet_vec) + }; + + measure.stop(); + inc_new_counter_info!( + "banking_stage-forward-us", + measure.as_us() as usize, + 1000, + 1000 + ); + + if let Err(err) = res { + inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); + return (Err(err), 0, Some(leader_pubkey)); + } + } + + (Ok(()), packet_vec_len, Some(leader_pubkey)) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + banking_stage::tests::{create_slow_genesis_config_with_leader, new_test_cluster_info}, + unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, + unprocessed_transaction_storage::ThreadType, + }, + solana_ledger::{ + blockstore::Blockstore, genesis_utils::GenesisConfigInfo, + get_tmp_ledger_path_auto_delete, + }, + solana_perf::packet::PacketFlags, + solana_poh::poh_recorder::create_test_recorder, + solana_runtime::bank::Bank, + solana_sdk::{ + hash::Hash, poh_config::PohConfig, signature::Keypair, signer::Signer, + system_transaction, transaction::VersionedTransaction, + }, + solana_streamer::recvmmsg::recv_mmsg, + }; + + #[test] + #[ignore] + fn test_forwarder_budget() { + solana_logger::setup(); + // Create `PacketBatch` with 1 unprocessed packet + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + let deserialized_packet = DeserializedPacket::new(packet).unwrap(); + + let validator_keypair = Arc::new(Keypair::new()); + let genesis_config_info = + create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); + let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + + let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let blockstore = Arc::new( + Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config), None); + + let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + + let connection_cache = ConnectionCache::default(); + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + for (name, data_budget, expected_num_forwarded) in test_cases { + let unprocessed_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter( + vec![deserialized_packet.clone()].into_iter(), + 1, + ); + let stats = BankingStageStats::default(); + Forwarder::handle_forwarding( + &cluster_info, + &mut UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ), + &poh_recorder, + &socket, + true, + &data_budget, + &mut LeaderSlotMetricsTracker::new(0), + &stats, + &connection_cache, + &mut TracerPacketStats::new(0), + &bank_forks, + ); + + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{name}"); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); + } + + #[test] + #[ignore] + fn test_handle_forwarding() { + solana_logger::setup(); + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore need to create real packets here. + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + + let fwd_block_hash = Hash::new_unique(); + let forwarded_packet = { + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); + let mut packet = Packet::from_data(None, transaction).unwrap(); + packet.meta_mut().flags |= PacketFlags::FORWARDED; + DeserializedPacket::new(packet).unwrap() + }; + + let normal_block_hash = Hash::new_unique(); + let normal_packet = { + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); + let packet = Packet::from_data(None, transaction).unwrap(); + DeserializedPacket::new(packet).unwrap() + }; + + let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + vec![forwarded_packet, normal_packet].into_iter(), + 2, + ), + ThreadType::Transactions, + ); + + let validator_keypair = Arc::new(Keypair::new()); + let genesis_config_info = + create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); + let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let blockstore = Arc::new( + Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config), None); + + let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + let connection_cache = ConnectionCache::default(); + + let test_cases = vec![ + ("fwd-normal", true, vec![normal_block_hash], 2), + ("fwd-no-op", true, vec![], 2), + ("fwd-no-hold", false, vec![], 0), + ]; + + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { + let stats = BankingStageStats::default(); + Forwarder::handle_forwarding( + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &socket, + hold, + &DataBudget::default(), + &mut LeaderSlotMetricsTracker::new(0), + &stats, + &connection_cache, + &mut TracerPacketStats::new(0), + &bank_forks, + ); + + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{name}"); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta().size, 215); + let recv_transaction: VersionedTransaction = + packets[i].deserialize_slice(..).unwrap(); + assert_eq!( + recv_transaction.message.recent_blockhash(), + expected_id, + "{name}" + ); + } + + let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); + assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}"); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); + } +}