Skip to content

Commit

Permalink
fix: ensure vote packets can be retried (#2605)
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry authored Aug 15, 2024
1 parent f46fda3 commit ecb44d7
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 48 deletions.
114 changes: 81 additions & 33 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
},
solana_vote_program::vote_instruction::VoteInstruction,
std::{
cmp,
collections::HashMap,
ops::DerefMut,
sync::{
Expand Down Expand Up @@ -174,12 +175,13 @@ impl LatestUnprocessedVotes {
pub(crate) fn insert_batch(
&self,
votes: impl Iterator<Item = LatestValidatorVotePacket>,
should_replenish_taken_votes: bool,
) -> VoteBatchInsertionMetrics {
let mut num_dropped_gossip = 0;
let mut num_dropped_tpu = 0;

for vote in votes {
if let Some(vote) = self.update_latest_vote(vote) {
if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) {
match vote.vote_source {
VoteSource::Gossip => num_dropped_gossip += 1,
VoteSource::Tpu => num_dropped_tpu += 1,
Expand Down Expand Up @@ -207,26 +209,41 @@ impl LatestUnprocessedVotes {
pub fn update_latest_vote(
&self,
vote: LatestValidatorVotePacket,
should_replenish_taken_votes: bool,
) -> Option<LatestValidatorVotePacket> {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();

// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool {
match slot.cmp(&latest_vote.slot()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Slots are equal, now check timestamp
match timestamp.cmp(&latest_vote.timestamp()) {
cmp::Ordering::Less => return false,
cmp::Ordering::Greater => return true,
cmp::Ordering::Equal => {}
};

// Timestamps are equal, lastly check if vote was taken previously
// and should be replenished
should_replenish_taken_votes && latest_vote.is_vote_taken()
};

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
.unwrap();
// Allow votes for later slots or the same slot with later timestamp (refreshed votes)
// We directly compare as options to prioritize votes for same slot with timestamp as
// Some > None
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
let should_try_update = allow_update(&latest_vote.read().unwrap());
if should_try_update {
let mut latest_vote = latest_vote.write().unwrap();
let latest_slot = latest_vote.slot();
let latest_timestamp = latest_vote.timestamp();
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
if allow_update(&latest_vote) {
let old_vote = std::mem::replace(latest_vote.deref_mut(), vote);
if old_vote.is_vote_taken() {
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -573,10 +590,10 @@ mod tests {
);

assert!(latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.is_none());
assert!(latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.is_none());
assert_eq!(2, latest_unprocessed_votes.len());

Expand Down Expand Up @@ -606,15 +623,15 @@ mod tests {
assert_eq!(
1,
latest_unprocessed_votes
.update_latest_vote(vote_a)
.update_latest_vote(vote_a, false /* should replenish */)
.unwrap()
.slot
);
// Drop current vote
assert_eq!(
6,
latest_unprocessed_votes
.update_latest_vote(vote_b)
.update_latest_vote(vote_b, false /* should replenish */)
.unwrap()
.slot
);
Expand All @@ -634,8 +651,8 @@ mod tests {
&keypair_b,
None,
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand Down Expand Up @@ -664,8 +681,8 @@ mod tests {
&keypair_b,
Some(2),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -690,8 +707,8 @@ mod tests {
&keypair_b,
Some(6),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -716,8 +733,10 @@ mod tests {
&keypair_b,
Some(3),
);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);

assert_eq!(2, latest_unprocessed_votes.len());
assert_eq!(
Expand All @@ -728,6 +747,33 @@ mod tests {
Some(6),
latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey())
);

// Drain all latest votes
for packet in latest_unprocessed_votes
.latest_votes_per_pubkey
.read()
.unwrap()
.values()
{
packet.write().unwrap().take_vote().inspect(|_vote| {
latest_unprocessed_votes
.num_unprocessed_votes
.fetch_sub(1, Ordering::Relaxed);
});
}
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should not replenish without flag
latest_unprocessed_votes
.update_latest_vote(vote_a.clone(), false /* should replenish */);
latest_unprocessed_votes
.update_latest_vote(vote_b.clone(), false /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());

// Same votes with same timestamps should replenish with the flag
latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */);
assert_eq!(0, latest_unprocessed_votes.len());
}

#[test]
Expand All @@ -748,7 +794,7 @@ mod tests {
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */);
};

let hdl = Builder::new()
Expand Down Expand Up @@ -793,7 +839,8 @@ mod tests {
&keypairs[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes.update_latest_vote(vote);
latest_unprocessed_votes
.update_latest_vote(vote, false /* should replenish */);
}
})
.unwrap();
Expand All @@ -808,7 +855,8 @@ mod tests {
&keypairs_tpu[rng.gen_range(0..10)],
None,
);
latest_unprocessed_votes_tpu.update_latest_vote(vote);
latest_unprocessed_votes_tpu
.update_latest_vote(vote, false /* should replenish */);
if i % 214 == 0 {
// Simulate draining and processing packets
let latest_votes_per_pubkey = latest_unprocessed_votes_tpu
Expand Down Expand Up @@ -844,8 +892,8 @@ mod tests {

let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);

// Don't forward 0 stake accounts
let forwarded = latest_unprocessed_votes
Expand Down Expand Up @@ -939,10 +987,10 @@ mod tests {
let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None);
let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None);

latest_unprocessed_votes.update_latest_vote(vote_a);
latest_unprocessed_votes.update_latest_vote(vote_b);
latest_unprocessed_votes.update_latest_vote(vote_c);
latest_unprocessed_votes.update_latest_vote(vote_d);
latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */);
latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */);
assert_eq!(4, latest_unprocessed_votes.len());

latest_unprocessed_votes.clear_forwarded_packets();
Expand Down
86 changes: 71 additions & 15 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,18 +443,18 @@ impl VoteStorage {
&mut self,
deserialized_packets: Vec<ImmutableDeserializedPacket>,
) -> VoteBatchInsertionMetrics {
self.latest_unprocessed_votes
.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
)
self.latest_unprocessed_votes.insert_batch(
deserialized_packets
.into_iter()
.filter_map(|deserialized_packet| {
LatestValidatorVotePacket::new_from_immutable(
Arc::new(deserialized_packet),
self.vote_source,
)
.ok()
}),
false, // should_replenish_taken_votes
)
}

fn filter_forwardable_packets_and_add_batches(
Expand Down Expand Up @@ -525,12 +525,15 @@ impl VoteStorage {
)
.ok()
}),
true, // should_replenish_taken_votes
);
} else {
self.latest_unprocessed_votes
.insert_batch(vote_packets.into_iter().filter_map(|packet| {
self.latest_unprocessed_votes.insert_batch(
vote_packets.into_iter().filter_map(|packet| {
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
}));
}),
true, // should_replenish_taken_votes
);
}
}

Expand Down Expand Up @@ -988,6 +991,7 @@ mod tests {
super::*,
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_perf::packet::{Packet, PacketFlags},
solana_runtime::genesis_utils,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -1256,6 +1260,58 @@ mod tests {
Ok(())
}

#[test]
fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box<dyn Error>> {
let node_keypair = Keypair::new();
let genesis_config =
genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200)
.genesis_config;
let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
let vote_keypair = Keypair::new();
let mut vote = Packet::from_data(
None,
new_tower_sync_transaction(
TowerSync::default(),
Hash::new_unique(),
&node_keypair,
&vote_keypair,
&vote_keypair,
None,
),
)?;
vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true);

let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
Arc::new(LatestUnprocessedVotes::new()),
VoteSource::Tpu,
);

transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]);
assert_eq!(1, transaction_storage.len());

// When processing packets, return all packets as retryable so that they
// are reinserted into storage
let _ = transaction_storage.process_packets(
bank.clone(),
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
|packets, _payload| {
// Return all packets indexes as retryable
Some(
packets
.iter()
.enumerate()
.map(|(index, _packet)| index)
.collect_vec(),
)
},
);

// All packets should remain in the transaction storage
assert_eq!(1, transaction_storage.len());
Ok(())
}

#[test]
fn test_prepare_packets_to_forward() {
solana_logger::setup();
Expand Down

0 comments on commit ecb44d7

Please sign in to comment.