Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use node's latest vote for commitment calc. too #1964

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 67 additions & 33 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
bank::Bank,
commitment::{BlockCommitment, BlockCommitmentCache, CommitmentSlots, VOTE_THRESHOLD_SIZE},
},
solana_sdk::clock::Slot,
solana_sdk::{clock::Slot, pubkey::Pubkey},
solana_vote_program::vote_state::VoteState,
std::{
cmp::max,
Expand All @@ -26,14 +26,23 @@ pub struct CommitmentAggregationData {
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
// The latest local vote state of the node running this service.
// Used for commitment aggregation if the node's vote account is staked.
node_vote_state: (Pubkey, VoteState),
}

impl CommitmentAggregationData {
pub fn new(bank: Arc<Bank>, root: Slot, total_stake: Stake) -> Self {
pub fn new(
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
node_vote_state: (Pubkey, VoteState),
) -> Self {
Self {
bank,
root,
total_stake,
node_vote_state,
}
}
}
Expand Down Expand Up @@ -139,8 +148,11 @@ impl AggregateCommitmentService {
aggregation_data: CommitmentAggregationData,
ancestors: Vec<u64>,
) -> CommitmentSlots {
let (block_commitment, rooted_stake) =
Self::aggregate_commitment(&ancestors, &aggregation_data.bank);
let (block_commitment, rooted_stake) = Self::aggregate_commitment(
&ancestors,
&aggregation_data.bank,
&aggregation_data.node_vote_state,
);

let highest_super_majority_root =
get_highest_super_majority_root(rooted_stake, aggregation_data.total_stake);
Expand Down Expand Up @@ -173,6 +185,7 @@ impl AggregateCommitmentService {
pub fn aggregate_commitment(
ancestors: &[Slot],
bank: &Bank,
(node_vote_pubkey, node_vote_state): &(Pubkey, VoteState),
) -> (HashMap<Slot, BlockCommitment>, Vec<(Slot, u64)>) {
assert!(!ancestors.is_empty());

Expand All @@ -183,11 +196,17 @@ impl AggregateCommitmentService {

let mut commitment = HashMap::new();
let mut rooted_stake: Vec<(Slot, u64)> = Vec::new();
for (lamports, account) in bank.vote_accounts().values() {
for (pubkey, (lamports, account)) in bank.vote_accounts().iter() {
if *lamports == 0 {
continue;
}
if let Ok(vote_state) = account.vote_state().as_ref() {
let vote_state = if pubkey == node_vote_pubkey {
// Override old vote_state in bank with latest one for my own vote pubkey
Ok(node_vote_state)
} else {
account.vote_state()
};
if let Ok(vote_state) = vote_state {
Self::aggregate_commitment_for_vote_account(
&mut commitment,
&mut rooted_stake,
Expand Down Expand Up @@ -382,8 +401,7 @@ mod tests {
assert_eq!(rooted_stake[0], (root, lamports));
}

#[test]
fn test_aggregate_commitment_validity() {
fn do_test_aggregate_commitment_validity(with_node_vote_state: bool) {
let ancestors = vec![3, 4, 5, 7, 9, 10, 11];
let GenesisConfigInfo {
mut genesis_config, ..
Expand Down Expand Up @@ -447,9 +465,11 @@ mod tests {
let mut vote_state1 = vote_state::from(&vote_account1).unwrap();
process_slot_vote_unchecked(&mut vote_state1, 3);
process_slot_vote_unchecked(&mut vote_state1, 5);
let versioned = VoteStateVersions::new_current(vote_state1);
vote_state::to(&versioned, &mut vote_account1).unwrap();
bank.store_account(&pk1, &vote_account1);
if !with_node_vote_state {
let versioned = VoteStateVersions::new_current(vote_state1.clone());
vote_state::to(&versioned, &mut vote_account1).unwrap();
bank.store_account(&pk1, &vote_account1);
}

let mut vote_state2 = vote_state::from(&vote_account2).unwrap();
process_slot_vote_unchecked(&mut vote_state2, 9);
Expand All @@ -470,8 +490,18 @@ mod tests {
vote_state::to(&versioned, &mut vote_account4).unwrap();
bank.store_account(&pk4, &vote_account4);

let (commitment, rooted_stake) =
AggregateCommitmentService::aggregate_commitment(&ancestors, &bank);
let node_vote_pubkey = if with_node_vote_state {
pk1
} else {
// Use some random pubkey as dummy to suppress the override.
solana_sdk::pubkey::new_rand()
};

let (commitment, rooted_stake) = AggregateCommitmentService::aggregate_commitment(
&ancestors,
&bank,
&(node_vote_pubkey, vote_state1),
);

for a in ancestors {
if a <= 3 {
Expand Down Expand Up @@ -499,17 +529,21 @@ mod tests {
assert_eq!(get_highest_super_majority_root(rooted_stake, 100), 1)
}

#[test]
fn test_aggregate_commitment_validity_with_node_vote_state() {
do_test_aggregate_commitment_validity(true)
}

#[test]
fn test_aggregate_commitment_validity_without_node_vote_state() {
do_test_aggregate_commitment_validity(false);
}

#[test]
fn test_highest_super_majority_root_advance() {
fn get_vote_account_root_slot(vote_pubkey: Pubkey, bank: &Bank) -> Slot {
fn get_vote_state(vote_pubkey: Pubkey, bank: &Bank) -> VoteState {
let vote_account = bank.get_vote_account(&vote_pubkey).unwrap();
let slot = vote_account
.vote_state()
.as_ref()
.unwrap()
.root_slot
.unwrap();
slot
vote_account.vote_state().cloned().unwrap()
}

let block_commitment_cache = RwLock::new(BlockCommitmentCache::new_for_tests());
Expand Down Expand Up @@ -547,10 +581,10 @@ mod tests {
}

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_pubkey = validator_vote_keypairs.vote_keypair.pubkey();
let root = get_vote_state(vote_pubkey, &working_bank)
.root_slot
.unwrap();
for x in 0..root {
bank_forks
.write()
Expand Down Expand Up @@ -579,17 +613,16 @@ mod tests {
bank34.process_transaction(&vote33).unwrap();

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_state = get_vote_state(vote_pubkey, &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
bank: working_bank,
root: 0,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state.clone()),
},
ancestors,
);
Expand Down Expand Up @@ -628,6 +661,7 @@ mod tests {
bank: working_bank,
root: 1,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state),
},
ancestors,
);
Expand Down Expand Up @@ -662,17 +696,17 @@ mod tests {
}

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_state =
get_vote_state(validator_vote_keypairs.vote_keypair.pubkey(), &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
bank: working_bank,
root: 0,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state),
},
ancestors,
);
Expand Down
38 changes: 31 additions & 7 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use {
timing::timestamp,
transaction::Transaction,
},
solana_vote_program::vote_state::VoteTransaction,
solana_vote_program::vote_state::{VoteState, VoteTransaction},
std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
Expand Down Expand Up @@ -2406,10 +2406,28 @@ impl ReplayStage {
}

let mut update_commitment_cache_time = Measure::start("update_commitment_cache");
// Send (voted) bank along with the updated vote account state for this node, the vote
// state is always newer than the one in the bank by definition, because banks can't
// contain vote transactions which are voting on its own slot.
//
// It should be acceptable to aggressively use the vote for our own _local view_ of
// commitment aggregation, although it's not guaranteed that the new vote transaction is
// observed by other nodes at this point.
//
// The justification stems from the assumption of the sensible voting behavior from the
// consensus subsystem. That's because it means there would be a slashing possibility
// otherwise.
//
// This behavior isn't significant normally for mainnet-beta, because staked nodes aren't
// servicing RPC requests. However, this eliminates artificial 1-slot delay of the
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
// `finalized` confirmation if a node is materially staked and servicing RPC requests at
// the same time for development purposes.
let node_vote_state = (*vote_account_pubkey, tower.vote_state.clone());
Self::update_commitment_cache(
bank.clone(),
bank_forks.read().unwrap().root(),
progress.get_fork_stats(bank.slot()).unwrap().total_stake,
node_vote_state,
lockouts_sender,
);
update_commitment_cache_time.stop();
Expand Down Expand Up @@ -2699,11 +2717,15 @@ impl ReplayStage {
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
node_vote_state: (Pubkey, VoteState),
lockouts_sender: &Sender<CommitmentAggregationData>,
) {
if let Err(e) =
lockouts_sender.send(CommitmentAggregationData::new(bank, root, total_stake))
{
if let Err(e) = lockouts_sender.send(CommitmentAggregationData::new(
bank,
root,
total_stake,
node_vote_state,
)) {
trace!("lockouts_sender failed: {:?}", e);
}
}
Expand Down Expand Up @@ -5281,13 +5303,14 @@ pub(crate) mod tests {

#[test]
fn test_replay_commitment_cache() {
fn leader_vote(vote_slot: Slot, bank: &Bank, pubkey: &Pubkey) {
fn leader_vote(vote_slot: Slot, bank: &Bank, pubkey: &Pubkey) -> (Pubkey, VoteState) {
let mut leader_vote_account = bank.get_account(pubkey).unwrap();
let mut vote_state = vote_state::from(&leader_vote_account).unwrap();
vote_state::process_slot_vote_unchecked(&mut vote_state, vote_slot);
let versioned = VoteStateVersions::new_current(vote_state);
let versioned = VoteStateVersions::new_current(vote_state.clone());
vote_state::to(&versioned, &mut leader_vote_account).unwrap();
bank.store_account(pubkey, &leader_vote_account);
(*pubkey, vote_state)
}

let leader_pubkey = solana_sdk::pubkey::new_rand();
Expand Down Expand Up @@ -5353,11 +5376,12 @@ pub(crate) mod tests {
}

let arc_bank = bank_forks.read().unwrap().get(i).unwrap();
leader_vote(i - 1, &arc_bank, &leader_voting_pubkey);
let node_vote_state = leader_vote(i - 1, &arc_bank, &leader_voting_pubkey);
ReplayStage::update_commitment_cache(
arc_bank.clone(),
0,
leader_lamports,
node_vote_state,
&lockouts_sender,
);
arc_bank.freeze();
Expand Down
15 changes: 3 additions & 12 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1794,12 +1794,9 @@ fn test_validator_saves_tower() {

// Wait for the first new root
let last_replayed_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!("current root: {}", root);
if root > 0 {
Expand All @@ -1826,12 +1823,9 @@ fn test_validator_saves_tower() {

// Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
let new_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!(
"current root: {}, last_replayed_root: {}",
Expand Down Expand Up @@ -1862,12 +1856,9 @@ fn test_validator_saves_tower() {

// Wait for another new root
let new_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!("current root: {}, last tower root: {}", root, tower3_root);
if root > tower3_root {
Expand Down