diff --git a/CHANGELOG.md b/CHANGELOG.md index f618e502..2d4c9077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.11.6](https://github.com/maidsafe/qp2p/compare/v0.11.5...v0.11.6) (2021-04-07) + + +### Bug Fixes + +* **bootstrap:** fix stalled connections w/multiple bootstrap contacts ([cd02b6a](https://github.com/maidsafe/qp2p/commit/cd02b6a5d8819ac96b997b9271c712b2f679be8a)) + ### [0.11.5](https://github.com/maidsafe/qp2p/compare/v0.11.4...v0.11.5) (2021-04-06) diff --git a/Cargo.toml b/Cargo.toml index 07fe2921..04a25cb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ homepage = "https://maidsafe.net" license = "MIT OR BSD-3-Clause" readme = "README.md" repository = "https://github.com/maidsafe/qp2p" -version = "0.11.5" +version = "0.11.6" authors = [ "MaidSafe Developers " ] keywords = [ "quic" ] edition = "2018" diff --git a/src/api.rs b/src/api.rs index bea1c638..ecc1d312 100644 --- a/src/api.rs +++ b/src/api.rs @@ -14,7 +14,7 @@ use super::{ error::{Error, Result}, peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC}, }; -use futures::{future, TryFutureExt}; +use futures::future; use log::{debug, error, info, trace}; use std::net::{SocketAddr, UdpSocket}; use std::path::PathBuf; @@ -179,13 +179,13 @@ impl QuicP2p { return Err(Error::EmptyBootstrapNodesList); } - // Attempt to connect to all nodes and return the first one to succeed + // Attempt to create a new connection to all nodes and return the first one to succeed let tasks = endpoint .bootstrap_nodes() .iter() - .map(|addr| Box::pin(endpoint.connect_to(addr).map_ok(move |()| *addr))); + .map(|addr| Box::pin(endpoint.create_new_connection(addr))); - let bootstrapped_peer = future::select_ok(tasks) + let successful_connection = future::select_ok(tasks) .await .map_err(|err| { error!("Failed to bootstrap to the network: {}", err); @@ -193,6 +193,9 @@ impl QuicP2p { })? .0; + let bootstrapped_peer = successful_connection.connection.remote_address(); + endpoint.add_new_connection_to_pool(successful_connection); + Ok(( endpoint, incoming_connections, diff --git a/src/endpoint.rs b/src/endpoint.rs index b8b21de8..cf7e6d40 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -371,24 +371,42 @@ impl Endpoint { trace!("Successfully connected to peer: {}", node_addr); + self.add_new_connection_to_pool(new_conn); + + self.connection_deduplicator + .complete(node_addr, Ok(())) + .await; + + Ok(()) + } + + /// Creates a fresh connection without looking at the connection pool and connection duplicator. + pub(crate) async fn create_new_connection( + &self, + peer_addr: &SocketAddr, + ) -> Result { + let new_connection = self + .quic_endpoint + .connect_with(self.client_cfg.clone(), peer_addr, CERT_SERVER_NAME)? + .await?; + + trace!("Successfully created new connection to peer: {}", peer_addr); + Ok(new_connection) + } + + pub(crate) fn add_new_connection_to_pool(&self, conn: quinn::NewConnection) { let guard = self .connection_pool - .insert(*node_addr, new_conn.connection.clone()); + .insert(conn.connection.remote_address(), conn.connection); listen_for_incoming_messages( - new_conn.uni_streams, - new_conn.bi_streams, + conn.uni_streams, + conn.bi_streams, guard, self.message_tx.clone(), self.disconnection_tx.clone(), self.clone(), ); - - self.connection_deduplicator - .complete(node_addr, Ok(())) - .await; - - Ok(()) } /// Get an existing connection for the peer address. diff --git a/src/tests/common.rs b/src/tests/common.rs index b041244a..204aa7df 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -1,8 +1,8 @@ -use super::{new_qp2p, random_msg}; +use super::{new_qp2p, new_qp2p_with_hcc, random_msg}; use crate::utils; use anyhow::{anyhow, Result}; use futures::future; -use std::time::Duration; +use std::{collections::HashSet, time::Duration}; use tokio::time::timeout; #[tokio::test] @@ -417,3 +417,30 @@ async fn many_messages() -> Result<()> { let _ = future::try_join_all(tasks).await?; Ok(()) } + +// When we bootstrap with multiple bootstrap contacts, we will use the first connection +// that succeeds. We should still be able to establish a connection with the rest of the +// bootstrap contacts later. +#[tokio::test] +async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()> { + let qp2p = new_qp2p()?; + + let (ep1, _, _, _) = qp2p.new_endpoint().await?; + let (ep2, _, _, _) = qp2p.new_endpoint().await?; + let (ep3, _, _, _) = qp2p.new_endpoint().await?; + + let contacts = vec![ep1.socket_addr(), ep2.socket_addr(), ep3.socket_addr()] + .iter() + .cloned() + .collect::>(); + + let qp2p = new_qp2p_with_hcc(contacts.clone())?; + let (ep, _, _, _, bootstrapped_peer) = qp2p.bootstrap().await?; + + for peer in contacts { + if peer != bootstrapped_peer { + ep.connect_to(&peer).await?; + } + } + Ok(()) +}