Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Split out voting and banking threads in banking stage #27931

Merged
merged 13 commits into from
Oct 20, 2022

Conversation

AshwinSekar
Copy link
Contributor

Additionally this allows us to aggressively prune the buffer for voting threads as with the new vote state only the latest vote from each validator is necessary.

Problem

Summary of Changes

Fixes #

@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch 4 times, most recently from c400a2a to 79a4b3a Compare September 20, 2022 19:39
@nikhayes
Copy link

They should maybe have separate block compute limits too? Right now it seems that there is a potential for non-vote transaction compute to starve out votes due to non votes filling up block compute (see recent discussion in mb planning channel)

@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from 79a4b3a to dd742cf Compare September 23, 2022 18:53
@AshwinSekar AshwinSekar marked this pull request as ready for review September 23, 2022 20:40
@AshwinSekar
Copy link
Contributor Author

They should maybe have separate block compute limits too? Right now it seems that there is a potential for non-vote transaction compute to starve out votes due to non votes filling up block compute (see recent discussion in mb planning channel)

The priority details are being ripped out of vote txs. With this change vote threads and bank threads would compete to add to block however having some sort of separation might be interesting cc @taozhu-chicago .

@@ -648,12 +685,111 @@ impl BankingStage {
(Ok(()), packet_vec_len, Some(leader_pubkey))
}

