Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Limit number of incoming connections (#391)
Browse files Browse the repository at this point in the history
* Limit number of incoming connections

* Check Endpoint::Listener before checking num_open_connections.incoming

* Maintain at least 1-1/n portion of outgoing connections

* Remove use

* Default incoming_peers_factor to 2

* Use max_incoming_peers and max_outgoing peers to check whether connections should be dropped

* Fix expected_max_peers: reserved peers are not counted in config.max_peers

* typo: fix test
  • Loading branch information
sorpaas authored and gavofyork committed Jul 27, 2018
1 parent dc925ca commit 0d99cb2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
74 changes: 50 additions & 24 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode};
use {NodeIndex, ProtocolId, SessionInfo};
use parking_lot::{Mutex, RwLock};
use rand::{self, Rng};
use std::cmp;
use std::fs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::path::Path;
Expand All @@ -51,10 +50,10 @@ pub struct NetworkState {
/// Active connections.
connections: RwLock<Connections>,

/// `min_peers` taken from the configuration.
min_peers: u32,
/// `max_peers` taken from the configuration.
max_peers: u32,
/// Maximum incoming peers.
max_incoming_peers: u32,
/// Maximum outgoing peers.
max_outgoing_peers: u32,

/// If true, only reserved peers can connect.
reserved_only: atomic::AtomicBool,
Expand Down Expand Up @@ -201,13 +200,12 @@ impl NetworkState {
RwLock::new(reserved_peers)
};

let expected_max_peers = cmp::max(config.max_peers as usize,
config.reserved_nodes.len());
let expected_max_peers = config.max_peers as usize + config.reserved_nodes.len();

Ok(NetworkState {
node_store,
min_peers: config.min_peers,
max_peers: config.max_peers,
max_outgoing_peers: config.min_peers,
max_incoming_peers: config.max_peers.saturating_sub(config.min_peers),
connections: RwLock::new(Connections {
peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
Expand Down Expand Up @@ -464,19 +462,14 @@ impl NetworkState {
}
}

/// Returns the number of open and pending connections with
/// custom protocols.
pub fn num_open_custom_connections(&self) -> u32 {
num_open_custom_connections(&self.connections.read())
}

/// Returns the number of new outgoing custom connections to peers to
/// open. This takes into account the number of active peers.
pub fn should_open_outgoing_custom_connections(&self) -> u32 {
if self.reserved_only.load(atomic::Ordering::Relaxed) {
0
} else {
self.min_peers.saturating_sub(self.num_open_custom_connections())
let num_open_custom_connections = num_open_custom_connections(&self.connections.read(), &self.reserved_peers.read());
self.max_outgoing_peers.saturating_sub(num_open_custom_connections.unreserved_outgoing)
}
}

Expand Down Expand Up @@ -554,7 +547,7 @@ impl NetworkState {
/// You must pass an `UnboundedSender` which will be used by the `send`
/// method. Actually sending the data is not covered by this code.
///
/// The various methods of the `NetworkState` that close a connection do
/// The various methods of the `NetworkState` that close a connection do
/// so by dropping this sender.
pub fn custom_proto(
&self,
Expand All @@ -572,15 +565,18 @@ impl NetworkState {
let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?;

let num_open_connections = num_open_custom_connections(&connections);
let num_open_connections = num_open_custom_connections(&connections, &self.reserved_peers.read());

let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");

let node_is_reserved = self.reserved_peers.read().contains(&infos.id);
if !node_is_reserved {
if self.reserved_only.load(atomic::Ordering::Relaxed) ||
num_open_connections >= self.max_peers
(endpoint == Endpoint::Listener &&
num_open_connections.unreserved_incoming >= self.max_incoming_peers) ||
(endpoint == Endpoint::Dialer &&
num_open_connections.unreserved_outgoing >= self.max_outgoing_peers)
{
debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached"))
Expand Down Expand Up @@ -771,10 +767,19 @@ fn is_peer_disabled(
}
}

struct OpenCustomConnectionsNumbers {
/// Total number of open and pending connections.
pub total: u32,
/// Unreserved incoming number of open and pending connections.
pub unreserved_incoming: u32,
/// Unreserved outgoing number of open and pending connections.
pub unreserved_outgoing: u32,
}

/// Returns the number of open and pending connections with
/// custom protocols.
fn num_open_custom_connections(connections: &Connections) -> u32 {
connections
fn num_open_custom_connections(connections: &Connections, reserved_peers: &FnvHashSet<PeerId>) -> OpenCustomConnectionsNumbers {
let filtered = connections
.info_by_peer
.values()
.filter(|info|
Expand All @@ -784,8 +789,29 @@ fn num_open_custom_connections(connections: &Connections) -> u32 {
_ => false
}
)
)
.count() as u32
);

let mut total: u32 = 0;
let mut unreserved_incoming: u32 = 0;
let mut unreserved_outgoing: u32 = 0;

for info in filtered {
total += 1;
let node_is_reserved = reserved_peers.contains(&info.id);
if !node_is_reserved {
if !info.originated {
unreserved_incoming += 1;
} else {
unreserved_outgoing += 1;
}
}
}

OpenCustomConnectionsNumbers {
total,
unreserved_incoming,
unreserved_outgoing,
}
}

/// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it
Expand Down Expand Up @@ -813,7 +839,7 @@ fn parse_and_add_to_node_store(
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}

Ok(who)
}

Expand Down
2 changes: 1 addition & 1 deletion substrate/network-libp2p/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub enum Severity<'a> {
/// it could answer.
Useless(&'a str),
/// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer
/// must have taken concrete action in order to behave in such a way which is wantanly invalid.
/// must have taken concrete action in order to behave in such a way which is wantanly invalid.
Bad(&'a str),
}

Expand Down

0 comments on commit 0d99cb2

Please sign in to comment.