fix: test and connection stability
`stream_receive_window` on connections has been lowered to reduce
memory usage.

Limiting concurrent streams seems to increase stability.
joshuef committed Jan 31, 2022
1 parent d6f2132 commit aee6dfc
Showing 4 changed files with 33 additions and 14 deletions.
26 changes: 21 additions & 5 deletions src/config.rs
@@ -9,7 +9,7 @@

//! Configuration for `Endpoint`s.
use quinn::IdleTimeout;
use quinn::{IdleTimeout, VarInt};

use rustls::{Certificate, ClientConfig, ServerName};
use serde::{Deserialize, Serialize};
@@ -34,6 +34,11 @@ pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(18);
/// gives 5-6 retries in ~30 s total retry time.
pub const DEFAULT_MAX_RETRY_INTERVAL: Duration = Duration::from_secs(15);

/// Default for [`Config::keep_alive_interval`] (10 seconds).
///
/// This is lower than the idle timeout, but large enough to avoid generating too much keep-alive traffic.
pub const DEFAULT_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);

/// Default for [`RetryConfig::retry_delay_multiplier`] (x1.5).
///
/// Together with the default max and initial,
@@ -232,8 +237,11 @@ impl InternalConfig {
let upnp_lease_duration = config
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION);
let keep_alive_interval = config
.keep_alive_interval
.or(Some(DEFAULT_KEEP_ALIVE_INTERVAL));

let transport = Self::new_transport_config(idle_timeout, config.keep_alive_interval);
let transport = Self::new_transport_config(idle_timeout, keep_alive_interval);

// setup certificates
let mut roots = rustls::RootCertStore::empty();
@@ -274,12 +282,20 @@ impl InternalConfig {
) -> Arc<quinn::TransportConfig> {
let mut config = quinn::TransportConfig::default();

// QUIC encodes idle timeout in a varint with max size 2^62, which is below what can be
// represented by Duration. For now, just ignore too large idle timeouts.
// FIXME: don't ignore (e.g. clamp/error/panic)?
let _ = config.max_idle_timeout(Some(idle_timeout));
let _ = config.keep_alive_interval(keep_alive_interval);

// This has a bearing on max memory usage: https://docs.rs/quinn/latest/quinn/struct.TransportConfig.html#method.stream_receive_window
// Here we've lowered it from the default of 1,250,000 bytes.
// E.g. this reduced max memory for multiple_connections_with_many_larger_concurrent_messages
// from ~1 GB to ~300 MB.
let _ = config.stream_receive_window(VarInt::from_u32(500));

// The default for each of these is 100; we've lowered them to prevent hammering of nodes.
// A lower count appears to reduce the chance of dropped connections.
let _ = config.max_concurrent_bidi_streams(VarInt::from_u32(10));
let _ = config.max_concurrent_uni_streams(VarInt::from_u32(10));

Arc::new(config)
}

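For context, the transport settings this commit touches all live on one quinn::TransportConfig. Below is a minimal sketch (an illustration assuming the quinn 0.8-era API used here, not code from this commit) assembling the chosen values, with the buffering arithmetic that motivates the lower stream_receive_window spelled out:

use std::{sync::Arc, time::Duration};

use quinn::{IdleTimeout, TransportConfig, VarInt};

// Sketch only: the values chosen in this commit, assembled in one place.
fn example_transport_config() -> Arc<TransportConfig> {
    let mut config = TransportConfig::default();

    // 18s fits well within QUIC's 2^62 varint limit, so try_from succeeds.
    let idle_timeout =
        IdleTimeout::try_from(Duration::from_secs(18)).expect("fits in a varint");
    let _ = config.max_idle_timeout(Some(idle_timeout));
    let _ = config.keep_alive_interval(Some(Duration::from_secs(10)));

    // Worst-case buffered stream data per connection is roughly
    // window × concurrent streams: 500 B × (10 + 10) ≈ 10 KB here, versus
    // 1,250,000 B × (100 + 100) ≈ 250 MB with quinn's defaults.
    let _ = config.stream_receive_window(VarInt::from_u32(500));
    let _ = config.max_concurrent_bidi_streams(VarInt::from_u32(10));
    let _ = config.max_concurrent_uni_streams(VarInt::from_u32(10));

    Arc::new(config)
}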
7 changes: 5 additions & 2 deletions src/connection.rs
@@ -23,6 +23,9 @@ const INCOMING_MESSAGE_BUFFER_LEN: usize = 10_000;
// TODO: this seems arbitrary - it may need to be tuned or made configurable.
const ENDPOINT_VERIFICATION_TIMEOUT: Duration = Duration::from_secs(30);

// Error reason used when a connection is closed manually via qp2p APIs.
const QP2P_CLOSED_CONNECTION: &str = "The connection was closed intentionally by qp2p.";

/// The sending API for a connection.
#[derive(Clone)]
pub struct Connection {
@@ -146,11 +149,11 @@ impl Connection {
/// [`ConnectionError::Closed`]`(`[`Close::Local`]`)`, and data on unfinished streams is not
/// guaranteed to be delivered.
pub fn close(&self, reason: Option<String>) {
let reason = reason
.unwrap_or_else(|| "The connection was closed intentionally by qp2p.".to_string());
let reason = reason.unwrap_or_else(|| QP2P_CLOSED_CONNECTION.to_string());
self.inner.close(0u8.into(), &reason.into_bytes());
}

/// Opens a unidirectional stream and sends the message on it.
async fn send_uni(&self, msg: Bytes, priority: i32) -> Result<(), SendError> {
let mut send_stream = self.open_uni().await.map_err(SendError::ConnectionLost)?;
send_stream.set_priority(priority);
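For illustration, hypothetical call sites for close (connection stands in for an established qp2p Connection; not code from this commit):

// With no reason supplied, the peer sees the default QP2P_CLOSED_CONNECTION string.
connection.close(None);

// A caller-supplied reason overrides the default.
connection.close(Some("shutting down for upgrade".to_string()));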
2 changes: 1 addition & 1 deletion src/endpoint.rs
@@ -334,7 +334,7 @@ impl Endpoint {
/// Close all the connections of this endpoint immediately and stop accepting new connections.
pub fn close(&self) {
let _ = self.termination_tx.send(());
self.quinn_endpoint.close(0_u32.into(), b"")
self.quinn_endpoint.close(0_u32.into(), b"Endpoint closed")
}

/// Attempt a connection to a node_addr.
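As a hypothetical usage note (endpoint stands in for an Endpoint built elsewhere; not code from this commit):

// Signals termination to background tasks, then closes the underlying quinn
// endpoint; peers see error code 0 with the b"Endpoint closed" reason.
endpoint.close();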
12 changes: 6 additions & 6 deletions src/tests/common.rs
@@ -339,14 +339,14 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
use futures::future;

let num_senders: usize = 10;
let num_messages_each: usize = 100;
let num_messages_total: usize = 1000;
let num_senders: usize = 200;
let num_messages_each: usize = 1000;
let num_messages_total: usize = num_senders * num_messages_each;

let (server_endpoint, mut recv_incoming_connections, _) = new_endpoint().await?;
let server_addr = server_endpoint.public_addr();

let test_msgs: Vec<_> = (0..num_messages_each).map(|_| random_msg(1024)).collect();
let test_msgs: Vec<_> = (0..num_messages_each).map(|_| random_msg(200)).collect();
let sending_msgs = test_msgs.clone();

let mut tasks = Vec::new();
@@ -453,8 +453,8 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<()> {
let num_senders: usize = 10;
let num_messages_each: usize = 100;
let num_senders: usize = 100;
let num_messages_each: usize = 10;
let num_messages_total: usize = num_senders * num_messages_each;

let (server_endpoint, recv_incoming_connections, _) = new_endpoint().await?;
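For a sense of scale, a back-of-envelope check of the new test parameters (hypothetical snippet, not part of the test suite):

// multiple_connections_with_many_concurrent_messages now sends
// 200 senders × 1,000 messages × 200 B ≈ 40 MB of payload in total.
let total_bytes: usize = 200 * 1_000 * 200;
assert_eq!(total_bytes, 40_000_000);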
