Skip to content

Commit

Permalink
fix: ensure vote packets can be retried (#2605)
Browse files Browse the repository at this point in the history
(cherry picked from commit ecb44d7)

# Conflicts:
#	core/src/banking_stage/latest_unprocessed_votes.rs
  • Loading branch information
jstarry authored and mergify[bot] committed Aug 15, 2024
1 parent 34409e7 commit 8fc0961
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 39 deletions.
155 changes: 131 additions & 24 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
},
solana_vote_program::vote_instruction::VoteInstruction,
std::{
cmp,
collections::HashMap,
ops::DerefMut,
sync::{Arc, RwLock},
Expand Down Expand Up @@ -168,12 +169,13 @@ impl LatestUnprocessedVotes {
pub(crate) fn insert_batch(
&self,
votes: impl Iterator<Item = LatestValidatorVotePacket>,
should_replenish_taken_votes: bool,
) -> VoteBatchInsertionMetrics {
let mut num_dropped_gossip = 0;
let mut num_dropped_tpu = 0;

for vote in votes {
if let Some(vote) = self.update_latest_vote(vote) {
if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) {
match vote.vote_source {
VoteSource::Gossip => num_dropped_gossip += 1,
VoteSource::Tpu => num_dropped_tpu += 1,
Expand Down Expand Up @@ -201,10 +203,12 @@ impl LatestUnprocessedVotes {
pub fn update_latest_vote(
&self,
vote: LatestValidatorVotePacket,
should_replenish_taken_votes: bool,
) -> Option<LatestValidatorVotePacket> {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();
<<<<<<< HEAD
if let Some(latest_vote) = self.get_entry(pubkey) {
let (latest_slot, latest_timestamp) = latest_vote
.read()
Expand All @@ -214,10 +218,38 @@ impl LatestUnprocessedVotes {
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
=======

// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool {
match slot.cmp(&latest_vote.slot()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Slots are equal, now check timestamp
match timestamp.cmp(&latest_vote.timestamp()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Timestamps are equal, lastly check if vote was taken previously
// and should be replenished
should_replenish_taken_votes && latest_vote.is_vote_taken()
};

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let should_try_update = allow_update(&latest_vote.read().unwrap());
if should_try_update {
>>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605))
let mut latest_vote = latest_vote.write().unwrap();
let latest_slot = latest_vote.slot();
let latest_timestamp = latest_vote.timestamp();
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
if allow_update(&latest_vote) {
let old_vote = std::mem::replace(latest_vote.deref_mut(), vote);
if old_vote.is_vote_taken() {
return None;
Expand Down Expand Up @@ -521,10 +553,10 @@ mod tests {
);

assert!(latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.is_none());
assert!(latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.is_none());
assert_eq!(2, latest_unprocessed_votes.len());

Expand Down Expand Up @@ -554,15 +586,15 @@ mod tests {
assert_eq!(
1,
latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.unwrap()
.slot
);
// Drop current vote
assert_eq!(
6,
latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.unwrap()
.slot
);
Expand All @@ -582,8 +614,8 @@ mod tests {
&keypair_b,
None,
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand Down Expand Up @@ -612,8 +644,8 @@ mod tests {
&keypair_b,
Some(2),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -638,8 +670,8 @@ mod tests {
&keypair_b,
Some(6),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -664,8 +696,10 @@ mod tests {
&keypair_b,
Some(3),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -676,9 +710,80 @@ mod tests {
Some(6),
latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey())
);

// Drain all latest votes
for packet in latest_unprocessed_votes
.latest_votes_per_pubkey
.read()
.unwrap()
.values()
{
packet.write().unwrap().take_vote().inspect(|_vote| {
latest_unprocessed_votes
.num_unprocessed_votes
.fetch_sub(1, Ordering::Relaxed);
});
}
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should not replenish without flag
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should replenish with the flag
latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());
}

#[test]
<<<<<<< HEAD
=======
fn test_update_latest_vote_race() {
// There was a race condition in updating the same pubkey in the hashmap
// when the entry does not initially exist.
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

const NUM_VOTES: usize = 100;
let keypairs = Arc::new(
(0..NUM_VOTES)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect_vec(),
);

// Insert votes in parallel
let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */);
};

let hdl = Builder::new()
.spawn({
let latest_unprocessed_votes = latest_unprocessed_votes.clone();
let keypairs = keypairs.clone();
move || {
for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}
}
})
.unwrap();

for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}

hdl.join().unwrap();
assert_eq!(NUM_VOTES, latest_unprocessed_votes.len());
}

#[test]
>>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605))
fn test_simulate_threads() {
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone();
Expand All @@ -700,7 +805,8 @@ mod tests {
&keypairs[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes.update_latest_vote(vote);
latest_unprocessed_votes
.update_latest_vote(vote, false /* should replenish */);
}
})
.unwrap();
Expand All @@ -715,7 +821,8 @@ mod tests {
&keypairs_tpu[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes_tpu.update_latest_vote(vote);
latest_unprocessed_votes_tpu
.update_latest_vote(vote, false /* should replenish */);
if i % 214 == 0 {
// Simulate draining and processing packets
let latest_votes_per_pubkey = latest_unprocessed_votes_tpu
Expand Down Expand Up @@ -748,8 +855,8 @@ mod tests {

let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

// Don't forward 0 stake accounts
let forwarded = latest_unprocessed_votes
Expand Down Expand Up @@ -843,10 +950,10 @@ mod tests {
let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None);
let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None);

latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_c);
latest_unprocessed_votes.update_latest_vote(vote_d);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */);
assert_eq!(4, latest_unprocessed_votes.len());

latest_unprocessed_votes.clear_forwarded_packets();
Expand Down
86 changes: 71 additions & 15 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,18 +421,18 @@ impl VoteStorage {
&mut self,
deserialized_packets: Vec<ImmutableDeserializedPacket>,
) -> VoteBatchInsertionMetrics {
self.latest_unprocessed_votes
.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
)
self.latest_unprocessed_votes.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
false, // should_replenish_taken_votes
)
}

fn filter_forwardable_packets_and_add_batches(
Expand Down Expand Up @@ -503,12 +503,15 @@ impl VoteStorage {
)
.ok()
}),
true, // should_replenish_taken_votes
);
} else {
self.latest_unprocessed_votes
.insert_batch(vote_packets.into_iter().filter_map(|packet| {
self.latest_unprocessed_votes.insert_batch(
vote_packets.into_iter().filter_map(|packet| {
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
}));
}),
true, // should_replenish_taken_votes
);
}
}

Expand Down Expand Up @@ -968,6 +971,7 @@ mod tests {
super::*,
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_perf::packet::{Packet, PacketFlags},
solana_runtime::genesis_utils,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -1236,6 +1240,58 @@ mod tests {
Ok(())
}

#[test]
fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box<dyn Error>> {
let node_keypair = Keypair::new();
let genesis_config =
genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200)
.genesis_config;
let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
let vote_keypair = Keypair::new();
let mut vote = Packet::from_data(
None,
new_tower_sync_transaction(
TowerSync::default(),
Hash::new_unique(),
&node_keypair,
&vote_keypair,
&vote_keypair,
None,
),
)?;
vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true);

let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
Arc::new(LatestUnprocessedVotes::new()),
VoteSource::Tpu,
);

transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]);
assert_eq!(1, transaction_storage.len());

// When processing packets, return all packets as retryable so that they
// are reinserted into storage
let _ = transaction_storage.process_packets(
bank.clone(),
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
|packets, _payload| {
// Return all packets indexes as retryable
Some(
packets
.iter()
.enumerate()
.map(|(index, _packet)| index)
.collect_vec(),
)
},
);

// All packets should remain in the transaction storage
assert_eq!(1, transaction_storage.len());
Ok(())
}

#[test]
fn test_prepare_packets_to_forward() {
solana_logger::setup();
Expand Down

0 comments on commit 8fc0961

Please sign in to comment.