#[allow(clippy::too_many_arguments)]
fn processing_function(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: processing_function -> do_process_packets

Comment on lines 366 to 383
// Mark so we don't forward again
deserialized_packet.forwarded = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it should be handled inside filter_valid_packets_for_forwarding, was this a bug with the existing code?

If so, we should check this fix in separately so we don't block the fix behind conflicts with Tao's changes here: #26798

@@ -2054,16 +2058,16 @@ impl BankingStage {
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packet_batches_count
.swap(buffered_packet_batches.len(), Ordering::Relaxed);
.swap(unprocessed_transaction_storage.len(), Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's going on here? this doesn't seem right.

It's not new in this change, but why not just remove the duplicate stat if we can't get an actual number of "packet_batches"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, probably would want to divide by batch_size here, not sure what the original intent of this metric was.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems at some point we had

banking_stage_stats.current_buffered_packets_count.swap(
           buffered_packets.iter().map(|packets| packets.1.len()).sum(),
           Ordering::Relaxed,
);

because back then we stored the packets as batches, however now we use individual packets in the minmaxheap.

I think that means we can deprecate one of these metric points.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree too, time to deprecate some metrics, can only back-fitting them for so long :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah agreed, this can be deleted, as well as the corresponding field in the datapoint. The current_buffered_packets_count below this covers the number of packets in the buffer already

@@ -1117,8 +1140,7 @@ impl BankingStage {
&socket,
poh_recorder,
cluster_info,
&mut buffered_packet_batches,
&forward_option,
&mut unprocessed_transaction_storage,
transaction_status_sender.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use a reference here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are you referring to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha sorry, that comment wasn't clear.
You changed a bunch of the transaction_status_sender args into references. But here we are cloning, so just wondering if you missed this one or if there's a reason we need to clone it here.

)
}

fn filter_forwardable_packets_and_add_batches(
&mut self,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
BankingStage::filter_valid_packets_for_forwarding(
let results = BankingStage::filter_valid_packets_for_forwarding(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we've got a circular dependency with BankingStage and UnprocessedTransactionStorage. Might be better to move the filter_valid_packets_for_forwarding here. Since this is the only place it's used.

@@ -25,6 +23,9 @@ pub enum UnprocessedTransactionStorage {
LocalTransactionStorage(ThreadLocalUnprocessedPackets),
}

unsafe impl Send for UnprocessedTransactionStorage {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced we can actually do this w/o introducing race conditions on the reference counts for votes.

Even though we wrap in an RwLock in the VoteStorage structure, I think it's possible another thread could drop or clone the Rc from another reference to it. Then because it is non-atomically updated, we end up with a miscounted Rc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, in my local changes for the tx-scheduler, I am actually sharing these references between threads, and converted the Rc to Arc.

It seems it'd be useful if I split that change out so we could use it here. I'll do that shortly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ I split out #28145 if we go that route here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this only works in this specific situation where there is only 1 thread dropping the Rc - even then it's pretty sketch. I would also prefer it to be an Arc. I can hold off and wait for #28145 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AshwinSekar, #28145 is merged

&mut self,
packet_batch: &PacketBatch,
packet_indexes: &[usize],
deserialized_packets: Vec<ImmutableDeserializedPacket>,
) -> InsertPacketBatchesSummary {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming-nit: we insert a "batch" but get "batches" summary back.

) -> InsertPacketBatchesSummary {
match self {
Self::VoteStorage(vote_storage) => {
let VoteBatchInsertionMetrics {
num_dropped_gossip,
num_dropped_tpu,
} = vote_storage.deserialize_and_insert_batch(packet_batch, packet_indexes);
} = vote_storage.insert_batch(deserialized_packets);
InsertPacketBatchesSummary {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have the respective insert_batch functions just return the summary struct directly?

@apfitzge
Copy link
Contributor

Was bench-tps run to compare before/after this change?

@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from dd742cf to 4e8c730 Compare September 30, 2022 00:30
}
}

/// Filter out packets that fail to sanitize, or are no longer valid (could be
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this code is moved (untouched) to unprocessed_transaction_storage for better encapsulation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a separate PR as well I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caused some issues because we didn't have an unprocessed_transaction_storage to invoke on, we're still using packet batches on master.

@@ -2057,35 +1804,6 @@ impl BankingStage {
.collect_vec()
}

/// Checks a batch of sanitized transactions again bank for age and status
fn bank_check_transactions(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was moved to the bank itself.


Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
}

fn filter_processed_packets<'a, F>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to unprocessed_transaction_storage

@@ -3466,144 +3170,6 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test also moved to unprocessed_transaction_storage

// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
// with their packet indexes.
#[allow(clippy::needless_collect)]
pub fn transaction_from_deserialized_packet(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to ImmutableDeserializedPacket

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wish this is in a separate PR that can be easily merged, so I can use it in my current PR now 😃 But all good, can be merge later

.collect();
vote_slots.reverse();
let vote_tx = vote_transaction::new_vote_transaction(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to new vote instruction

@AshwinSekar
Copy link
Contributor Author

Was bench-tps run to compare before/after this change?

Before

test bench_banking_stage_multi_accounts             ... bench:  11,967,033 ns/iter (+/- 4,120,951)
test bench_banking_stage_multi_accounts_with_voting ... bench:  15,190,062 ns/iter (+/- 1,918,300)
test bench_banking_stage_multi_programs             ... bench:  15,294,097 ns/iter (+/- 3,577,250)
test bench_banking_stage_multi_programs_with_voting ... bench:  22,088,729 ns/iter (+/- 2,597,948)

After

test bench_banking_stage_multi_accounts             ... bench:  11,092,034 ns/iter (+/- 2,780,082)
test bench_banking_stage_multi_accounts_with_voting ... bench:  15,460,632 ns/iter (+/- 1,716,954)
test bench_banking_stage_multi_programs             ... bench:  15,422,798 ns/iter (+/- 2,654,561)
test bench_banking_stage_multi_programs_with_voting ... bench:  21,099,046 ns/iter (+/- 1,589,215)

@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch 2 times, most recently from cdc2706 to 53a11a0 Compare October 5, 2022 21:50
@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from 53a11a0 to 8d7f0c6 Compare October 11, 2022 18:20
Additionally this allows us to aggressively prune the buffer for voting threads
as with the new vote state only the latest vote from each validator is
necessary.
@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from 8d7f0c6 to 073de96 Compare October 14, 2022 19:37
apfitzge
apfitzge previously approved these changes Oct 15, 2022
Copy link
Contributor

@apfitzge apfitzge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Only minor comments. Still wait for feedback from Carl or Tao - bigger change should have more than just my eyes on it

.map(|bank_forks| {
let bank = bank_forks.root_bank();
bank.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any concern to use a feature check only at start up time to choose how we initialize? The feature gets activated, but then that changes how we restart.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah originally we were considering a hot swap approach where we check root bank every loop to decide however this seemed cumbersome. Not to mention that this hot swap would then require another feature in order to activate and with the rate that feature activations seem to be going it would be quicker to just wait for a restart.


// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions.
pub fn compute_sanitized_transaction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: compute seems like an odd name. build?

@@ -240,12 +230,12 @@ impl LatestUnprocessedVotes {
/// Votes from validators with 0 stakes are ignored
pub fn get_and_insert_forwardable_packets(
&self,
bank: Arc<Bank>,
bank: &Arc<Bank>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: &Bank. don't need the Arc

tao-stones
tao-stones previously approved these changes Oct 16, 2022
Copy link
Contributor

@tao-stones tao-stones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me, wish some of refactoring were in their own PRs, but OK with this. There are other comments/nits to address, otherwise good to ge

// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
// with their packet indexes.
#[allow(clippy::needless_collect)]
pub fn transaction_from_deserialized_packet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wish this is in a separate PR that can be easily merged, so I can use it in my current PR now 😃 But all good, can be merge later

@@ -104,7 +107,7 @@ fn insert_packet_batches(
#[allow(clippy::unit_arg)]
fn bench_packet_clone(bencher: &mut Bencher) {
let batch_count = 1000;
let packet_per_batch_count = 128;
let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for using proper const here

Comment on lines +57 to +59
| Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _))
| Ok(VoteInstruction::CompactUpdateVoteState(vote_state_update))
| Ok(VoteInstruction::CompactUpdateVoteStateSwitch(vote_state_update, _)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce a is_single_vote_update() on VoteInstruction that returns true/false for each of these enum variations? That way we don't miss these cases in the future.

Can be a separate PR as well, since this seems like a bug in the existing code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will address in subsequent pr

Comment on lines +353 to +361
fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
vote_source: VoteSource,
) -> impl Iterator<Item = LatestValidatorVotePacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok()
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems unchanged, can we move this back where it was before to make the diff smaller

}
}

/// Filter out packets that fail to sanitize, or are no longer valid (could be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a separate PR as well I think

@@ -2054,16 +2058,16 @@ impl BankingStage {
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packet_batches_count
.swap(buffered_packet_batches.len(), Ordering::Relaxed);
.swap(unprocessed_transaction_storage.len(), Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah agreed, this can be deleted, as well as the corresponding field in the datapoint. The current_buffered_packets_count below this covers the number of packets in the buffer already

Comment on lines 4250 to 4313
fn test_unprocessed_transaction_storage_insert() -> Result<(), Box<dyn Error>> {
let keypair = Keypair::new();
let vote_keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();

let retryable_indexes = [0, 1, 2, 3, 5];
let mut non_retryable_indexes = vec![];
let f = |start, end| {
non_retryable_indexes.push((start, end));
};
BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
assert_eq!(non_retryable_indexes, vec![(4, 5)]);
let small_transfer = Packet::from_data(
None,
system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()),
)?;
let mut vote = Packet::from_data(
None,
new_vote_state_update_transaction(
VoteStateUpdate::default(),
Hash::new_unique(),
&keypair,
&vote_keypair,
&vote_keypair,
None,
),
)?;
vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true);
let big_transfer = Packet::from_data(
None,
system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()),
)?;

let retryable_indexes = [1, 2, 3];
let mut non_retryable_indexes = vec![];
let f = |start, end| {
non_retryable_indexes.push((start, end));
};
BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
assert_eq!(non_retryable_indexes, vec![(0, 1)]);
for thread_type in [
ThreadType::Transactions,
ThreadType::Voting(VoteSource::Gossip),
ThreadType::Voting(VoteSource::Tpu),
] {
let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(100),
thread_type,
);
transaction_storage.insert_batch(vec![
ImmutableDeserializedPacket::new(small_transfer.clone(), None)?,
ImmutableDeserializedPacket::new(vote.clone(), None)?,
ImmutableDeserializedPacket::new(big_transfer.clone(), None)?,
]);
let deserialized_packets = transaction_storage
.iter()
.map(|packet| packet.immutable_section().original_packet().clone())
.collect_vec();
assert_eq!(3, deserialized_packets.len());
assert!(deserialized_packets.contains(&small_transfer));
assert!(deserialized_packets.contains(&vote));
assert!(deserialized_packets.contains(&big_transfer));
}

let retryable_indexes = [1, 2, 3, 5];
let mut non_retryable_indexes = vec![];
let f = |start, end| {
non_retryable_indexes.push((start, end));
};
BankingStage::filter_processed_packets(retryable_indexes.iter(), f);
assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]);
for vote_source in [VoteSource::Gossip, VoteSource::Tpu] {
let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
Arc::new(LatestUnprocessedVotes::new()),
vote_source,
);
transaction_storage.insert_batch(vec![
ImmutableDeserializedPacket::new(small_transfer.clone(), None)?,
ImmutableDeserializedPacket::new(vote.clone(), None)?,
ImmutableDeserializedPacket::new(big_transfer.clone(), None)?,
]);
assert_eq!(1, transaction_storage.len());
}
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this test belongs in unprocessed_transaction_storage.rs since it's testing those internal methods

Comment on lines 248 to 253
InsertPacketBatchSummary {
num_dropped_packets: num_dropped_gossip + num_dropped_tpu,
num_dropped_gossip_vote_packets: num_dropped_gossip,
num_dropped_tpu_vote_packets: num_dropped_tpu,
..InsertPacketBatchSummary::default()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for these structured return values

Comment on lines 53 to 54
pub(crate) num_dropped_gossip_vote_packets: usize,
pub(crate) num_dropped_tpu_vote_packets: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be cleaner to just replace these two fields with a VoteBatchInsertionMetrics since that struct has the same fields.

Alternatively we can have InsertPacketBatchSummary be an enum of

enum  InsertPacketBatchSummary {
      VoteBatchInsertionMetrics(metrics),
      PacketBatchMetrics(metrics),
}

and just expose a fn total_dropped_packets() and fn total_dropped_tracer_packets() method on the enum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea also allows us to remove the (usize, usize) returns in unprocessed_packet_batches.

@mergify mergify bot dismissed stale reviews from tao-stones and apfitzge October 19, 2022 04:04

Pull request has been modified.

@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from 24e8041 to f7a21bf Compare October 20, 2022 18:09
@AshwinSekar AshwinSekar force-pushed the take-last-vote-banking branch from f7a21bf to 648c2ad Compare October 20, 2022 19:17
@AshwinSekar AshwinSekar added the automerge Merge this Pull Request automatically once CI passes label Oct 20, 2022
@mergify mergify bot merged commit f207af7 into solana-labs:master Oct 20, 2022
gnapoli23 pushed a commit to gnapoli23/solana that referenced this pull request Dec 16, 2022
)

* Split out voting and banking threads in banking stage

Additionally this allows us to aggressively prune the buffer for voting threads
as with the new vote state only the latest vote from each validator is
necessary.

* Update local cluster test to use new Vote ix

* Encapsulate transaction storage filtering better

* Address pr comments

* Commit cargo lock change

* clippy

* Remove unsafe impls

* pr comments

* compute_sanitized_transaction -> build_sanitized_transaction

* &Arc -> Arc

* Move test

* Refactor metrics enums

* clippy
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
automerge Merge this Pull Request automatically once CI passes
Projects
Development

Successfully merging this pull request may close these issues.

5 participants