Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permit concurrent dialing attempts per peer. #1506

Merged
merged 9 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
Ok(self.add_pending(future, handler, endpoint, None))
}

Expand Down Expand Up @@ -267,6 +262,11 @@ where
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;

if let Some(peer) = &info.peer_id {
self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
}

let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
Expand Down Expand Up @@ -465,6 +465,13 @@ where
self.established.get(peer).map_or(0, |conns| conns.len())
}

/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
}

/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
-> EstablishedConnectionIter<'a,
Expand Down Expand Up @@ -837,6 +844,7 @@ pub struct PoolLimits {
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
pub max_outgoing_per_peer: Option<usize>,
}

impl PoolLimits {
Expand All @@ -854,6 +862,20 @@ impl PoolLimits {
Self::check(current, self.max_outgoing)
}

fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_incoming)
}

fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_outgoing_per_peer)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
Expand Down
74 changes: 44 additions & 30 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::{
};
use fnv::{FnvHashMap};
use futures::{prelude::*, future};
use smallvec::SmallVec;
use std::{
collections::hash_map,
convert::TryFrom as _,
Expand Down Expand Up @@ -78,21 +79,17 @@ where

/// The ongoing dialing attempts.
///
/// The `Network` enforces a single ongoing dialing attempt per peer,
/// even if multiple (established) connections per peer are allowed.
/// However, a single dialing attempt operates on a list of addresses
/// to connect to, which can be extended with new addresses while
/// the connection attempt is still in progress. Thereby each
/// dialing attempt is associated with a new connection and hence a new
/// connection ID.
/// There may be multiple ongoing dialing attempts to the same peer.
/// Each dialing attempt is associated with a new connection and hence
/// a new connection ID.
///
/// > **Note**: `dialing` must be consistent with the pending outgoing
/// > connections in `pool`. That is, for every entry in `dialing`
/// > there must exist a pending outgoing connection in `pool` with
/// > the same connection ID. This is ensured by the implementation of
/// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
/// > together with the implementation of `DialingConnection::abort`.
dialing: FnvHashMap<TPeerId, peer::DialingAttempt>,
/// > together with the implementation of `DialingAttempt::abort`.
dialing: FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
}

impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
Expand Down Expand Up @@ -381,8 +378,11 @@ where
Poll::Pending => return Poll::Pending,
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
match self.dialing.entry(connection.peer_id().clone()) {
hash_map::Entry::Occupied(e) if e.get().id == connection.id() => {
e.remove();
hash_map::Entry::Occupied(mut e) => {
e.get_mut().retain(|s| s.current.0 != connection.id());
if e.get().is_empty() {
e.remove();
}
},
_ => {}
}
Expand Down Expand Up @@ -453,7 +453,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPee
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<TPeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
Expand Down Expand Up @@ -489,14 +489,12 @@ where
};

if let Ok(id) = &result {
let former = dialing.insert(opts.peer,
peer::DialingAttempt {
id: *id,
current: opts.address,
next: opts.remaining,
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
remaining: opts.remaining,
},
);
debug_assert!(former.is_none());
}

result
Expand All @@ -508,7 +506,7 @@ where
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
Expand All @@ -521,26 +519,33 @@ where
TPeerId: Eq + Hash + Clone,
{
// Check if the failed connection is associated with a dialing attempt.
// TODO: could be more optimal than iterating over everything
let dialing_peer = dialing.iter() // (1)
.find(|(_, a)| a.id == id)
.map(|(p, _)| p.clone());
let dialing_failed = dialing.iter_mut()
.find_map(|(peer, attempts)| {
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
let attempt = attempts.remove(pos);
let last = attempts.is_empty();
Some((peer.clone(), attempt, last))
} else {
None
}
});

if let Some(peer_id) = dialing_peer {
// A pending outgoing connection to a known peer failed.
let mut attempt = dialing.remove(&peer_id).expect("by (1)");
if let Some((peer_id, mut attempt, last)) = dialing_failed {
if last {
dialing.remove(&peer_id);
}

let num_remain = u32::try_from(attempt.next.len()).unwrap();
let failed_addr = attempt.current.clone();
let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
let failed_addr = attempt.current.1.clone();

let opts =
if num_remain > 0 {
let next_attempt = attempt.next.remove(0);
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id.clone(),
handler,
address: next_attempt,
remaining: attempt.next
remaining: attempt.remaining
};
Some(opts)
} else {
Expand Down Expand Up @@ -575,9 +580,13 @@ where
/// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
/// The total number of connected peers.
pub num_peers: usize,
/// The total number of connections, both established and pending.
pub num_connections: usize,
/// The total number of pending connections, both incoming and outgoing.
pub num_connections_pending: usize,
/// The total number of established connections.
pub num_connections_established: usize,
}

Expand Down Expand Up @@ -627,4 +636,9 @@ impl NetworkConfig {
self.pool_limits.max_established_per_peer = Some(n);
self
}

pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing_per_peer = Some(n);
self
}
}
Loading