Skip to content

Commit

Permalink
refactor!: Move local address from config to QuicP2p::bootstrap
Browse files Browse the repository at this point in the history
Keeping the local address in config involved quite a lot of shenanigans
to obtain a default value, and it doesn't provide much value. Typically
users would look at `Endpoint::local_addr`, which is read from the
underlying socket (by quinn).

This does mean passing a local address when constructing an endpoint,
but it's expected that users will have their own configuration
structures, such that this won't be too disruptive.

BREAKING CHANGE: `Config::local_ip` and `Config::local_port` have been
removed. `QuicP2p::bootstrap` and `QuicP2p::new_endpoint` now require a
local address to bind to. The `Error::UnspecifiedLocalIp` variant has
been removed.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent 9b1612a commit de9763d
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 87 deletions.
5 changes: 2 additions & 3 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::Result;
use bytes::Bytes;
use qp2p::{Config, ConnId, QuicP2p};
use std::env;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::{Ipv4Addr, SocketAddr};

#[derive(Default, Ord, PartialEq, PartialOrd, Eq, Clone, Copy)]
struct XId(pub [u8; 32]);
Expand All @@ -38,7 +38,6 @@ async fn main() -> Result<()> {
// instantiate QuicP2p with custom config
let qp2p: QuicP2p<XId> = QuicP2p::with_config(
Some(Config {
local_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
idle_timeout_msec: Some(1000 * 3600), // 1 hour idle timeout.
..Default::default()
}),
Expand All @@ -48,7 +47,7 @@ async fn main() -> Result<()> {

// create an endpoint for us to listen on and send from.
let (node, _incoming_conns, mut incoming_messages, _disconnections) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint((Ipv4Addr::LOCALHOST, 0).into()).await?;

// if we received args then we parse them as SocketAddr and send a "marco" msg to each peer.
if args.len() > 1 {
Expand Down
39 changes: 5 additions & 34 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const MAIDSAFE_DOMAIN: &str = "maidsafe.net";
/// Main QuicP2p instance to communicate with QuicP2p using an async API
#[derive(Debug, Clone)]
pub struct QuicP2p<I: ConnId> {
local_addr: SocketAddr,
bootstrap_cache: BootstrapCache,
endpoint_cfg: quinn::ServerConfig,
client_cfg: quinn::ClientConfig,
Expand Down Expand Up @@ -74,15 +73,7 @@ impl<I: ConnId> QuicP2p<I> {
use_bootstrap_cache: bool,
) -> Result<Self> {
debug!("Config passed in to qp2p: {:?}", cfg);
let cfg = unwrap_config_or_default(cfg)?;
debug!(
"Config decided on after unwrap and IGD in to qp2p: {:?}",
cfg
);

let port = cfg.local_port.unwrap_or_default();

let ip = cfg.local_ip.ok_or(Error::UnspecifiedLocalIp)?;
let cfg = cfg.unwrap_or_default();

let idle_timeout_msec = cfg.idle_timeout_msec.unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC);

Expand Down Expand Up @@ -118,8 +109,6 @@ impl<I: ConnId> QuicP2p<I> {

let qp2p_config = InternalConfig {
hard_coded_contacts: cfg.hard_coded_contacts,
local_port: port,
local_ip: ip,
forward_port: cfg.forward_port,
external_port: cfg.external_port,
external_ip: cfg.external_ip,
Expand All @@ -128,7 +117,6 @@ impl<I: ConnId> QuicP2p<I> {
};

Ok(Self {
local_addr: SocketAddr::new(ip, port),
bootstrap_cache,
endpoint_cfg,
client_cfg,
Expand Down Expand Up @@ -177,6 +165,7 @@ impl<I: ConnId> QuicP2p<I> {
/// ```
pub async fn bootstrap(
&self,
local_addr: SocketAddr,
) -> Result<(
Endpoint<I>,
IncomingConnections,
Expand All @@ -185,7 +174,7 @@ impl<I: ConnId> QuicP2p<I> {
SocketAddr,
)> {
let (endpoint, incoming_connections, incoming_message, disconnections) =
self.new_endpoint().await?;
self.new_endpoint(local_addr).await?;

let bootstrapped_peer = self
.bootstrap_with(&endpoint, endpoint.bootstrap_nodes())
Expand Down Expand Up @@ -229,6 +218,7 @@ impl<I: ConnId> QuicP2p<I> {
/// ```
pub async fn new_endpoint(
&self,
local_addr: SocketAddr,
) -> Result<(
Endpoint<I>,
IncomingConnections,
Expand All @@ -246,7 +236,7 @@ impl<I: ConnId> QuicP2p<I> {
.cloned()
.collect();

let (quinn_endpoint, quinn_incoming) = bind(self.endpoint_cfg.clone(), self.local_addr)?;
let (quinn_endpoint, quinn_incoming) = bind(self.endpoint_cfg.clone(), local_addr)?;

trace!(
"Bound endpoint to local address: {}",
Expand Down Expand Up @@ -339,22 +329,3 @@ pub(crate) fn bind(
}
}
}

fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
let mut cfg = cfg.map_or(Config::default(), |cfg| cfg);

if cfg.local_ip.is_none() {
debug!("Realizing local IP by connecting to contacts");
let socket = UdpSocket::bind("0.0.0.0:0")?;
let mut local_ip = None;
for addr in cfg.hard_coded_contacts.iter() {
if let Ok(Ok(local_addr)) = socket.connect(addr).map(|()| socket.local_addr()) {
local_ip = Some(local_addr.ip());
break;
}
}
cfg.local_ip = local_ip;
};

Ok(cfg)
}
11 changes: 0 additions & 11 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ pub struct Config {
parse(try_from_str = serde_json::from_str)
)]
pub hard_coded_contacts: HashSet<SocketAddr>,
/// Port we want to reserve for QUIC. If none supplied we'll use the OS given random port.
/// If external port is provided it means that the user is carrying out manual port forwarding and this field is mandatory.
/// This will be the internal port number mapped to the process
#[structopt(long)]
pub local_port: Option<u16>,
/// IP address for the listener. If none is supplied and `forward_port` is enabled, we will use IGD to realize the
/// local IP address of the machine. If IGD fails the application will exit.
#[structopt(long)]
pub local_ip: Option<IpAddr>,
/// Specify if port forwarding via UPnP should be done or not. This can be set to false if the network
/// is run locally on the network loopback or on a local area network.
#[structopt(long)]
Expand Down Expand Up @@ -88,8 +79,6 @@ pub struct Config {
#[derive(Clone, Debug)]
pub(crate) struct InternalConfig {
pub(crate) hard_coded_contacts: HashSet<SocketAddr>,
pub(crate) local_port: u16,
pub(crate) local_ip: IpAddr,
pub(crate) forward_port: bool,
pub(crate) external_port: Option<u16>,
pub(crate) external_ip: Option<IpAddr>,
Expand Down
9 changes: 3 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,27 +430,24 @@ async fn handle_endpoint_verification_req<I: ConnId>(
#[cfg(test)]
mod tests {
use crate::api::QuicP2p;
use crate::{config::Config, wire_msg::WireMsg, Error};
use crate::{config::Config, tests::local_addr, wire_msg::WireMsg, Error};
use anyhow::anyhow;
use std::net::{IpAddr, Ipv4Addr};

#[tokio::test(flavor = "multi_thread")]
async fn echo_service() -> Result<(), Error> {
let qp2p = QuicP2p::<[u8; 32]>::with_config(
Some(Config {
local_port: None,
local_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
..Config::default()
}),
Default::default(),
false,
)?;

// Create Endpoint
let (peer1, mut peer1_connections, _, _) = qp2p.new_endpoint().await?;
let (peer1, mut peer1_connections, _, _) = qp2p.new_endpoint(local_addr()).await?;
let peer1_addr = peer1.socket_addr();

let (peer2, _, _, _) = qp2p.new_endpoint().await?;
let (peer2, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let peer2_addr = peer2.socket_addr();

peer2.connect_to(&peer1_addr).await?;
Expand Down
3 changes: 0 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ pub enum Error {
/// A supposedly impossible internal error occurred
#[error("Unexpected internal error: {0}")]
UnexpectedError(String),
/// Unspecified local IP address
#[error("Unspecified Local IP address")]
UnspecifiedLocalIp,
/// Couldn't resolve Public IP address
#[error("Unresolved Public IP address")]
UnresolvedPublicIp,
Expand Down
58 changes: 31 additions & 27 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use super::{hash, new_qp2p, new_qp2p_with_hcc, random_msg};
use super::{hash, local_addr, new_qp2p, new_qp2p_with_hcc, random_msg};
use anyhow::{anyhow, Result};
use futures::{future, stream::FuturesUnordered, StreamExt};
use std::{
Expand All @@ -21,10 +21,10 @@ use tracing_test::traced_test;
#[tokio::test(flavor = "multi_thread")]
async fn successful_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, _, _) = qp2p.new_endpoint().await?;
let (peer1, mut peer1_incoming_connections, _, _) = qp2p.new_endpoint(local_addr()).await?;
let peer1_addr = peer1.socket_addr();

let (peer2, _, _, _) = qp2p.new_endpoint().await?;
let (peer2, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
peer2.connect_to(&peer1_addr).await?;
let peer2_addr = peer2.socket_addr();

Expand All @@ -41,10 +41,10 @@ async fn successful_connection() -> Result<()> {
async fn single_message() -> Result<()> {
let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, mut peer1_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let peer1_addr = peer1.socket_addr();

let (peer2, _, _, _) = qp2p.new_endpoint().await?;
let (peer2, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let peer2_addr = peer2.socket_addr();

// Peer 2 connects and sends a message
Expand Down Expand Up @@ -74,11 +74,11 @@ async fn single_message() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn reuse_outgoing_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, _, _, _) = qp2p.new_endpoint().await?;
let (alice, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let alice_addr = alice.socket_addr();

let (bob, mut bob_incoming_connections, mut bob_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let bob_addr = bob.socket_addr();

// Connect for the first time and send a message.
Expand Down Expand Up @@ -125,11 +125,11 @@ async fn reuse_outgoing_connection() -> Result<()> {
async fn reuse_incoming_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let alice_addr = alice.socket_addr();

let (bob, mut bob_incoming_connections, mut bob_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let bob_addr = bob.socket_addr();

// Connect for the first time and send a message.
Expand Down Expand Up @@ -177,11 +177,11 @@ async fn reuse_incoming_connection() -> Result<()> {
async fn disconnection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, _, mut alice_disconnections) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let alice_addr = alice.socket_addr();

let (bob, mut bob_incoming_connections, _, mut bob_disconnections) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let bob_addr = bob.socket_addr();

// Alice connects to Bob who should receive an incoming connection.
Expand Down Expand Up @@ -232,11 +232,11 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
mut alice_incoming_connections,
mut alice_incoming_messages,
mut alice_disconnections,
) = qp2p.new_endpoint().await?;
) = qp2p.new_endpoint(local_addr()).await?;
let alice_addr = alice.socket_addr();

let (bob, mut bob_incoming_connections, mut bob_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let bob_addr = bob.socket_addr();

future::try_join(alice.connect_to(&bob_addr), bob.connect_to(&alice_addr)).await?;
Expand Down Expand Up @@ -308,10 +308,10 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
qp2p.new_endpoint().await?;
qp2p.new_endpoint(local_addr()).await?;
let alice_addr = alice.socket_addr();

let (bob, _, mut bob_incoming_messages, _) = qp2p.new_endpoint().await?;
let (bob, _, mut bob_incoming_messages, _) = qp2p.new_endpoint(local_addr()).await?;
let bob_addr = bob.socket_addr();

// Try to establish two connections to the same peer at the same time.
Expand Down Expand Up @@ -365,7 +365,8 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let num_messages_total: usize = 1000;

let qp2p = new_qp2p()?;
let (server_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let (server_endpoint, _, mut recv_incoming_messages, _) =
qp2p.new_endpoint(local_addr()).await?;
let server_addr = server_endpoint.socket_addr();

let test_msgs: Vec<_> = (0..num_messages_each).map(|_| random_msg(1024)).collect();
Expand Down Expand Up @@ -419,7 +420,8 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let messages = sending_msgs.clone();
tasks.push(tokio::spawn({
let qp2p = new_qp2p()?;
let (send_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let (send_endpoint, _, mut recv_incoming_messages, _) =
qp2p.new_endpoint(local_addr()).await?;

async move {
let mut hash_results = BTreeSet::new();
Expand Down Expand Up @@ -471,7 +473,8 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(
let num_messages_total: usize = num_senders * num_messages_each;

let qp2p = new_qp2p()?;
let (server_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let (server_endpoint, _, mut recv_incoming_messages, _) =
qp2p.new_endpoint(local_addr()).await?;
let server_addr = server_endpoint.socket_addr();

let test_msgs: Vec<_> = (0..num_messages_each)
Expand Down Expand Up @@ -527,7 +530,8 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(

tasks.push(tokio::spawn({
let qp2p = new_qp2p()?;
let (send_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let (send_endpoint, _, mut recv_incoming_messages, _) =
qp2p.new_endpoint(local_addr()).await?;

async move {
let mut hash_results = BTreeSet::new();
Expand Down Expand Up @@ -586,8 +590,8 @@ async fn many_messages() -> Result<()> {
let num_messages: usize = 10_000;

let qp2p = new_qp2p()?;
let (send_endpoint, _, _, _) = qp2p.new_endpoint().await?;
let (recv_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let (send_endpoint, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let (recv_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint(local_addr()).await?;

let send_addr = send_endpoint.socket_addr();
let recv_addr = recv_endpoint.socket_addr();
Expand Down Expand Up @@ -644,17 +648,17 @@ async fn many_messages() -> Result<()> {
async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()> {
let qp2p = new_qp2p()?;

let (ep1, _, _, _) = qp2p.new_endpoint().await?;
let (ep2, _, _, _) = qp2p.new_endpoint().await?;
let (ep3, _, _, _) = qp2p.new_endpoint().await?;
let (ep1, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let (ep2, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let (ep3, _, _, _) = qp2p.new_endpoint(local_addr()).await?;

let contacts = vec![ep1.socket_addr(), ep2.socket_addr(), ep3.socket_addr()]
.iter()
.cloned()
.collect::<HashSet<_>>();

let qp2p = new_qp2p_with_hcc(contacts.clone())?;
let (ep, _, _, _, bootstrapped_peer) = qp2p.bootstrap().await?;
let (ep, _, _, _, bootstrapped_peer) = qp2p.bootstrap(local_addr()).await?;

for peer in contacts {
if peer != bootstrapped_peer {
Expand All @@ -668,8 +672,8 @@ async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()
async fn reachability() -> Result<()> {
let qp2p = new_qp2p()?;

let (ep1, _, _, _) = qp2p.new_endpoint().await?;
let (ep2, _, _, _) = qp2p.new_endpoint().await?;
let (ep1, _, _, _) = qp2p.new_endpoint(local_addr()).await?;
let (ep2, _, _, _) = qp2p.new_endpoint(local_addr()).await?;

if let Ok(()) = ep1.is_reachable(&"127.0.0.1:12345".parse()?).await {
anyhow!("Unexpected success");
Expand Down
Loading

0 comments on commit de9763d

Please sign in to comment.