From 0d99cb2fdcb1a3f152e77d2ff7e55a5f3c054ea1 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Fri, 27 Jul 2018 22:13:27 +0800 Subject: [PATCH] Limit number of incoming connections (#391) * Limit number of incoming connections * Check Endpoint::Listener before checking num_open_connections.incoming * Maintain at least 1-1/n portion of outgoing connections * Remove use * Default incoming_peers_factor to 2 * Use max_incoming_peers and max_outgoing peers to check whether connections should be dropped * Fix expected_max_peers: reserved peers are not counted in config.max_peers * typo: fix test --- substrate/network-libp2p/src/network_state.rs | 74 +++++++++++++------ substrate/network-libp2p/src/traits.rs | 2 +- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/substrate/network-libp2p/src/network_state.rs b/substrate/network-libp2p/src/network_state.rs index 2c1634ce91db6..38e26601bfe92 100644 --- a/substrate/network-libp2p/src/network_state.rs +++ b/substrate/network-libp2p/src/network_state.rs @@ -29,7 +29,6 @@ use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode}; use {NodeIndex, ProtocolId, SessionInfo}; use parking_lot::{Mutex, RwLock}; use rand::{self, Rng}; -use std::cmp; use std::fs; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::path::Path; @@ -51,10 +50,10 @@ pub struct NetworkState { /// Active connections. connections: RwLock, - /// `min_peers` taken from the configuration. - min_peers: u32, - /// `max_peers` taken from the configuration. - max_peers: u32, + /// Maximum incoming peers. + max_incoming_peers: u32, + /// Maximum outgoing peers. + max_outgoing_peers: u32, /// If true, only reserved peers can connect. reserved_only: atomic::AtomicBool, @@ -201,13 +200,12 @@ impl NetworkState { RwLock::new(reserved_peers) }; - let expected_max_peers = cmp::max(config.max_peers as usize, - config.reserved_nodes.len()); + let expected_max_peers = config.max_peers as usize + config.reserved_nodes.len(); Ok(NetworkState { node_store, - min_peers: config.min_peers, - max_peers: config.max_peers, + max_outgoing_peers: config.min_peers, + max_incoming_peers: config.max_peers.saturating_sub(config.min_peers), connections: RwLock::new(Connections { peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()), info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()), @@ -464,19 +462,14 @@ impl NetworkState { } } - /// Returns the number of open and pending connections with - /// custom protocols. - pub fn num_open_custom_connections(&self) -> u32 { - num_open_custom_connections(&self.connections.read()) - } - /// Returns the number of new outgoing custom connections to peers to /// open. This takes into account the number of active peers. pub fn should_open_outgoing_custom_connections(&self) -> u32 { if self.reserved_only.load(atomic::Ordering::Relaxed) { 0 } else { - self.min_peers.saturating_sub(self.num_open_custom_connections()) + let num_open_custom_connections = num_open_custom_connections(&self.connections.read(), &self.reserved_peers.read()); + self.max_outgoing_peers.saturating_sub(num_open_custom_connections.unreserved_outgoing) } } @@ -554,7 +547,7 @@ impl NetworkState { /// You must pass an `UnboundedSender` which will be used by the `send` /// method. Actually sending the data is not covered by this code. /// - /// The various methods of the `NetworkState` that close a connection do + /// The various methods of the `NetworkState` that close a connection do /// so by dropping this sender. pub fn custom_proto( &self, @@ -572,7 +565,7 @@ impl NetworkState { let who = accept_connection(&mut connections, &self.next_node_index, node_id.clone(), endpoint)?; - let num_open_connections = num_open_custom_connections(&connections); + let num_open_connections = num_open_custom_connections(&connections, &self.reserved_peers.read()); let infos = connections.info_by_peer.get_mut(&who) .expect("Newly-created peer id is always valid"); @@ -580,7 +573,10 @@ impl NetworkState { let node_is_reserved = self.reserved_peers.read().contains(&infos.id); if !node_is_reserved { if self.reserved_only.load(atomic::Ordering::Relaxed) || - num_open_connections >= self.max_peers + (endpoint == Endpoint::Listener && + num_open_connections.unreserved_incoming >= self.max_incoming_peers) || + (endpoint == Endpoint::Dialer && + num_open_connections.unreserved_outgoing >= self.max_outgoing_peers) { debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id); return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached")) @@ -771,10 +767,19 @@ fn is_peer_disabled( } } +struct OpenCustomConnectionsNumbers { + /// Total number of open and pending connections. + pub total: u32, + /// Unreserved incoming number of open and pending connections. + pub unreserved_incoming: u32, + /// Unreserved outgoing number of open and pending connections. + pub unreserved_outgoing: u32, +} + /// Returns the number of open and pending connections with /// custom protocols. -fn num_open_custom_connections(connections: &Connections) -> u32 { - connections +fn num_open_custom_connections(connections: &Connections, reserved_peers: &FnvHashSet) -> OpenCustomConnectionsNumbers { + let filtered = connections .info_by_peer .values() .filter(|info| @@ -784,8 +789,29 @@ fn num_open_custom_connections(connections: &Connections) -> u32 { _ => false } ) - ) - .count() as u32 + ); + + let mut total: u32 = 0; + let mut unreserved_incoming: u32 = 0; + let mut unreserved_outgoing: u32 = 0; + + for info in filtered { + total += 1; + let node_is_reserved = reserved_peers.contains(&info.id); + if !node_is_reserved { + if !info.originated { + unreserved_incoming += 1; + } else { + unreserved_outgoing += 1; + } + } + } + + OpenCustomConnectionsNumbers { + total, + unreserved_incoming, + unreserved_outgoing, + } } /// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it @@ -813,7 +839,7 @@ fn parse_and_add_to_node_store( .peer_or_create(&who) .add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)), } - + Ok(who) } diff --git a/substrate/network-libp2p/src/traits.rs b/substrate/network-libp2p/src/traits.rs index 3632bdac8351d..15c9af78c50fa 100644 --- a/substrate/network-libp2p/src/traits.rs +++ b/substrate/network-libp2p/src/traits.rs @@ -224,7 +224,7 @@ pub enum Severity<'a> { /// it could answer. Useless(&'a str), /// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer - /// must have taken concrete action in order to behave in such a way which is wantanly invalid. + /// must have taken concrete action in order to behave in such a way which is wantanly invalid. Bad(&'a str), }