Use cluster information about slots to prioritize repair (#8820)
automerge
aeyakovenko authored Mar 13, 2020
1 parent 2182521 commit 9a79be5
Showing 3 changed files with 111 additions and 30 deletions.
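
In rough terms, this change makes shred repair requests prefer peers that the cluster's slot
information says actually have the slot being repaired: instead of picking a repair peer
uniformly at random, each candidate gets a weight of roughly 1 + the peer's stake as recorded
for that slot in cluster_slots + the peer's total validator stake, and the request target is
drawn in proportion to those weights. Below is a minimal, self-contained sketch of that idea;
the names and the toy draw are illustrative stand-ins, not this crate's actual API.

use std::collections::HashMap;

// Hypothetical stand-ins for the cluster data this commit consults.
fn compute_weights(
    peers: &[&str],
    slot_stakes: &HashMap<&str, u64>,      // stake seen confirming this slot, per peer
    validator_stakes: &HashMap<&str, u64>, // total stake, per peer
) -> Vec<(u64, usize)> {
    peers
        .iter()
        .enumerate()
        .map(|(i, id)| {
            // The +1 keeps unknown or unstaked peers selectable.
            let weight = 1
                + slot_stakes.get(id).copied().unwrap_or(0)
                + validator_stakes.get(id).copied().unwrap_or(0);
            (weight, i)
        })
        .collect()
}

// Toy weighted draw; the real code hands the weights to weighted_shuffle::weighted_best.
fn pick(weights: &[(u64, usize)], roll: u64) -> usize {
    let total: u64 = weights.iter().map(|(w, _)| *w).sum();
    let mut r = roll % total.max(1);
    for (w, i) in weights {
        if r < *w {
            return *i;
        }
        r -= *w;
    }
    weights.last().unwrap().1
}

fn main() {
    let peers = ["staked", "unstaked"];
    let slot_stakes: HashMap<_, _> = [("staked", 90u64)].into_iter().collect();
    let validator_stakes: HashMap<_, _> = [("staked", 9u64)].into_iter().collect();
    let weights = compute_weights(&peers, &slot_stakes, &validator_stakes);
    assert_eq!(weights, vec![(100, 0), (1, 1)]);
    // ~100:1 odds of repairing from the peer that has the slot and carries stake.
    println!("picked peer index {}", pick(&weights, 42));
}
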
80 changes: 66 additions & 14 deletions core/src/cluster_slots.rs
@@ -1,4 +1,7 @@
-use crate::{cluster_info::ClusterInfo, epoch_slots::EpochSlots, serve_repair::RepairType};
+use crate::{
+    cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
+    serve_repair::RepairType,
+};

use solana_ledger::{bank_forks::BankForks, staking_utils};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
@@ -108,11 +111,7 @@ impl ClusterSlots {
            .collect()
    }

-    pub fn update_peers(
-        &mut self,
-        cluster_info: &RwLock<ClusterInfo>,
-        bank_forks: &RwLock<BankForks>,
-    ) {
+    fn update_peers(&mut self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
        let root = bank_forks.read().unwrap().root();
        let (epoch, _) = bank_forks
            .read()
@@ -141,14 +140,20 @@ impl ClusterSlots {
            self.epoch = Some(epoch);
        }
    }
-    pub fn peers(&self, slot: Slot) -> Vec<(Rc<Pubkey>, u64)> {
-        let mut peers: HashMap<Rc<Pubkey>, u64> = self.validator_stakes.clone();
-        if let Some(slot_peers) = self.lookup(slot) {
-            slot_peers
-                .iter()
-                .for_each(|(x, y)| *peers.entry(x.clone()).or_insert(0) += *y);
-        }
-        peers.into_iter().filter(|x| *x.0 != self.self_id).collect()
-    }
+
+    pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<(u64, usize)> {
+        let slot_peers = self.lookup(slot);
+        repair_peers
+            .iter()
+            .enumerate()
+            .map(|(i, x)| {
+                (
+                    1 + slot_peers.and_then(|v| v.get(&x.id)).cloned().unwrap_or(0)
+                        + self.validator_stakes.get(&x.id).cloned().unwrap_or(0),
+                    i,
+                )
+            })
+            .collect()
+    }

    pub fn generate_repairs_for_missing_slots(
@@ -217,6 +222,53 @@ mod tests {
        assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&0));
    }

    #[test]
    fn test_compute_weights() {
        let cs = ClusterSlots::default();
        let ci = ContactInfo::default();
        assert_eq!(cs.compute_weights(0, &[ci]), vec![(1, 0)]);
    }

    #[test]
    fn test_best_peer_2() {
        let mut cs = ClusterSlots::default();
        let mut c1 = ContactInfo::default();
        let mut c2 = ContactInfo::default();
        let mut map = HashMap::new();
        let k1 = Pubkey::new_rand();
        let k2 = Pubkey::new_rand();
        map.insert(Rc::new(k1.clone()), std::u64::MAX / 2);
        map.insert(Rc::new(k2.clone()), 0);
        cs.cluster_slots.insert(0, map);
        c1.id = k1;
        c2.id = k2;
        assert_eq!(
            cs.compute_weights(0, &[c1, c2]),
            vec![(std::u64::MAX / 2 + 1, 0), (1, 1)]
        );
    }

    #[test]
    fn test_best_peer_3() {
        let mut cs = ClusterSlots::default();
        let mut c1 = ContactInfo::default();
        let mut c2 = ContactInfo::default();
        let mut map = HashMap::new();
        let k1 = Pubkey::new_rand();
        let k2 = Pubkey::new_rand();
        map.insert(Rc::new(k2.clone()), 0);
        cs.cluster_slots.insert(0, map);
        //make sure default weights are used as well
        cs.validator_stakes
            .insert(Rc::new(k1.clone()), std::u64::MAX / 2);
        c1.id = k1;
        c2.id = k2;
        assert_eq!(
            cs.compute_weights(0, &[c1, c2]),
            vec![(std::u64::MAX / 2 + 1, 0), (1, 1)]
        );
    }

    #[test]
    fn test_update_new_staked_slot() {
        let mut cs = ClusterSlots::default();
4 changes: 3 additions & 1 deletion core/src/repair_service.rs
@@ -12,6 +12,7 @@ use solana_ledger::{
};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey};
use std::{
collections::HashMap,
iter::Iterator,
net::SocketAddr,
net::UdpSocket,
@@ -129,11 +130,12 @@ impl RepairService {
            };

            if let Ok(repairs) = repairs {
+                let mut cache = HashMap::new();
                let reqs: Vec<((SocketAddr, Vec<u8>), RepairType)> = repairs
                    .into_iter()
                    .filter_map(|repair_request| {
                        serve_repair
-                            .repair_request(&repair_request)
+                            .repair_request(&cluster_slots, &repair_request, &mut cache)
                            .map(|result| (result, repair_request))
                            .ok()
                    })
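
One detail worth calling out in repair_service.rs: the cache is created fresh for each batch of
generated repairs and threaded through every repair_request call in that batch, so a slot that
needs many shreds repaired only pays for the gossip lookup and weight computation once. A rough
sketch of that pattern, with simplified, hypothetical types standing in for the real ones:

use std::collections::HashMap;

type Slot = u64;
type Peers = Vec<String>;         // stand-in for Vec<ContactInfo>
type Weights = Vec<(u64, usize)>; // stand-in for compute_weights output
type RepairCache = HashMap<Slot, (Peers, Weights)>;

// Hypothetical lookup that would hit gossip and cluster_slots only on a cache miss.
fn peers_and_weights(cache: &mut RepairCache, slot: Slot) -> &(Peers, Weights) {
    cache.entry(slot).or_insert_with(|| {
        // expensive path: read repair peers from gossip, then weight them by stake
        (vec![format!("peer-for-slot-{}", slot)], vec![(1, 0)])
    })
}

fn main() {
    let repairs: Vec<Slot> = vec![5, 5, 5, 7]; // e.g. three shreds missing from slot 5
    let mut cache = RepairCache::new();        // one cache per batch, as in the diff above
    for slot in repairs {
        let (peers, weights) = peers_and_weights(&mut cache, slot);
        println!("slot {}: {} peer(s), {} weight(s)", slot, peers.len(), weights.len());
    }
    assert_eq!(cache.len(), 2); // slots 5 and 7 were each resolved exactly once
}
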
57 changes: 42 additions & 15 deletions core/src/serve_repair.rs
@@ -2,22 +2,25 @@ use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
cluster_slots::ClusterSlots,
contact_info::ContactInfo,
packet::Packet,
result::{Error, Result},
weighted_shuffle::weighted_best,
};
use bincode::serialize;
use rand::{thread_rng, Rng};
use solana_ledger::blockstore::Blockstore;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{Packets, PacketsRecycler};
use solana_sdk::{
clock::Slot,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::duration_as_ms,
};
use std::{
collections::HashMap,
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
@@ -61,6 +64,8 @@ pub struct ServeRepair {
cluster_info: Arc<RwLock<ClusterInfo>>,
}

type RepairCache = HashMap<Slot, (Vec<ContactInfo>, Vec<(u64, usize)>)>;

impl ServeRepair {
/// Without a valid keypair gossip will not function. Only useful for tests.
pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
@@ -269,21 +274,30 @@ impl ServeRepair {
        Ok(out)
    }

-    pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
+    pub fn repair_request(
+        &self,
+        cluster_slots: &ClusterSlots,
+        repair_request: &RepairType,
+        cache: &mut RepairCache,
+    ) -> Result<(SocketAddr, Vec<u8>)> {
        // find a peer that appears to be accepting replication and has the desired slot, as indicated
        // by a valid tvu port location
-        let valid: Vec<_> = self
-            .cluster_info
-            .read()
-            .unwrap()
-            .repair_peers(repair_request.slot());
-        if valid.is_empty() {
-            return Err(ClusterInfoError::NoPeers.into());
+        if cache.get(&repair_request.slot()).is_none() {
+            let repair_peers: Vec<_> = self
+                .cluster_info
+                .read()
+                .unwrap()
+                .repair_peers(repair_request.slot());
+            if repair_peers.is_empty() {
+                return Err(ClusterInfoError::NoPeers.into());
+            }
+            let weights = cluster_slots.compute_weights(repair_request.slot(), &repair_peers);
+            cache.insert(repair_request.slot(), (repair_peers, weights));
        }
-        let n = thread_rng().gen::<usize>() % valid.len();
-        let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
+        let (repair_peers, weights) = cache.get(&repair_request.slot()).unwrap();
+        let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
+        let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
        let out = self.map_repair_request(repair_request)?;

        Ok((addr, out))
    }
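
Design note on the new selection step: the old uniform pick (thread_rng().gen::<usize>() %
valid.len()) is replaced by a stake-weighted draw over the cached (weight, index) pairs, the
function still returns NoPeers when gossip has no repair peers for the slot, and the draw is
reseeded with fresh random bytes on every call, so equally weighted peers still share load
across repeated requests (the looping test further down relies on that). The usize in each
pair is the peer's position in the cached repair_peers list, which is why the draw's result
can index that list directly. A tiny illustration of that bookkeeping, with toy data and an
assumed winning index in place of a real weighted_best call:

fn main() {
    // Stand-ins for one cached entry: peers and their (weight, position) pairs.
    let repair_peers = vec!["peer-a:8007", "peer-b:8007"];
    let weights: Vec<(u64, usize)> = repair_peers
        .iter()
        .enumerate()
        .map(|(i, _)| (1, i)) // equal weights, as when no stake information is known
        .collect();
    assert_eq!(weights, vec![(1, 0), (1, 1)]);

    // Assume the weighted draw selected the pair holding index 1.
    let n = weights[1].1;
    let addr = repair_peers[n]; // send the request to that peer's serve_repair port
    assert_eq!(addr, "peer-b:8007");
}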

@@ -563,10 +577,15 @@ mod tests {

    #[test]
    fn window_index_request() {
+        let cluster_slots = ClusterSlots::default();
        let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
        let serve_repair = ServeRepair::new(cluster_info.clone());
-        let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
+        let rv = serve_repair.repair_request(
+            &cluster_slots,
+            &RepairType::Shred(0, 0),
+            &mut HashMap::new(),
+        );
        assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

        let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
@@ -587,7 +606,11 @@
        };
        cluster_info.write().unwrap().insert_info(nxt.clone());
        let rv = serve_repair
-            .repair_request(&RepairType::Shred(0, 0))
+            .repair_request(
+                &cluster_slots,
+                &RepairType::Shred(0, 0),
+                &mut HashMap::new(),
+            )
            .unwrap();
        assert_eq!(nxt.serve_repair, serve_repair_addr);
        assert_eq!(rv.0, nxt.serve_repair);
@@ -614,7 +637,11 @@
        while !one || !two {
            //this randomly picks an option, so eventually it should pick both
            let rv = serve_repair
-                .repair_request(&RepairType::Shred(0, 0))
+                .repair_request(
+                    &cluster_slots,
+                    &RepairType::Shred(0, 0),
+                    &mut HashMap::new(),
+                )
                .unwrap();
            if rv.0 == serve_repair_addr {
                one = true;
