Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Allowed staked nodes weight override
Browse files Browse the repository at this point in the history
  • Loading branch information
janlegner authored and ochaloup committed Aug 10, 2022
1 parent c03f3b1 commit 5010b79
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 11 deletions.
56 changes: 51 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rand_chacha = "0.2.2"
rayon = "1.5.3"
serde = "1.0.143"
serde_derive = "1.0.103"
serde_yaml = "0.9.1"
solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.12.0" }
solana-bloom = { path = "../bloom", version = "=1.12.0" }
solana-client = { path = "../client", version = "=1.12.0" }
Expand Down
64 changes: 63 additions & 1 deletion core/src/staked_nodes_updater_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
serde::de::Deserializer,
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::pubkey::Pubkey,
Expand All @@ -8,7 +9,7 @@ use {
net::IpAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Arc, RwLock, RwLockReadGuard,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
Expand All @@ -21,18 +22,40 @@ pub struct StakedNodesUpdaterService {
thread_hdl: JoinHandle<()>,
}

#[derive(Default, Deserialize, Clone)]
pub struct StakedNodesOverrides {
#[serde(deserialize_with = "deserialize_pubkey_map")]
pub staked_map_id: HashMap<Pubkey, u64>,
}

pub fn deserialize_pubkey_map<'de, D>(des: D) -> Result<HashMap<Pubkey, u64>, D::Error>
where
D: Deserializer<'de>,
{
let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
for (key, value) in container.iter() {
let typed_key = Pubkey::try_from(key.as_str())
.map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
container_typed.insert(typed_key, *value);
}
Ok(container_typed)
}

impl StakedNodesUpdaterService {
pub fn new(
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
shared_staked_nodes: Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<StakedNodesOverrides>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-sn-updater".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let overrides = shared_staked_nodes_overrides.read().unwrap();
let mut new_ip_to_stake = HashMap::new();
let mut new_id_to_stake = HashMap::new();
let mut total_stake = 0;
Expand All @@ -47,6 +70,7 @@ impl StakedNodesUpdaterService {
&mut min_stake,
&bank_forks,
&cluster_info,
&overrides,
) {
let mut shared = shared_staked_nodes.write().unwrap();
shared.total_stake = total_stake;
Expand All @@ -69,6 +93,7 @@ impl StakedNodesUpdaterService {
min_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
overrides: &RwLockReadGuard<StakedNodesOverrides>,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
Expand Down Expand Up @@ -96,6 +121,14 @@ impl StakedNodesUpdaterService {
Some((node.tvu.ip(), *stake))
})
.collect();
Self::override_stake(
cluster_info,
total_stake,
id_to_stake,
ip_to_stake,
&overrides.staked_map_id,
);

*last_stakes = Instant::now();
true
} else {
Expand All @@ -104,6 +137,35 @@ impl StakedNodesUpdaterService {
}
}

fn override_stake(
cluster_info: &ClusterInfo,
total_stake: &mut u64,
id_to_stake_map: &mut HashMap<Pubkey, u64>,
ip_to_stake_map: &mut HashMap<IpAddr, u64>,
staked_map_overrides: &HashMap<Pubkey, u64>,
) {
for (id_override, stake_override) in staked_map_overrides.iter() {
if let Some(ip_override) = cluster_info.tvu_peers().into_iter().find_map(|node| {
if node.id == *id_override {
return Some(node.tvu.ip());
}
None
}) {
if let Some(previous_stake) = id_to_stake_map.get(id_override) {
*total_stake -= previous_stake;
}
*total_stake += stake_override;
id_to_stake_map.insert(*id_override, *stake_override);
ip_to_stake_map.insert(ip_override, *stake_override);
} else {
error!(
"staked nodes overrides configuration for id {} with stake {} does not match existing IP. Skipping",
id_override, stake_override
);
}
}
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
find_packet_sender_stake_stage::FindPacketSenderStakeStage,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
staked_nodes_updater_service::StakedNodesUpdaterService,
staked_nodes_updater_service::{StakedNodesOverrides, StakedNodesUpdaterService},
},
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::ConnectionCache,
Expand Down Expand Up @@ -98,6 +98,7 @@ impl Tpu {
log_messages_bytes_limit: Option<usize>,
enable_quic_servers: bool,
staked_nodes: &Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<StakedNodesOverrides>>,
) -> Self {
let TpuSockets {
transactions: transactions_sockets,
Expand Down Expand Up @@ -130,6 +131,7 @@ impl Tpu {
cluster_info.clone(),
bank_forks.clone(),
staked_nodes.clone(),
shared_staked_nodes_overrides,
);

let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded();
Expand Down
4 changes: 4 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use {
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::SnapshotPackagerService,
staked_nodes_updater_service::StakedNodesOverrides,
stats_reporter_service::StatsReporterService,
system_monitor_service::{verify_net_stats_access, SystemMonitorService},
tower_storage::TowerStorage,
Expand Down Expand Up @@ -168,6 +169,7 @@ pub struct ValidatorConfig {
pub accounts_db_test_hash_calculation: bool,
pub accounts_db_skip_shrink: bool,
pub tpu_coalesce_ms: u64,
pub staked_nodes_overrides: Arc<RwLock<StakedNodesOverrides>>,
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
Expand Down Expand Up @@ -230,6 +232,7 @@ impl Default for ValidatorConfig {
accounts_db_test_hash_calculation: false,
accounts_db_skip_shrink: false,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
staked_nodes_overrides: Arc::new(RwLock::new(StakedNodesOverrides::default())),
validator_exit: Arc::new(RwLock::new(Exit::default())),
no_wait_for_vote_to_start_leader: true,
accounts_shrink_ratio: AccountShrinkThreshold::default(),
Expand Down Expand Up @@ -1029,6 +1032,7 @@ impl Validator {
config.runtime_config.log_messages_bytes_limit,
config.enable_quic_servers,
&staked_nodes,
config.staked_nodes_overrides.clone(),
);

datapoint_info!(
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
accounts_db_skip_shrink: config.accounts_db_skip_shrink,
tpu_coalesce_ms: config.tpu_coalesce_ms,
staked_nodes_overrides: config.staked_nodes_overrides.clone(),
validator_exit: Arc::new(RwLock::new(Exit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
Expand Down
Loading

0 comments on commit 5010b79

Please sign in to comment.