Skip to content

Commit c87b830

Browse files
authored
limits number of nodes per IP address in Turbine (solana-labs#864)
1 parent c47a680 commit c87b830

File tree

2 files changed

+64
-30
lines changed

2 files changed

+64
-30
lines changed

turbine/benches/cluster_nodes.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ use {
66
rand::{seq::SliceRandom, Rng},
77
solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
88
solana_ledger::shred::{Shred, ShredFlags},
9-
solana_sdk::{clock::Slot, pubkey::Pubkey},
9+
solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey},
1010
solana_turbine::{
1111
cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
1212
retransmit_stage::RetransmitStage,
@@ -21,7 +21,8 @@ fn make_cluster_nodes<R: Rng>(
2121
unstaked_ratio: Option<(u32, u32)>,
2222
) -> (Vec<ContactInfo>, ClusterNodes<RetransmitStage>) {
2323
let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio);
24-
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
24+
let cluster_nodes =
25+
new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
2526
(nodes, cluster_nodes)
2627
}
2728

turbine/src/cluster_nodes.rs

Lines changed: 61 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ use {
1717
solana_sdk::{
1818
clock::{Epoch, Slot},
1919
feature_set,
20+
genesis_config::ClusterType,
2021
native_token::LAMPORTS_PER_SOL,
2122
pubkey::Pubkey,
2223
signature::{Keypair, Signer},
@@ -29,7 +30,7 @@ use {
2930
collections::HashMap,
3031
iter::repeat_with,
3132
marker::PhantomData,
32-
net::SocketAddr,
33+
net::{IpAddr, SocketAddr},
3334
sync::{Arc, Mutex, RwLock},
3435
time::{Duration, Instant},
3536
},
@@ -39,6 +40,9 @@ use {
3940
const DATA_PLANE_FANOUT: usize = 200;
4041
pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
4142

43+
// Limit number of nodes per IP address.
44+
const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;
45+
4246
#[derive(Debug, Error)]
4347
pub enum Error {
4448
#[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
@@ -81,9 +85,6 @@ pub struct ClusterNodesCache<T> {
8185
pub struct RetransmitPeers<'a> {
8286
root_distance: usize, // distance from the root node
8387
children: Vec<&'a Node>,
84-
// Maps tvu addresses to the first node
85-
// in the shuffle with the same address.
86-
addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
8788
}
8889

8990
impl Node {
@@ -147,8 +148,12 @@ impl<T> ClusterNodes<T> {
147148
}
148149

149150
impl ClusterNodes<BroadcastStage> {
150-
pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
151-
new_cluster_nodes(cluster_info, stakes)
151+
pub fn new(
152+
cluster_info: &ClusterInfo,
153+
cluster_type: ClusterType,
154+
stakes: &HashMap<Pubkey, u64>,
155+
) -> Self {
156+
new_cluster_nodes(cluster_info, cluster_type, stakes)
152157
}
153158

154159
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
@@ -168,16 +173,13 @@ impl ClusterNodes<RetransmitStage> {
168173
let RetransmitPeers {
169174
root_distance,
170175
children,
171-
addrs,
172176
} = self.get_retransmit_peers(slot_leader, shred, fanout)?;
173177
let protocol = get_broadcast_protocol(shred);
174-
let peers = children.into_iter().filter_map(|node| {
175-
node.contact_info()?
176-
.tvu(protocol)
177-
.ok()
178-
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
179-
});
180-
Ok((root_distance, peers.collect()))
178+
let peers = children
179+
.into_iter()
180+
.filter_map(|node| node.contact_info()?.tvu(protocol).ok())
181+
.collect();
182+
Ok((root_distance, peers))
181183
}
182184

183185
pub fn get_retransmit_peers(
@@ -197,19 +199,10 @@ impl ClusterNodes<RetransmitStage> {
197199
if let Some(index) = self.index.get(slot_leader) {
198200
weighted_shuffle.remove_index(*index);
199201
}
200-
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
201202
let mut rng = get_seeded_rng(slot_leader, shred);
202-
let protocol = get_broadcast_protocol(shred);
203203
let nodes: Vec<_> = weighted_shuffle
204204
.shuffle(&mut rng)
205205
.map(|index| &self.nodes[index])
206-
.inspect(|node| {
207-
if let Some(node) = node.contact_info() {
208-
if let Ok(addr) = node.tvu(protocol) {
209-
addrs.entry(addr).or_insert(*node.pubkey());
210-
}
211-
}
212-
})
213206
.collect();
214207
let self_index = nodes
215208
.iter()
@@ -228,7 +221,6 @@ impl ClusterNodes<RetransmitStage> {
228221
Ok(RetransmitPeers {
229222
root_distance,
230223
children: peers.collect(),
231-
addrs,
232224
})
233225
}
234226

@@ -272,10 +264,11 @@ impl ClusterNodes<RetransmitStage> {
272264

273265
pub fn new_cluster_nodes<T: 'static>(
274266
cluster_info: &ClusterInfo,
267+
cluster_type: ClusterType,
275268
stakes: &HashMap<Pubkey, u64>,
276269
) -> ClusterNodes<T> {
277270
let self_pubkey = cluster_info.id();
278-
let nodes = get_nodes(cluster_info, stakes);
271+
let nodes = get_nodes(cluster_info, cluster_type, stakes);
279272
let index: HashMap<_, _> = nodes
280273
.iter()
281274
.enumerate()
@@ -298,8 +291,21 @@ pub fn new_cluster_nodes<T: 'static>(
298291

299292
// All staked nodes + other known tvu-peers + the node itself;
300293
// sorted by (stake, pubkey) in descending order.
301-
fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
294+
fn get_nodes(
295+
cluster_info: &ClusterInfo,
296+
cluster_type: ClusterType,
297+
stakes: &HashMap<Pubkey, u64>,
298+
) -> Vec<Node> {
302299
let self_pubkey = cluster_info.id();
300+
let should_dedup_addrs = match cluster_type {
301+
ClusterType::Development => false,
302+
ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
303+
};
304+
// Maps IP addresses to number of nodes at that IP address.
305+
let mut counts = {
306+
let capacity = if should_dedup_addrs { stakes.len() } else { 0 };
307+
HashMap::<IpAddr, usize>::with_capacity(capacity)
308+
};
303309
// The local node itself.
304310
std::iter::once({
305311
let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
@@ -328,6 +334,30 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
328334
// Since sorted_by_key is stable, in case of duplicates, this
329335
// will keep nodes with contact-info.
330336
.dedup_by(|a, b| a.pubkey() == b.pubkey())
337+
.filter_map(|node| {
338+
if !should_dedup_addrs
339+
|| node
340+
.contact_info()
341+
.and_then(|node| node.tvu(Protocol::UDP).ok())
342+
.map(|addr| {
343+
*counts
344+
.entry(addr.ip())
345+
.and_modify(|count| *count += 1)
346+
.or_insert(1)
347+
})
348+
<= Some(MAX_NUM_NODES_PER_IP_ADDRESS)
349+
{
350+
Some(node)
351+
} else {
352+
// If the node is not staked, drop it entirely. Otherwise, keep the
353+
// pubkey for deterministic shuffle, but strip the contact-info so
354+
// that no more packets are sent to this node.
355+
(node.stake > 0u64).then(|| Node {
356+
node: NodeId::from(node.pubkey()),
357+
stake: node.stake,
358+
})
359+
}
360+
})
331361
.collect()
332362
}
333363

@@ -446,6 +476,7 @@ impl<T: 'static> ClusterNodesCache<T> {
446476
}
447477
let nodes = Arc::new(new_cluster_nodes::<T>(
448478
cluster_info,
479+
root_bank.cluster_type(),
449480
&epoch_staked_nodes.unwrap_or_default(),
450481
));
451482
*entry = Some((Instant::now(), Arc::clone(&nodes)));
@@ -583,7 +614,8 @@ mod tests {
583614
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
584615
// ClusterInfo::tvu_peers excludes the node itself.
585616
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
586-
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
617+
let cluster_nodes =
618+
new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
587619
// All nodes with contact-info should be in the index.
588620
// Staked nodes with no contact-info should be included.
589621
assert!(cluster_nodes.nodes.len() > nodes.len());
@@ -618,7 +650,8 @@ mod tests {
618650
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
619651
// ClusterInfo::tvu_peers excludes the node itself.
620652
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
621-
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
653+
let cluster_nodes =
654+
ClusterNodes::<BroadcastStage>::new(&cluster_info, ClusterType::Development, &stakes);
622655
// All nodes with contact-info should be in the index.
623656
// Excluding this node itself.
624657
// Staked nodes with no contact-info should be included.

0 commit comments

Comments
 (0)