From 7b0a57316d0626312a704ee5b50603123d959f7c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 15 Oct 2024 10:25:36 -0500 Subject: [PATCH] Scheduler: Improve TTL (#3161) Co-authored-by: Justin Starry --- accounts-db/src/accounts.rs | 42 ++-- core/src/banking_stage/consume_worker.rs | 223 ++++++++++++++++-- core/src/banking_stage/consumer.rs | 32 ++- .../immutable_deserialized_packet.rs | 42 +++- .../banking_stage/latest_unprocessed_votes.rs | 2 +- core/src/banking_stage/scheduler_messages.rs | 8 +- .../prio_graph_scheduler.rs | 46 ++-- .../scheduler_controller.rs | 127 ++++++++-- .../transaction_state.rs | 46 +++- .../transaction_state_container.rs | 6 +- .../unprocessed_packet_batches.rs | 16 +- .../unprocessed_transaction_storage.rs | 8 +- runtime/src/bank/address_lookup_table.rs | 18 +- sdk/program/src/address_lookup_table/state.rs | 14 ++ 14 files changed, 505 insertions(+), 125 deletions(-) diff --git a/accounts-db/src/accounts.rs b/accounts-db/src/accounts.rs index 2584f900edbc49..cce35988aff69d 100644 --- a/accounts-db/src/accounts.rs +++ b/accounts-db/src/accounts.rs @@ -81,12 +81,14 @@ impl Accounts { } } + /// Return loaded addresses and the deactivation slot. + /// If the table hasn't been deactivated, the deactivation slot is `u64::MAX`. pub fn load_lookup_table_addresses( &self, ancestors: &Ancestors, address_table_lookup: SVMMessageAddressTableLookup, slot_hashes: &SlotHashes, - ) -> std::result::Result { + ) -> std::result::Result<(LoadedAddresses, Slot), AddressLookupError> { let table_account = self .accounts_db .load_with_fixed_root(ancestors, address_table_lookup.account_key) @@ -98,18 +100,21 @@ impl Accounts { let lookup_table = AddressLookupTable::deserialize(table_account.data()) .map_err(|_ix_err| AddressLookupError::InvalidAccountData)?; - Ok(LoadedAddresses { - writable: lookup_table.lookup( - current_slot, - address_table_lookup.writable_indexes, - slot_hashes, - )?, - readonly: lookup_table.lookup( - current_slot, - address_table_lookup.readonly_indexes, - slot_hashes, - )?, - }) + Ok(( + LoadedAddresses { + writable: lookup_table.lookup( + current_slot, + address_table_lookup.writable_indexes, + slot_hashes, + )?, + readonly: lookup_table.lookup( + current_slot, + address_table_lookup.readonly_indexes, + slot_hashes, + )?, + }, + lookup_table.meta.deactivation_slot, + )) } else { Err(AddressLookupError::InvalidAccountOwner) } @@ -806,10 +811,13 @@ mod tests { SVMMessageAddressTableLookup::from(&address_table_lookup), &SlotHashes::default(), ), - Ok(LoadedAddresses { - writable: vec![table_addresses[0]], - readonly: vec![table_addresses[1]], - }), + Ok(( + LoadedAddresses { + writable: vec![table_addresses[0]], + readonly: vec![table_addresses[1]], + }, + u64::MAX + )), ); } diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index b676168bb04d4d..787901ffa521f8 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -107,7 +107,7 @@ impl ConsumeWorker { let output = self.consumer.process_and_record_aged_transactions( bank, &work.transactions, - &work.max_age_slots, + &work.max_ages, ); self.metrics.update_for_consume(&output); @@ -694,7 +694,7 @@ mod tests { crate::banking_stage::{ committer::Committer, qos_service::QosService, - scheduler_messages::{TransactionBatchId, TransactionId}, + scheduler_messages::{MaxAge, TransactionBatchId, TransactionId}, tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh}, }, crossbeam_channel::unbounded, @@ -708,10 +708,25 @@ mod tests { vote_sender_types::ReplayVoteReceiver, }, solana_sdk::{ - genesis_config::GenesisConfig, poh_config::PohConfig, pubkey::Pubkey, - signature::Keypair, system_transaction, + address_lookup_table::AddressLookupTableAccount, + clock::{Slot, MAX_PROCESSING_AGE}, + genesis_config::GenesisConfig, + message::{ + v0::{self, LoadedAddresses}, + SimpleAddressLoader, VersionedMessage, + }, + poh_config::PohConfig, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + system_instruction, system_transaction, + transaction::{ + MessageHash, SanitizedTransaction, TransactionError, VersionedTransaction, + }, }, + solana_svm_transaction::svm_message::SVMMessage, std::{ + collections::HashSet, sync::{atomic::AtomicBool, RwLock}, thread::JoinHandle, }, @@ -742,6 +757,7 @@ mod tests { .. } = create_slow_genesis_config(10_000); let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank = Arc::new(Bank::new_from_parent(bank, &Pubkey::new_unique(), 1)); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()) @@ -820,17 +836,21 @@ mod tests { )]); let bid = TransactionBatchId::new(0); let id = TransactionId::new(0); + let max_age = MaxAge { + epoch_invalidation_slot: bank.slot(), + alt_invalidation_slot: bank.slot(), + }; let work = ConsumeWork { batch_id: bid, ids: vec![id], transactions, - max_age_slots: vec![bank.slot()], + max_ages: vec![max_age], }; consume_sender.send(work).unwrap(); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid); assert_eq!(consumed.work.ids, vec![id]); - assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.work.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, vec![0]); drop(test_frame); @@ -865,17 +885,21 @@ mod tests { )]); let bid = TransactionBatchId::new(0); let id = TransactionId::new(0); + let max_age = MaxAge { + epoch_invalidation_slot: bank.slot(), + alt_invalidation_slot: bank.slot(), + }; let work = ConsumeWork { batch_id: bid, ids: vec![id], transactions, - max_age_slots: vec![bank.slot()], + max_ages: vec![max_age], }; consume_sender.send(work).unwrap(); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid); assert_eq!(consumed.work.ids, vec![id]); - assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.work.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::::new()); drop(test_frame); @@ -911,19 +935,23 @@ mod tests { let bid = TransactionBatchId::new(0); let id1 = TransactionId::new(1); let id2 = TransactionId::new(0); + let max_age = MaxAge { + epoch_invalidation_slot: bank.slot(), + alt_invalidation_slot: bank.slot(), + }; consume_sender .send(ConsumeWork { batch_id: bid, ids: vec![id1, id2], transactions: txs, - max_age_slots: vec![bank.slot(), bank.slot()], + max_ages: vec![max_age, max_age], }) .unwrap(); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid); assert_eq!(consumed.work.ids, vec![id1, id2]); - assert_eq!(consumed.work.max_age_slots, vec![bank.slot(), bank.slot()]); + assert_eq!(consumed.work.max_ages, vec![max_age, max_age]); assert_eq!(consumed.retryable_indexes, vec![1]); // id2 is retryable since lock conflict drop(test_frame); @@ -968,12 +996,16 @@ mod tests { let bid2 = TransactionBatchId::new(1); let id1 = TransactionId::new(1); let id2 = TransactionId::new(0); + let max_age = MaxAge { + epoch_invalidation_slot: bank.slot(), + alt_invalidation_slot: bank.slot(), + }; consume_sender .send(ConsumeWork { batch_id: bid1, ids: vec![id1], transactions: txs1, - max_age_slots: vec![bank.slot()], + max_ages: vec![max_age], }) .unwrap(); @@ -982,22 +1014,185 @@ mod tests { batch_id: bid2, ids: vec![id2], transactions: txs2, - max_age_slots: vec![bank.slot()], + max_ages: vec![max_age], }) .unwrap(); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid1); assert_eq!(consumed.work.ids, vec![id1]); - assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.work.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::::new()); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid2); assert_eq!(consumed.work.ids, vec![id2]); - assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.work.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::::new()); drop(test_frame); let _ = worker_thread.join().unwrap(); } + + #[test] + fn test_worker_ttl() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + bank, + poh_recorder, + consume_sender, + consumed_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + assert!(bank.slot() > 0); + + // No conflicts between transactions. Test 6 cases. + // 1. Epoch expiration, before slot => still succeeds due to resanitizing + // 2. Epoch expiration, on slot => succeeds normally + // 3. Epoch expiration, after slot => succeeds normally + // 4. ALT expiration, before slot => fails + // 5. ALT expiration, on slot => succeeds normally + // 6. ALT expiration, after slot => succeeds normally + let simple_transfer = || { + system_transaction::transfer( + &Keypair::new(), + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + ) + }; + let simple_v0_transfer = || { + let payer = Keypair::new(); + let to_pubkey = Pubkey::new_unique(); + let loaded_addresses = LoadedAddresses { + writable: vec![to_pubkey], + readonly: vec![], + }; + let loader = SimpleAddressLoader::Enabled(loaded_addresses); + SanitizedTransaction::try_create( + VersionedTransaction::try_new( + VersionedMessage::V0( + v0::Message::try_compile( + &payer.pubkey(), + &[system_instruction::transfer(&payer.pubkey(), &to_pubkey, 1)], + &[AddressLookupTableAccount { + key: Pubkey::new_unique(), // will fail if using **bank** to lookup + addresses: vec![to_pubkey], + }], + genesis_config.hash(), + ) + .unwrap(), + ), + &[&payer], + ) + .unwrap(), + MessageHash::Compute, + None, + loader, + &HashSet::default(), + ) + .unwrap() + }; + + let mut txs = sanitize_transactions(vec![ + simple_transfer(), + simple_transfer(), + simple_transfer(), + ]); + txs.push(simple_v0_transfer()); + txs.push(simple_v0_transfer()); + txs.push(simple_v0_transfer()); + let sanitized_txs = txs.clone(); + + // Fund the keypairs. + for tx in &txs { + bank.process_transaction(&system_transaction::transfer( + mint_keypair, + &tx.account_keys()[0], + 2, + genesis_config.hash(), + )) + .unwrap(); + } + + consume_sender + .send(ConsumeWork { + batch_id: TransactionBatchId::new(1), + ids: vec![ + TransactionId::new(0), + TransactionId::new(1), + TransactionId::new(2), + TransactionId::new(3), + TransactionId::new(4), + TransactionId::new(5), + ], + transactions: txs, + max_ages: vec![ + MaxAge { + epoch_invalidation_slot: bank.slot() - 1, + alt_invalidation_slot: Slot::MAX, + }, + MaxAge { + epoch_invalidation_slot: bank.slot(), + alt_invalidation_slot: Slot::MAX, + }, + MaxAge { + epoch_invalidation_slot: bank.slot() + 1, + alt_invalidation_slot: Slot::MAX, + }, + MaxAge { + epoch_invalidation_slot: u64::MAX, + alt_invalidation_slot: bank.slot() - 1, + }, + MaxAge { + epoch_invalidation_slot: u64::MAX, + alt_invalidation_slot: bank.slot(), + }, + MaxAge { + epoch_invalidation_slot: u64::MAX, + alt_invalidation_slot: bank.slot() + 1, + }, + ], + }) + .unwrap(); + + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.retryable_indexes, Vec::::new()); + // all but one succeed. 6 for initial funding + assert_eq!(bank.transaction_count(), 6 + 5); + + let already_processed_results = bank + .check_transactions( + &sanitized_txs, + &vec![Ok(()); sanitized_txs.len()], + MAX_PROCESSING_AGE, + &mut TransactionErrorMetrics::default(), + ) + .into_iter() + .map(|r| match r { + Ok(_) => Ok(()), + Err(err) => Err(err), + }) + .collect::>(); + assert_eq!( + already_processed_results, + vec![ + Err(TransactionError::AlreadyProcessed), + Err(TransactionError::AlreadyProcessed), + Err(TransactionError::AlreadyProcessed), + Ok(()), // <--- this transaction was not processed + Err(TransactionError::AlreadyProcessed), + Err(TransactionError::AlreadyProcessed) + ] + ); + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 57f1b1958b152c..ae04cc30ff0167 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -7,6 +7,7 @@ use { }, leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, qos_service::QosService, + scheduler_messages::MaxAge, unprocessed_transaction_storage::{ConsumeScannerPayload, UnprocessedTransactionStorage}, BankingStageStats, }, @@ -25,12 +26,12 @@ use { }, solana_runtime_transaction::instructions_processor::process_compute_budget_instructions, solana_sdk::{ - clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, fee::FeeBudgetLimits, message::SanitizedMessage, saturating_add_assign, timing::timestamp, - transaction::{self, AddressLoader, SanitizedTransaction, TransactionError}, + transaction::{self, SanitizedTransaction, TransactionError}, }, solana_svm::{ account_loader::{validate_fee_payer, TransactionCheckResult}, @@ -429,7 +430,7 @@ impl Consumer { &self, bank: &Arc, txs: &[SanitizedTransaction], - max_slot_ages: &[Slot], + max_ages: &[MaxAge], ) -> ProcessTransactionBatchOutput { let move_precompile_verification_to_svm = bank .feature_set @@ -438,8 +439,9 @@ impl Consumer { // Need to filter out transactions since they were sanitized earlier. // This means that the transaction may cross and epoch boundary (not allowed), // or account lookup tables may have been closed. - let pre_results = txs.iter().zip(max_slot_ages).map(|(tx, max_slot_age)| { - if *max_slot_age < bank.slot() { + let pre_results = txs.iter().zip(max_ages).map(|(tx, max_age)| { + if bank.slot() > max_age.epoch_invalidation_slot { + // Epoch has rolled over. Need to fully re-verify the transaction. // Pre-compiles are verified here. // Attempt re-sanitization after epoch-cross. // Re-sanitized transaction should be equal to the original transaction, @@ -451,18 +453,24 @@ impl Consumer { return Err(TransactionError::ResanitizationNeeded); } } else { + if bank.slot() > max_age.alt_invalidation_slot { + // The address table lookup **may** have expired, but the + // expiration is not guaranteed since there may have been + // skipped slot. + // If the addresses still resolve here, then the transaction is still + // valid, and we can continue with processing. + // If they do not, then the ATL has expired and the transaction + // can be dropped. + let (_addresses, _deactivation_slot) = + bank.load_addresses_from_ref(tx.message_address_table_lookups())?; + } + // Verify pre-compiles. if !move_precompile_verification_to_svm { verify_precompiles(tx, &bank.feature_set)?; } - - // Any transaction executed between sanitization time and now may have closed the lookup table(s). - // Above re-sanitization already loads addresses, so don't need to re-check in that case. - let lookup_tables = tx.message().message_address_table_lookups(); - if !lookup_tables.is_empty() { - bank.load_addresses(lookup_tables)?; - } } + Ok(()) }); self.process_and_record_transactions_with_pre_results(bank, txs, 0, pre_results) diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index b03f3d5d64d4e8..978e4f9b935c7e 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -2,20 +2,21 @@ use { super::packet_filter::PacketFilterFailure, solana_compute_budget::compute_budget_limits::ComputeBudgetLimits, solana_perf::packet::Packet, + solana_runtime::bank::Bank, solana_runtime_transaction::instructions_processor::process_compute_budget_instructions, solana_sanitize::SanitizeError, solana_sdk::{ + clock::Slot, hash::Hash, - message::Message, + message::{v0::LoadedAddresses, AddressLoaderError, Message, SimpleAddressLoader}, pubkey::Pubkey, signature::Signature, - transaction::{ - AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, - VersionedTransaction, - }, + transaction::{SanitizedTransaction, SanitizedVersionedTransaction, VersionedTransaction}, }, solana_short_vec::decode_shortu16_len, - solana_svm_transaction::instruction::SVMInstruction, + solana_svm_transaction::{ + instruction::SVMInstruction, message_address_table_lookup::SVMMessageAddressTableLookup, + }, std::{cmp::Ordering, collections::HashSet, mem::size_of}, thiserror::Error, }; @@ -111,15 +112,22 @@ impl ImmutableDeserializedPacket { // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages. + // Additionally, this returns the minimum deactivation slot of the resolved addresses. pub fn build_sanitized_transaction( &self, votes_only: bool, - address_loader: impl AddressLoader, + bank: &Bank, reserved_account_keys: &HashSet, - ) -> Option { + ) -> Option<(SanitizedTransaction, Slot)> { if votes_only && !self.is_simple_vote() { return None; } + + // Resolve the lookup addresses and retrieve the min deactivation slot + let (loaded_addresses, deactivation_slot) = + Self::resolve_addresses_with_deactivation(self.transaction(), bank).ok()?; + let address_loader = SimpleAddressLoader::Enabled(loaded_addresses); + let tx = SanitizedTransaction::try_new( self.transaction().clone(), *self.message_hash(), @@ -128,7 +136,23 @@ impl ImmutableDeserializedPacket { reserved_account_keys, ) .ok()?; - Some(tx) + Some((tx, deactivation_slot)) + } + + fn resolve_addresses_with_deactivation( + transaction: &SanitizedVersionedTransaction, + bank: &Bank, + ) -> Result<(LoadedAddresses, Slot), AddressLoaderError> { + let Some(address_table_lookups) = transaction.get_message().message.address_table_lookups() + else { + return Ok((LoadedAddresses::default(), Slot::MAX)); + }; + + bank.load_addresses_from_ref( + address_table_lookups + .iter() + .map(SVMMessageAddressTableLookup::from), + ) } } diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index bb97142bda5e81..ae13c37caa5a0e 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -420,7 +420,7 @@ impl LatestUnprocessedVotes { } let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone(); - let Some(sanitized_vote_transaction) = deserialized_vote_packet + let Some((sanitized_vote_transaction, _deactivation_slot)) = deserialized_vote_packet .build_sanitized_transaction( bank.vote_only_bank(), bank.as_ref(), diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index d93d2d6dbb6c52..29e9b99f50588a 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -35,13 +35,19 @@ impl Display for TransactionId { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct MaxAge { + pub epoch_invalidation_slot: Slot, + pub alt_invalidation_slot: Slot, +} + /// Message: [Scheduler -> Worker] /// Transactions to be consumed (i.e. executed, recorded, and committed) pub struct ConsumeWork { pub batch_id: TransactionBatchId, pub ids: Vec, pub transactions: Vec, - pub max_age_slots: Vec, + pub max_ages: Vec, } /// Message: [Worker -> Scheduler] diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 59ce92173ed26e..9f6fcc8388a364 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -9,7 +9,9 @@ use { crate::banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, read_write_account_set::ReadWriteAccountSet, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId}, + scheduler_messages::{ + ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId, + }, transaction_scheduler::{ transaction_priority_id::TransactionPriorityId, transaction_state::TransactionState, }, @@ -19,10 +21,7 @@ use { prio_graph::{AccessKind, PrioGraph}, solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, solana_measure::measure_us, - solana_sdk::{ - pubkey::Pubkey, saturating_add_assign, slot_history::Slot, - transaction::SanitizedTransaction, - }, + solana_sdk::{pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction}, }; pub(crate) struct PrioGraphScheduler { @@ -202,13 +201,13 @@ impl PrioGraphScheduler { Ok(TransactionSchedulingInfo { thread_id, transaction, - max_age_slot, + max_age, cost, }) => { saturating_add_assign!(num_scheduled, 1); batches.transactions[thread_id].push(transaction); batches.ids[thread_id].push(id.id); - batches.max_age_slots[thread_id].push(max_age_slot); + batches.max_ages[thread_id].push(max_age); saturating_add_assign!(batches.total_cus[thread_id], cost); // If target batch size is reached, send only this batch. @@ -309,7 +308,7 @@ impl PrioGraphScheduler { batch_id, ids, transactions, - max_age_slots, + max_ages, }, retryable_indexes, }) => { @@ -321,8 +320,8 @@ impl PrioGraphScheduler { // Retryable transactions should be inserted back into the container let mut retryable_iter = retryable_indexes.into_iter().peekable(); - for (index, (id, transaction, max_age_slot)) in - izip!(ids, transactions, max_age_slots).enumerate() + for (index, (id, transaction, max_age)) in + izip!(ids, transactions, max_ages).enumerate() { if let Some(retryable_index) = retryable_iter.peek() { if *retryable_index == index { @@ -330,7 +329,7 @@ impl PrioGraphScheduler { id, SanitizedTransactionTTL { transaction, - max_age_slot, + max_age, }, ); retryable_iter.next(); @@ -392,7 +391,7 @@ impl PrioGraphScheduler { return Ok(0); } - let (ids, transactions, max_age_slots, total_cus) = batches.take_batch(thread_index); + let (ids, transactions, max_ages, total_cus) = batches.take_batch(thread_index); let batch_id = self .in_flight_tracker @@ -403,7 +402,7 @@ impl PrioGraphScheduler { batch_id, ids, transactions, - max_age_slots, + max_ages, }; self.consume_work_senders[thread_index] .send(work) @@ -477,7 +476,7 @@ pub(crate) struct SchedulingSummary { struct Batches { ids: Vec>, transactions: Vec>, - max_age_slots: Vec>, + max_ages: Vec>, total_cus: Vec, } @@ -486,7 +485,7 @@ impl Batches { Self { ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], transactions: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], - max_age_slots: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + max_ages: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], total_cus: vec![0; num_threads], } } @@ -497,7 +496,7 @@ impl Batches { ) -> ( Vec, Vec, - Vec, + Vec, u64, ) { ( @@ -510,7 +509,7 @@ impl Batches { Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), ), core::mem::replace( - &mut self.max_age_slots[thread_id], + &mut self.max_ages[thread_id], Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), ), core::mem::replace(&mut self.total_cus[thread_id], 0), @@ -522,7 +521,7 @@ impl Batches { struct TransactionSchedulingInfo { thread_id: ThreadId, transaction: SanitizedTransaction, - max_age_slot: Slot, + max_age: MaxAge, cost: u64, } @@ -583,7 +582,7 @@ fn try_schedule_transaction( Ok(TransactionSchedulingInfo { thread_id, transaction: sanitized_transaction_ttl.transaction, - max_age_slot: sanitized_transaction_ttl.max_age_slot, + max_age: sanitized_transaction_ttl.max_age, cost, }) } @@ -599,8 +598,8 @@ mod tests { crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, - pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, + clock::Slot, compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, + packet::Packet, pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, }, std::{borrow::Borrow, sync::Arc}, @@ -686,7 +685,10 @@ mod tests { ); let transaction_ttl = SanitizedTransactionTTL { transaction, - max_age_slot: Slot::MAX, + max_age: MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + }, }; const TEST_TRANSACTION_COST: u64 = 5000; container.insert_new_transaction( diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 995b1a5782702b..ddec4ec90711c8 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -19,6 +19,7 @@ use { forwarder::Forwarder, immutable_deserialized_packet::ImmutableDeserializedPacket, packet_deserializer::PacketDeserializer, + scheduler_messages::MaxAge, ForwardOption, LikeClusterInfo, TOTAL_BUFFERED_PACKETS, }, arrayvec::ArrayVec, @@ -30,7 +31,8 @@ use { solana_runtime_transaction::instructions_processor::process_compute_budget_instructions, solana_sdk::{ self, - clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + address_lookup_table::state::estimate_last_valid_slot, + clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, fee::FeeBudgetLimits, saturating_add_assign, transaction::SanitizedTransaction, @@ -500,16 +502,25 @@ impl SchedulerController { // Convert to Arcs let packets: Vec<_> = packets.into_iter().map(Arc::new).collect(); // Sanitize packets, generate IDs, and insert into the container. - let bank = self.bank_forks.read().unwrap().working_bank(); - let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch()); - let transaction_account_lock_limit = bank.get_transaction_account_lock_limit(); - let vote_only = bank.vote_only_bank(); + let (root_bank, working_bank) = { + let bank_forks = self.bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + (root_bank, working_bank) + }; + let alt_resolved_slot = root_bank.slot(); + let last_slot_in_epoch = working_bank + .epoch_schedule() + .get_last_slot_in_epoch(working_bank.epoch()); + let transaction_account_lock_limit = working_bank.get_transaction_account_lock_limit(); + let vote_only = working_bank.vote_only_bank(); const CHUNK_SIZE: usize = 128; let lock_results: [_; CHUNK_SIZE] = core::array::from_fn(|_| Ok(())); let mut arc_packets = ArrayVec::<_, CHUNK_SIZE>::new(); let mut transactions = ArrayVec::<_, CHUNK_SIZE>::new(); + let mut max_ages = ArrayVec::<_, CHUNK_SIZE>::new(); let mut fee_budget_limits_vec = ArrayVec::<_, CHUNK_SIZE>::new(); let mut error_counts = TransactionErrorMetrics::default(); @@ -521,31 +532,43 @@ impl SchedulerController { packet .build_sanitized_transaction( vote_only, - bank.as_ref(), - bank.get_reserved_account_keys(), + root_bank.as_ref(), + working_bank.get_reserved_account_keys(), ) - .map(|tx| (packet.clone(), tx)) + .map(|(tx, deactivation_slot)| (packet.clone(), tx, deactivation_slot)) }) .inspect(|_| saturating_add_assign!(post_sanitization_count, 1)) - .filter(|(_packet, tx)| { + .filter(|(_packet, tx, _deactivation_slot)| { validate_account_locks( tx.message().account_keys(), transaction_account_lock_limit, ) .is_ok() }) - .filter_map(|(packet, tx)| { + .filter_map(|(packet, tx, deactivation_slot)| { process_compute_budget_instructions(SVMMessage::program_instructions_iter(&tx)) - .map(|compute_budget| (packet, tx, compute_budget.into())) + .map(|compute_budget| { + (packet, tx, deactivation_slot, compute_budget.into()) + }) .ok() }) - .for_each(|(packet, tx, fee_budget_limits)| { + .for_each(|(packet, tx, deactivation_slot, fee_budget_limits)| { arc_packets.push(packet); transactions.push(tx); + max_ages.push(calculate_max_age( + last_slot_in_epoch, + deactivation_slot, + alt_resolved_slot, + )); fee_budget_limits_vec.push(fee_budget_limits); }); - let check_results = bank.check_transactions( + let check_results: Vec< + Result< + solana_svm::account_loader::CheckedTransactionDetails, + solana_sdk::transaction::TransactionError, + >, + > = working_bank.check_transactions( &transactions, &lock_results[..transactions.len()], MAX_PROCESSING_AGE, @@ -556,21 +579,26 @@ impl SchedulerController { let mut post_transaction_check_count: usize = 0; let mut num_dropped_on_capacity: usize = 0; let mut num_buffered: usize = 0; - for (((packet, transaction), fee_budget_limits), _check_result) in arc_packets - .drain(..) - .zip(transactions.drain(..)) - .zip(fee_budget_limits_vec.drain(..)) - .zip(check_results) - .filter(|(_, check_result)| check_result.is_ok()) + for ((((packet, transaction), max_age), fee_budget_limits), _check_result) in + arc_packets + .drain(..) + .zip(transactions.drain(..)) + .zip(max_ages.drain(..)) + .zip(fee_budget_limits_vec.drain(..)) + .zip(check_results) + .filter(|(_, check_result)| check_result.is_ok()) { saturating_add_assign!(post_transaction_check_count, 1); let transaction_id = self.transaction_id_generator.next(); - let (priority, cost) = - Self::calculate_priority_and_cost(&transaction, &fee_budget_limits, &bank); + let (priority, cost) = Self::calculate_priority_and_cost( + &transaction, + &fee_budget_limits, + &working_bank, + ); let transaction_ttl = SanitizedTransactionTTL { transaction, - max_age_slot: last_slot_in_epoch, + max_age, }; if self.container.insert_new_transaction( @@ -655,6 +683,34 @@ impl SchedulerController { } } +/// Given the last slot in the epoch, the minimum deactivation slot, +/// and the current slot, return the `MaxAge` that should be used for +/// the transaction. This is used to determine the maximum slot that a +/// transaction will be considered valid for, without re-resolving addresses +/// or resanitizing. +/// +/// This function considers the deactivation period of Address Table +/// accounts. If the deactivation period runs past the end of the epoch, +/// then the transaction is considered valid until the end of the epoch. +/// Otherwise, the transaction is considered valid until the deactivation +/// period. +/// +/// Since the deactivation period technically uses blocks rather than +/// slots, the value used here is the lower-bound on the deactivation +/// period, i.e. the transaction's address lookups are valid until +/// AT LEAST this slot. +fn calculate_max_age( + last_slot_in_epoch: Slot, + deactivation_slot: Slot, + current_slot: Slot, +) -> MaxAge { + let alt_min_expire_slot = estimate_last_valid_slot(deactivation_slot.min(current_slot)); + MaxAge { + epoch_invalidation_slot: last_slot_in_epoch, + alt_invalidation_slot: alt_min_expire_slot, + } +} + #[cfg(test)] mod tests { use { @@ -827,7 +883,7 @@ mod tests { batch_id: TransactionBatchId::new(0), ids: vec![], transactions: vec![], - max_age_slots: vec![], + max_ages: vec![], }, retryable_indexes: vec![], }) @@ -1158,4 +1214,29 @@ mod tests { .collect_vec(); assert_eq!(message_hashes, vec![&tx1_hash]); } + + #[test] + fn test_calculate_max_age() { + let current_slot = 100; + let last_slot_in_epoch = 1000; + + // ALT deactivation slot is delayed + assert_eq!( + calculate_max_age(last_slot_in_epoch, current_slot - 1, current_slot), + MaxAge { + epoch_invalidation_slot: last_slot_in_epoch, + alt_invalidation_slot: current_slot - 1 + + solana_sdk::slot_hashes::get_entries() as u64, + } + ); + + // no deactivation slot + assert_eq!( + calculate_max_age(last_slot_in_epoch, u64::MAX, current_slot), + MaxAge { + epoch_invalidation_slot: last_slot_in_epoch, + alt_invalidation_slot: current_slot + solana_sdk::slot_hashes::get_entries() as u64, + } + ); + } } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index 85af8217309e93..efb59be1b8b5b5 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -1,13 +1,15 @@ use { - crate::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, - solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, + crate::banking_stage::{ + immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::MaxAge, + }, + solana_sdk::transaction::SanitizedTransaction, std::sync::Arc, }; /// Simple wrapper type to tie a sanitized transaction to max age slot. pub(crate) struct SanitizedTransactionTTL { pub(crate) transaction: SanitizedTransaction, - pub(crate) max_age_slot: Slot, + pub(crate) max_age: MaxAge, } /// TransactionState is used to track the state of a transaction in the transaction scheduler @@ -207,8 +209,9 @@ mod tests { use { super::*, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, - signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, + clock::Slot, compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, + packet::Packet, signature::Keypair, signer::Signer, system_instruction, + transaction::Transaction, }, }; @@ -230,7 +233,10 @@ mod tests { ); let transaction_ttl = SanitizedTransactionTTL { transaction: SanitizedTransaction::from_transaction_for_tests(tx), - max_age_slot: Slot::MAX, + max_age: MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + }, }; const TEST_TRANSACTION_COST: u64 = 5000; TransactionState::new( @@ -271,11 +277,11 @@ mod tests { // Manually clone `SanitizedTransactionTTL` let SanitizedTransactionTTL { transaction, - max_age_slot, + max_age, } = transaction_state.transaction_ttl(); let transaction_ttl = SanitizedTransactionTTL { transaction: transaction.clone(), - max_age_slot: *max_age_slot, + max_age: *max_age, }; transaction_state.transition_to_unprocessed(transaction_ttl); // invalid transition } @@ -321,7 +327,13 @@ mod tests { transaction_state, TransactionState::Unprocessed { .. } )); - assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + assert_eq!( + transaction_ttl.max_age, + MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + } + ); let _ = transaction_state.transition_to_pending(); assert!(matches!( @@ -339,7 +351,13 @@ mod tests { transaction_state, TransactionState::Unprocessed { .. } )); - assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + assert_eq!( + transaction_ttl.max_age, + MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + } + ); // ensure transaction_ttl is not lost through state transitions let transaction_ttl = transaction_state.transition_to_pending(); @@ -354,6 +372,12 @@ mod tests { transaction_state, TransactionState::Unprocessed { .. } )); - assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + assert_eq!( + transaction_ttl.max_age, + MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + } + ); } } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index ed78b41983fa2a..7d40c66ec1b673 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -153,6 +153,7 @@ impl TransactionStateContainer { mod tests { use { super::*, + crate::banking_stage::scheduler_messages::MaxAge, solana_sdk::{ compute_budget::ComputeBudgetInstruction, hash::Hash, @@ -198,7 +199,10 @@ mod tests { ); let transaction_ttl = SanitizedTransactionTTL { transaction: tx, - max_age_slot: Slot::MAX, + max_age: MaxAge { + epoch_invalidation_slot: Slot::MAX, + alt_invalidation_slot: Slot::MAX, + }, }; const TEST_TRANSACTION_COST: u64 = 5000; (transaction_ttl, packet, priority, TEST_TRANSACTION_COST) diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index f92eeb09c57b54..3c4e0f66664dd2 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -307,13 +307,14 @@ mod tests { use { super::*, solana_perf::packet::PacketFlags, + solana_runtime::bank::Bank, solana_sdk::{ compute_budget::ComputeBudgetInstruction, message::Message, reserved_account_keys::ReservedAccountKeys, signature::{Keypair, Signer}, system_instruction, system_transaction, - transaction::{SimpleAddressLoader, Transaction}, + transaction::Transaction, }, solana_vote_program::{vote_state::TowerSync, vote_transaction}, }; @@ -475,6 +476,7 @@ mod tests { &keypair, None, ); + let bank = Bank::default_for_tests(); // packets with no votes { @@ -486,7 +488,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); @@ -496,7 +498,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); @@ -515,7 +517,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); @@ -525,7 +527,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); @@ -544,7 +546,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); @@ -554,7 +556,7 @@ mod tests { let txs = packet_vector.iter().filter_map(|tx| { tx.immutable_section().build_sanitized_transaction( votes_only, - SimpleAddressLoader::Disabled, + &bank, &ReservedAccountKeys::empty_key_set(), ) }); diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index f612f5eaf08b11..56e814acea9219 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -154,13 +154,15 @@ fn consume_scan_should_process_packet( return ProcessingDecision::Now; } - // Try to sanitize the packet + // Try to sanitize the packet. Ignore deactivation slot since we are + // immediately attempting to process the transaction. let (maybe_sanitized_transaction, sanitization_time_us) = measure_us!(packet .build_sanitized_transaction( bank.vote_only_bank(), bank, bank.get_reserved_account_keys(), - )); + ) + .map(|(tx, _deactivation_slot)| tx)); payload .slot_metrics_tracker @@ -799,7 +801,7 @@ impl ThreadLocalUnprocessedPackets { bank, bank.get_reserved_account_keys(), ) - .map(|transaction| (transaction, packet_index)) + .map(|(transaction, _deactivation_slot)| (transaction, packet_index)) }) .unzip(); diff --git a/runtime/src/bank/address_lookup_table.rs b/runtime/src/bank/address_lookup_table.rs index 4fa4e2bc0f570a..cb195202c9ddac 100644 --- a/runtime/src/bank/address_lookup_table.rs +++ b/runtime/src/bank/address_lookup_table.rs @@ -2,6 +2,7 @@ use { super::Bank, solana_sdk::{ address_lookup_table::error::AddressLookupError, + clock::Slot, message::{ v0::{LoadedAddresses, MessageAddressTableLookup}, AddressLoaderError, @@ -32,22 +33,25 @@ impl AddressLoader for &Bank { .iter() .map(SVMMessageAddressTableLookup::from), ) + .map(|(loaded_addresses, _deactivation_slot)| loaded_addresses) } } impl Bank { - /// Load addresses from an iterator of `SVMMessageAddressTableLookup`. + /// Load addresses from an iterator of `SVMMessageAddressTableLookup`, + /// additionally returning the minimum deactivation slot across all referenced ALTs pub fn load_addresses_from_ref<'a>( &self, address_table_lookups: impl Iterator>, - ) -> Result { + ) -> Result<(LoadedAddresses, Slot), AddressLoaderError> { let slot_hashes = self .transaction_processor .sysvar_cache() .get_slot_hashes() .map_err(|_| AddressLoaderError::SlotHashesSysvarNotFound)?; - address_table_lookups + let mut deactivation_slot = u64::MAX; + let loaded_addresses = address_table_lookups .map(|address_table_lookup| { self.rc .accounts @@ -56,8 +60,14 @@ impl Bank { address_table_lookup, &slot_hashes, ) + .map(|(loaded_addresses, table_deactivation_slot)| { + deactivation_slot = deactivation_slot.min(table_deactivation_slot); + loaded_addresses + }) .map_err(into_address_loader_error) }) - .collect::>() + .collect::>()?; + + Ok((loaded_addresses, deactivation_slot)) } } diff --git a/sdk/program/src/address_lookup_table/state.rs b/sdk/program/src/address_lookup_table/state.rs index 13a66637faa919..3136dd063f85da 100644 --- a/sdk/program/src/address_lookup_table/state.rs +++ b/sdk/program/src/address_lookup_table/state.rs @@ -1,6 +1,7 @@ #[cfg(feature = "frozen-abi")] use solana_frozen_abi_macro::{AbiEnumVisitor, AbiExample}; use { + crate::slot_hashes::get_entries, serde_derive::{Deserialize, Serialize}, solana_clock::Slot, solana_program::{ @@ -12,6 +13,19 @@ use { std::borrow::Cow, }; +/// The lookup table may be in a deactivating state until +/// the `deactivation_slot`` is no longer "recent". +/// This function returns a conservative estimate for the +/// last block that the table may be used for lookups. +/// This estimate may be incorrect due to skipped blocks, +/// however, if the current slot is lower than the returned +/// value, the table is guaranteed to still be in the +/// deactivating state. +#[inline] +pub fn estimate_last_valid_slot(deactivation_slot: Slot) -> Slot { + deactivation_slot.saturating_add(get_entries() as Slot) +} + /// The maximum number of addresses that a lookup table can hold pub const LOOKUP_TABLE_MAX_ADDRESSES: usize = 256;