Skip to content

Commit

Permalink
feat(upnp): add config to disable port forwarding for clients
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Nov 10, 2020
1 parent 2167ead commit 4a14488
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 45 deletions.
2 changes: 1 addition & 1 deletion examples/echo_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<(), Error> {
&bootstrap_nodes,
false,
)?;
let mut endpoint = qp2p.new_endpoint()?;
let endpoint = qp2p.new_endpoint()?;
let socket_addr = endpoint.socket_addr().await?;
println!("Process running at: {}", &socket_addr);
if genesis {
Expand Down
30 changes: 19 additions & 11 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct QuicP2p {
bootstrap_cache: BootstrapCache,
endpoint_cfg: quinn::ServerConfig,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
qp2p_config: Config,
}

impl QuicP2p {
Expand Down Expand Up @@ -157,6 +157,8 @@ impl QuicP2p {
.clone()
.map(|custom_dir| Dirs::Overide(OverRide::new(&custom_dir)));

let mut qp2p_config = cfg.clone();

let mut bootstrap_cache =
BootstrapCache::new(cfg.hard_coded_contacts, custom_dirs.as_ref())?;
if use_bootstrap_cache {
Expand All @@ -178,13 +180,19 @@ impl QuicP2p {
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC);

qp2p_config.ip = Some(ip);
qp2p_config.port = Some(port);
qp2p_config.keep_alive_interval_msec = Some(keep_alive_interval_msec);
qp2p_config.idle_timeout_msec = Some(idle_timeout_msec);
qp2p_config.upnp_lease_duration = Some(upnp_lease_duration);

Ok(Self {
local_addr: SocketAddr::new(ip, port),
allow_random_port,
bootstrap_cache,
endpoint_cfg,
client_cfg,
upnp_lease_duration,
qp2p_config,
})
}

Expand All @@ -209,7 +217,7 @@ impl QuicP2p {
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.port = Some(3000);
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut endpoint = quic_p2p.new_endpoint()?;
/// let endpoint = quic_p2p.new_endpoint()?;
/// let peer_addr = endpoint.socket_addr().await?;
///
/// config.port = Some(3001);
Expand All @@ -233,21 +241,21 @@ impl QuicP2p {
// Attempt to connect to all nodes and return the first one to succeed
let mut tasks = Vec::default();
for node_addr in bootstrap_nodes.iter().cloned() {
let qp2p_config = self.qp2p_config.clone();
let nodes = bootstrap_nodes.clone();
let endpoint_cfg = self.endpoint_cfg.clone();
let client_cfg = self.client_cfg.clone();
let local_addr = self.local_addr;
let allow_random_port = self.allow_random_port;
let upnp_lease_duration = self.upnp_lease_duration;
let task_handle = tokio::spawn(async move {
new_connection_to(
&node_addr,
endpoint_cfg,
client_cfg,
local_addr,
allow_random_port,
upnp_lease_duration,
nodes,
qp2p_config,
)
.await
});
Expand Down Expand Up @@ -278,14 +286,14 @@ impl QuicP2p {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let peer_1 = quic_p2p.new_endpoint()?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
/// Ok(())
/// }
/// ```
pub async fn connect_to(&mut self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {
pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {
let bootstrap_nodes: Vec<SocketAddr> = self
.bootstrap_cache
.peers()
Expand All @@ -301,8 +309,8 @@ impl QuicP2p {
self.client_cfg.clone(),
self.local_addr,
self.allow_random_port,
self.upnp_lease_duration,
bootstrap_nodes,
self.qp2p_config.clone(),
)
.await
}
Expand Down Expand Up @@ -349,8 +357,8 @@ impl QuicP2p {
quinn_endpoint,
quinn_incoming,
self.client_cfg.clone(),
self.upnp_lease_duration,
bootstrap_nodes,
self.qp2p_config.clone(),
)?;

Ok(endpoint)
Expand All @@ -366,8 +374,8 @@ async fn new_connection_to(
client_cfg: quinn::ClientConfig,
local_addr: SocketAddr,
allow_random_port: bool,
upnp_lease_duration: u32,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
) -> Result<(Endpoint, Connection)> {
trace!("Attempting to connect to peer: {}", node_addr);

Expand All @@ -379,8 +387,8 @@ async fn new_connection_to(
quinn_endpoint,
quinn_incoming,
client_cfg,
upnp_lease_duration,
bootstrap_nodes,
qp2p_config,
)?;
let connection = endpoint.connect_to(node_addr).await?;

Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub struct Config {
/// Duration of a UPnP port mapping.
#[structopt(long)]
pub upnp_lease_duration: Option<u32>,
/// Specify if port forwarding via UPnP should be done or not
#[structopt(long)]
pub forward_port: bool,
}

impl Config {
Expand Down
4 changes: 2 additions & 2 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Connection {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let peer_1 = quic_p2p.new_endpoint()?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
Expand All @@ -74,7 +74,7 @@ impl Connection {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let peer_1 = quic_p2p.new_endpoint()?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
Expand Down
56 changes: 32 additions & 24 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ use super::wire_msg::WireMsg;
use super::{
connections::{Connection, IncomingConnections},
error::Result,
Config,
};
use futures::{lock::Mutex, FutureExt};
use futures::lock::Mutex;
use log::trace;
use log::{debug, info};
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::time::timeout;

/// Host name of the Quic communication certificate used by peers
// FIXME: make it configurable
Expand All @@ -30,8 +32,8 @@ pub struct Endpoint {
quic_endpoint: quinn::Endpoint,
quic_incoming: Arc<Mutex<quinn::Incoming>>,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
}

impl std::fmt::Debug for Endpoint {
Expand All @@ -50,17 +52,17 @@ impl Endpoint {
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
) -> Result<Self> {
let local_addr = quic_endpoint.local_addr()?;
Ok(Self {
local_addr,
quic_endpoint,
quic_incoming: Arc::new(Mutex::new(quic_incoming)),
client_cfg,
upnp_lease_duration,
bootstrap_nodes,
qp2p_config,
})
}

Expand All @@ -71,7 +73,7 @@ impl Endpoint {

/// Returns the socket address of the endpoint
pub async fn socket_addr(&self) -> Result<SocketAddr> {
if cfg!(test) {
if cfg!(test) || !self.qp2p_config.forward_port {
self.local_addr().await
} else {
self.public_addr().await
Expand All @@ -94,7 +96,17 @@ impl Endpoint {
let mut addr = None;

// Attempt to use IGD for port forwarding
match tokio::time::timeout(std::time::Duration::from_secs(30), forward_port(self.local_addr, self.upnp_lease_duration)).await {
match timeout(
Duration::from_secs(30),
forward_port(
self.local_addr,
self.qp2p_config.upnp_lease_duration.ok_or_else(|| {
Error::Unexpected("Missing UPnP config parameter".to_string())
})?,
),
)
.await
{
Ok(res) => {
match res {
Ok(public_sa) => {
Expand All @@ -114,32 +126,28 @@ impl Endpoint {
}

// Try to contact an echo service
match tokio::time::timeout(std::time::Duration::from_secs(30), self.query_ip_echo_service()).await {
Ok(res) => {
match res {
Ok(echo_res) => match addr {
None => {
addr = Some(echo_res);
}
Some(address) => {
info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address);
}
},
Err(err) => {
info!("Could not contact echo service: {} - {:?}", err, err);
match timeout(Duration::from_secs(30), self.query_ip_echo_service()).await {
Ok(res) => match res {
Ok(echo_res) => match addr {
None => {
addr = Some(echo_res);
}
Some(address) => {
info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address);
}
},
Err(err) => {
info!("Could not contact echo service: {} - {:?}", err, err);
}
},
Err(e) => {
info!("Echo service timed out: {:?}", e)
}
Err(e) => info!("Echo service timed out: {:?}", e),
}
if let Some(socket_addr) = addr {
Ok(socket_addr)
} else {
Err(Error::Unexpected(
"No response from echo service".to_string(),
))
))
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ fn random_msg() -> Bytes {
#[tokio::test]
async fn successful_connection() -> Result<()> {
let qp2p = new_qp2p();
let mut peer1 = qp2p.new_endpoint()?;
let peer1 = qp2p.new_endpoint()?;
let peer1_addr = peer1.socket_addr().await?;

let mut peer2 = qp2p.new_endpoint()?;
let peer2 = qp2p.new_endpoint()?;
let _connection = peer2.connect_to(&peer1_addr).await?;

let mut incoming_conn = peer1.listen()?;
Expand All @@ -53,7 +53,7 @@ async fn successful_connection() -> Result<()> {
#[tokio::test]
async fn bi_directional_streams() -> Result<()> {
let qp2p = new_qp2p();
let mut peer1 = qp2p.new_endpoint()?;
let peer1 = qp2p.new_endpoint()?;
let peer1_addr = peer1.socket_addr().await?;

let peer2 = qp2p.new_endpoint()?;
Expand Down Expand Up @@ -109,11 +109,11 @@ async fn bi_directional_streams() -> Result<()> {
#[tokio::test]
async fn uni_directional_streams() -> Result<()> {
let qp2p = new_qp2p();
let mut peer1 = qp2p.new_endpoint()?;
let peer1 = qp2p.new_endpoint()?;
let peer1_addr = peer1.socket_addr().await?;
let mut incoming_conn_peer1 = peer1.listen()?;

let mut peer2 = qp2p.new_endpoint()?;
let peer2 = qp2p.new_endpoint()?;
let peer2_addr = peer2.socket_addr().await?;
let mut incoming_conn_peer2 = peer2.listen()?;

Expand Down
4 changes: 2 additions & 2 deletions src/tests/echo_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async fn echo_service() -> Result<()> {
false,
)?;
// Create Endpoint
let mut peer1 = qp2p.new_endpoint()?;
let peer1 = qp2p.new_endpoint()?;
let peer1_addr = peer1.socket_addr().await?;

// Listen for messages / connections at peer 1
Expand All @@ -29,7 +29,7 @@ async fn echo_service() -> Result<()> {

// In parallel create another endpoint and send an EchoServiceReq
let handle2 = tokio::spawn(async move {
let mut peer2 = qp2p.new_endpoint()?;
let peer2 = qp2p.new_endpoint()?;
let socket_addr = peer2.socket_addr().await?;
let connection = peer2.connect_to(&peer1_addr).await?;
let (mut send_stream, mut recv_stream) = connection.open_bi_stream().await?;
Expand Down

0 comments on commit 4a14488

Please sign in to comment.