Split out voting and banking threads in banking stage #27931
Conversation
Force-pushed c400a2a to 79a4b3a
They should maybe have separate block compute limits too? Right now it seems there is potential for non-vote transaction compute to starve out votes, due to non-votes filling up the block compute limit (see recent discussion in the mb planning channel).
Force-pushed 79a4b3a to dd742cf
The priority details are being ripped out of vote txs. With this change, vote threads and bank threads would compete to add to the block; however, having some sort of separation might be interesting. cc @taozhu-chicago
core/src/banking_stage.rs
Outdated
@@ -648,12 +685,111 @@ impl BankingStage {
        (Ok(()), packet_vec_len, Some(leader_pubkey))
    }

    #[allow(clippy::too_many_arguments)]
    fn processing_function(
nit: processing_function -> do_process_packets
        // Mark so we don't forward again
        deserialized_packet.forwarded = true;
this seems like it should be handled inside filter_valid_packets_for_forwarding; was this a bug in the existing code?
If so, we should check this fix in separately so we don't block the fix behind conflicts with Tao's changes here: #26798
core/src/banking_stage.rs
Outdated
@@ -2054,16 +2058,16 @@ impl BankingStage {
            .fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
        banking_stage_stats
            .current_buffered_packet_batches_count
-           .swap(buffered_packet_batches.len(), Ordering::Relaxed);
+           .swap(unprocessed_transaction_storage.len(), Ordering::Relaxed);
what's going on here? this doesn't seem right.
It's not new in this change, but why not just remove the duplicate stat if we can't get an actual number of "packet_batches"
I agree, we'd probably want to divide by batch_size here; not sure what the original intent of this metric was.
seems at some point we had
banking_stage_stats.current_buffered_packets_count.swap(
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
Ordering::Relaxed,
);
because back then we stored the packets as batches; however, now we store individual packets in the min-max heap.
I think that means we can deprecate one of these metric points.
agree too, time to deprecate some metrics; we can only keep back-fitting them for so long :)
yeah agreed, this can be deleted, as well as the corresponding field in the datapoint. The current_buffered_packets_count below this already covers the number of packets in the buffer.
core/src/banking_stage.rs
Outdated
@@ -1117,8 +1140,7 @@ impl BankingStage {
            &socket,
            poh_recorder,
            cluster_info,
-           &mut buffered_packet_batches,
-           &forward_option,
+           &mut unprocessed_transaction_storage,
            transaction_status_sender.clone(),
can we use a reference here too?
what are you referring to?
ha sorry, that comment wasn't clear. You changed a bunch of the transaction_status_sender args into references, but here we are cloning, so just wondering if you missed this one or if there's a reason we need to clone it here.
        )
    }

    fn filter_forwardable_packets_and_add_batches(
        &mut self,
        forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
    ) -> FilterForwardingResults {
-       BankingStage::filter_valid_packets_for_forwarding(
+       let results = BankingStage::filter_valid_packets_for_forwarding(
nit: we've got a circular dependency between BankingStage and UnprocessedTransactionStorage. Might be better to move filter_valid_packets_for_forwarding here, since this is the only place it's used.
@@ -25,6 +23,9 @@ pub enum UnprocessedTransactionStorage {
    LocalTransactionStorage(ThreadLocalUnprocessedPackets),
}

unsafe impl Send for UnprocessedTransactionStorage {}
I'm not convinced we can actually do this w/o introducing race conditions on the reference counts for votes.
Even though we wrap in an RwLock in the VoteStorage structure, I think it's possible another thread could drop or clone the Rc from another reference to it. Then because it is non-atomically updated, we end up with a miscounted Rc.
Additionally, in my local changes for the tx-scheduler, I am actually sharing these references between threads, and converted the Rc to Arc.
It seems it'd be useful if I split that change out so we could use it here. I'll do that shortly.
^ I split out #28145 if we go that route here
I agree, this only works in this specific situation where there is only 1 thread dropping the Rc - even then it's pretty sketch. I would also prefer it to be an Arc. I can hold off and wait for #28145 .
@AshwinSekar, #28145 is merged
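To illustrate the concern in this thread: Rc's reference count is updated non-atomically, which is exactly why it is !Send and why `unsafe impl Send` can miscount it under concurrent clone/drop; Arc pays for an atomic count and is safe to share. The sketch below is illustrative only (the names are hypothetical, not the banking-stage types):

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Each Arc::clone atomically bumps the strong count, so several threads can
// clone and drop references without racing on the count -- the failure mode
// Rc would have here.
fn parallel_sum(values: Vec<u64>) -> u64 {
    let shared = Arc::new(RwLock::new(values));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let shared = Arc::clone(&shared); // atomic increment: thread-safe
            thread::spawn(move || shared.read().unwrap().iter().sum::<u64>())
        })
        .collect();
    // All workers see the same data; take any of the identical results.
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .max()
        .unwrap_or(0)
}
```

With Rc in place of Arc this would not compile, because the compiler rejects moving a !Send value into `thread::spawn`; the `unsafe impl Send` discussed above is what bypassed that check.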
        &mut self,
-       packet_batch: &PacketBatch,
-       packet_indexes: &[usize],
+       deserialized_packets: Vec<ImmutableDeserializedPacket>,
    ) -> InsertPacketBatchesSummary {
naming-nit: we insert a "batch" but get "batches" summary back.
    ) -> InsertPacketBatchesSummary {
        match self {
            Self::VoteStorage(vote_storage) => {
                let VoteBatchInsertionMetrics {
                    num_dropped_gossip,
                    num_dropped_tpu,
-               } = vote_storage.deserialize_and_insert_batch(packet_batch, packet_indexes);
+               } = vote_storage.insert_batch(deserialized_packets);
                InsertPacketBatchesSummary {
why not have the respective insert_batch functions just return the summary struct directly?
Was bench-tps run to compare before/after this change?
Force-pushed dd742cf to 4e8c730
    }
}

/// Filter out packets that fail to sanitize, or are no longer valid (could be
All this code is moved (untouched) to unprocessed_transaction_storage for better encapsulation
This could be a separate PR as well I think
Caused some issues because we didn't have an unprocessed_transaction_storage to invoke on; we're still using packet batches on master.
@@ -2057,35 +1804,6 @@ impl BankingStage {
        .collect_vec()
    }

    /// Checks a batch of sanitized transactions again bank for age and status
    fn bank_check_transactions(
This code was moved to the bank itself.
        Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
    }

    fn filter_processed_packets<'a, F>(
Moved to unprocessed_transaction_storage
@@ -3466,144 +3170,6 @@ mod tests {
        Blockstore::destroy(ledger_path.path()).unwrap();
    }

    #[test]
test also moved to unprocessed_transaction_storage
    // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
    // with their packet indexes.
    #[allow(clippy::needless_collect)]
    pub fn transaction_from_deserialized_packet(
Moved this to ImmutableDeserializedPacket
wish this were in a separate PR that could be easily merged, so I could use it in my current PR now 😃 But all good, can be merged later.
        .collect();
    vote_slots.reverse();
    let vote_tx = vote_transaction::new_vote_transaction(
Updated to new vote instruction
Force-pushed cdc2706 to 53a11a0
Force-pushed 53a11a0 to 8d7f0c6
Additionally this allows us to aggressively prune the buffer for voting threads as with the new vote state only the latest vote from each validator is necessary.
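The pruning idea above can be sketched as a map keyed by validator identity, where a newer vote replaces the buffered one instead of growing the queue. This is a simplified illustration with hypothetical names, not the PR's actual LatestUnprocessedVotes type:

```rust
use std::collections::HashMap;

// Only the newest vote per validator needs to stay buffered, because the new
// vote state carries the full tower; older buffered votes are redundant.
#[derive(Default)]
pub struct LatestVotesBuffer {
    latest_slot_per_validator: HashMap<String, u64>,
}

impl LatestVotesBuffer {
    // Returns true if the vote was kept, false if it was dropped as stale.
    pub fn insert(&mut self, validator: &str, slot: u64) -> bool {
        match self.latest_slot_per_validator.get(validator) {
            // A buffered vote at the same or newer slot wins: drop the incoming one.
            Some(&buffered) if buffered >= slot => false,
            _ => {
                self.latest_slot_per_validator
                    .insert(validator.to_string(), slot);
                true
            }
        }
    }

    // Buffer size is bounded by the validator set, not by vote traffic.
    pub fn len(&self) -> usize {
        self.latest_slot_per_validator.len()
    }
}
```

The key property is the bound on `len()`: no matter how many votes arrive, the buffer holds at most one entry per validator.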
Force-pushed 8d7f0c6 to 073de96
Looks good. Only minor comments. Still waiting for feedback from Carl or Tao; a bigger change should have more than just my eyes on it.
        .map(|bank_forks| {
            let bank = bank_forks.root_bank();
            bank.feature_set
                .is_active(&allow_votes_to_directly_update_vote_state::id())
is there any concern with using a feature check only at startup time to choose how we initialize? The feature gets activated, but then that changes behavior when we restart.
yeah, originally we were considering a hot-swap approach where we check the root bank every loop to decide; however, this seemed cumbersome. Not to mention that the hot swap would itself require another feature to activate, and at the rate feature activations are going, it would be quicker to just wait for a restart.
    // This function deserializes packets into transactions, computes the blake3 hash of transaction
    // messages, and verifies secp256k1 instructions.
    pub fn compute_sanitized_transaction(
nit: compute seems like an odd name. build?
core/src/latest_unprocessed_votes.rs
Outdated
@@ -240,12 +230,12 @@ impl LatestUnprocessedVotes {
    /// Votes from validators with 0 stakes are ignored
    pub fn get_and_insert_forwardable_packets(
        &self,
-       bank: Arc<Bank>,
+       bank: &Arc<Bank>,
nit: &Bank. don't need the Arc
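The nit works because of deref coercion: a `&Arc<Bank>` argument coerces to `&Bank` automatically, so taking `&Bank` accepts both and keeps the function agnostic to how the caller owns the bank. A minimal sketch with a stand-in Bank type (not the runtime's):

```rust
use std::sync::Arc;

// Illustrative stand-in for the real Bank.
struct Bank {
    slot: u64,
}

// Taking &Bank rather than &Arc<Bank>: callers holding an Arc<Bank>, a
// Box<Bank>, or a plain Bank can all pass a reference without extra ceremony.
fn bank_slot(bank: &Bank) -> u64 {
    bank.slot
}

fn demo() -> u64 {
    let bank = Arc::new(Bank { slot: 42 });
    // &Arc<Bank> coerces to &Bank here; no clone, no refcount traffic.
    bank_slot(&bank)
}
```

This is also why `&Arc<Bank>` only makes sense when the callee needs to clone the Arc itself; for read-only access, `&Bank` is the more general signature.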
looks good to me; wish some of the refactoring were in its own PRs, but OK with this. There are other comments/nits to address, otherwise good to go.
@@ -104,7 +107,7 @@ fn insert_packet_batches(
    #[allow(clippy::unit_arg)]
    fn bench_packet_clone(bencher: &mut Bencher) {
        let batch_count = 1000;
-       let packet_per_batch_count = 128;
+       let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE;
+1 for using proper const here
        | Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _))
        | Ok(VoteInstruction::CompactUpdateVoteState(vote_state_update))
        | Ok(VoteInstruction::CompactUpdateVoteStateSwitch(vote_state_update, _)) => {
Can we introduce an is_single_vote_update() on VoteInstruction that returns true/false for each of these enum variants? That way we don't miss these cases in the future. Can be a separate PR as well, since this seems like a bug in the existing code.
will address in subsequent pr
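The suggested helper might look like the sketch below. The enum is a simplified, hypothetical mirror of VoteInstruction (the real one lives in the vote program crate with payloads and more variants); the point is the single exhaustive match, which forces a decision whenever a new variant is added instead of silently falling through a multi-arm `|` pattern elsewhere:

```rust
// Hypothetical, payload-free mirror of VoteInstruction for illustration.
enum VoteInstruction {
    Vote,
    UpdateVoteState,
    UpdateVoteStateSwitch,
    CompactUpdateVoteState,
    CompactUpdateVoteStateSwitch,
    Withdraw,
}

impl VoteInstruction {
    // One exhaustive match: adding a variant breaks the build here, so the
    // classification can never drift out of sync with the enum.
    fn is_single_vote_update(&self) -> bool {
        match self {
            VoteInstruction::UpdateVoteState
            | VoteInstruction::UpdateVoteStateSwitch
            | VoteInstruction::CompactUpdateVoteState
            | VoteInstruction::CompactUpdateVoteStateSwitch => true,
            VoteInstruction::Vote | VoteInstruction::Withdraw => false,
        }
    }
}
```

Call sites then match on `ix.is_single_vote_update()` rather than re-listing the four update variants each time.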
fn deserialize_packets<'a>(
    packet_batch: &'a PacketBatch,
    packet_indexes: &'a [usize],
    vote_source: VoteSource,
) -> impl Iterator<Item = LatestValidatorVotePacket> + 'a {
    packet_indexes.iter().filter_map(move |packet_index| {
        LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok()
    })
}
This function seems unchanged, can we move this back where it was before to make the diff smaller
core/src/banking_stage.rs
Outdated
fn test_unprocessed_transaction_storage_insert() -> Result<(), Box<dyn Error>> {
    let keypair = Keypair::new();
    let vote_keypair = Keypair::new();
    let pubkey = solana_sdk::pubkey::new_rand();

    let retryable_indexes = [0, 1, 2, 3, 5];
    let mut non_retryable_indexes = vec![];
    let f = |start, end| {
        non_retryable_indexes.push((start, end));
    };
    BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
    assert_eq!(non_retryable_indexes, vec![(4, 5)]);
    let small_transfer = Packet::from_data(
        None,
        system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()),
    )?;
    let mut vote = Packet::from_data(
        None,
        new_vote_state_update_transaction(
            VoteStateUpdate::default(),
            Hash::new_unique(),
            &keypair,
            &vote_keypair,
            &vote_keypair,
            None,
        ),
    )?;
    vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true);
    let big_transfer = Packet::from_data(
        None,
        system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()),
    )?;

    let retryable_indexes = [1, 2, 3];
    let mut non_retryable_indexes = vec![];
    let f = |start, end| {
        non_retryable_indexes.push((start, end));
    };
    BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
    assert_eq!(non_retryable_indexes, vec![(0, 1)]);
    for thread_type in [
        ThreadType::Transactions,
        ThreadType::Voting(VoteSource::Gossip),
        ThreadType::Voting(VoteSource::Tpu),
    ] {
        let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
            UnprocessedPacketBatches::with_capacity(100),
            thread_type,
        );
        transaction_storage.insert_batch(vec![
            ImmutableDeserializedPacket::new(small_transfer.clone(), None)?,
            ImmutableDeserializedPacket::new(vote.clone(), None)?,
            ImmutableDeserializedPacket::new(big_transfer.clone(), None)?,
        ]);
        let deserialized_packets = transaction_storage
            .iter()
            .map(|packet| packet.immutable_section().original_packet().clone())
            .collect_vec();
        assert_eq!(3, deserialized_packets.len());
        assert!(deserialized_packets.contains(&small_transfer));
        assert!(deserialized_packets.contains(&vote));
        assert!(deserialized_packets.contains(&big_transfer));
    }

    let retryable_indexes = [1, 2, 3, 5];
    let mut non_retryable_indexes = vec![];
    let f = |start, end| {
        non_retryable_indexes.push((start, end));
    };
    BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
    assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]);
    for vote_source in [VoteSource::Gossip, VoteSource::Tpu] {
        let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
            Arc::new(LatestUnprocessedVotes::new()),
            vote_source,
        );
        transaction_storage.insert_batch(vec![
            ImmutableDeserializedPacket::new(small_transfer.clone(), None)?,
            ImmutableDeserializedPacket::new(vote.clone(), None)?,
            ImmutableDeserializedPacket::new(big_transfer.clone(), None)?,
        ]);
        assert_eq!(1, transaction_storage.len());
    }
    Ok(())
}
Seems like this test belongs in unprocessed_transaction_storage.rs since it's testing those internal methods.
InsertPacketBatchSummary {
    num_dropped_packets: num_dropped_gossip + num_dropped_tpu,
    num_dropped_gossip_vote_packets: num_dropped_gossip,
    num_dropped_tpu_vote_packets: num_dropped_tpu,
    ..InsertPacketBatchSummary::default()
}
+1 for these structured return values
pub(crate) num_dropped_gossip_vote_packets: usize,
pub(crate) num_dropped_tpu_vote_packets: usize,
might be cleaner to just replace these two fields with a VoteBatchInsertionMetrics, since that struct has the same fields. Alternatively we can have InsertPacketBatchSummary be an enum:

enum InsertPacketBatchSummary {
    VoteBatchInsertionMetrics(metrics),
    PacketBatchMetrics(metrics),
}

and just expose fn total_dropped_packets() and fn total_dropped_tracer_packets() methods on the enum.
good idea, this also allows us to remove the (usize, usize) returns in unprocessed_packet_batches.
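A fleshed-out version of the enum suggested above might look like this; the variant and field names are illustrative approximations of the crate's types, not its actual API:

```rust
// Vote-thread insertions only distinguish gossip vs TPU drops.
#[derive(Default)]
pub struct VoteBatchInsertionMetrics {
    pub num_dropped_gossip: usize,
    pub num_dropped_tpu: usize,
}

// Transaction-thread insertions track total and tracer-packet drops.
#[derive(Default)]
pub struct PacketBatchInsertionMetrics {
    pub num_dropped_packets: usize,
    pub num_dropped_tracer_packets: usize,
}

// One summary type, two shapes: each insert_batch returns the variant that
// matches its storage, and callers use the accessor methods below.
pub enum InsertPacketBatchSummary {
    VoteBatch(VoteBatchInsertionMetrics),
    PacketBatch(PacketBatchInsertionMetrics),
}

impl InsertPacketBatchSummary {
    pub fn total_dropped_packets(&self) -> usize {
        match self {
            Self::VoteBatch(m) => m.num_dropped_gossip + m.num_dropped_tpu,
            Self::PacketBatch(m) => m.num_dropped_packets,
        }
    }

    pub fn total_dropped_tracer_packets(&self) -> usize {
        match self {
            // Vote batches don't carry tracer packets in this sketch.
            Self::VoteBatch(_) => 0,
            Self::PacketBatch(m) => m.num_dropped_tracer_packets,
        }
    }
}
```

This keeps the vote and non-vote metric sets from being smeared across one struct of mostly-zero fields, which is what the `(usize, usize)` returns were compensating for.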
Pull request has been modified.
Force-pushed 24e8041 to f7a21bf
Force-pushed f7a21bf to 648c2ad
* Split out voting and banking threads in banking stage
  Additionally this allows us to aggressively prune the buffer for voting threads as with the new vote state only the latest vote from each validator is necessary.
* Update local cluster test to use new Vote ix
* Encapsulate transaction storage filtering better
* Address pr comments
* Commit cargo lock change
* clippy
* Remove unsafe impls
* pr comments
* compute_sanitized_transaction -> build_sanitized_transaction
* &Arc -> Arc
* Move test
* Refactor metrics enums
* clippy
Problem

Summary of Changes

Additionally this allows us to aggressively prune the buffer for voting threads as with the new vote state only the latest vote from each validator is necessary.

Fixes #