Skip to content

Commit

Permalink
Add keepalive and message size limits
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Apr 3, 2024
1 parent 8df5a1d commit 1477ca3
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
45 changes: 40 additions & 5 deletions consensus/config/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub struct Parameters {
/// Anemo network settings.
#[serde(default = "AnemoParameters::default")]
pub anemo: AnemoParameters,

/// Tonic network settings.
#[serde(default = "TonicParameters::default")]
pub tonic: TonicParameters,
}

impl Parameters {
Expand Down Expand Up @@ -82,6 +86,7 @@ impl Default for Parameters {
max_forward_time_drift: Parameters::default_max_forward_time_drift(),
db_path: None,
anemo: AnemoParameters::default(),
tonic: TonicParameters::default(),
}
}
}
Expand All @@ -93,7 +98,13 @@ pub struct AnemoParameters {
///
/// If unspecified, this will default to 8 MiB.
#[serde(default = "AnemoParameters::default_excessive_message_size")]
excessive_message_size: usize,
pub excessive_message_size: usize,
}

impl AnemoParameters {
fn default_excessive_message_size() -> usize {
8 << 20
}
}

impl Default for AnemoParameters {
Expand All @@ -104,12 +115,36 @@ impl Default for AnemoParameters {
}
}

impl AnemoParameters {
pub fn excessive_message_size(&self) -> usize {
self.excessive_message_size
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TonicParameters {
/// Keepalive interval and timeouts for both client and server, tcp and http2.
///
/// If unspecified, this will default to 10s.
#[serde(default = "TonicParameters::default_keepalive_interval")]
pub keepalive_interval: Duration,

/// Message size limits for both requests and responses.
///
/// If unspecified, this will default to 8MiB.
#[serde(default = "TonicParameters::default_message_size_limit")]
pub message_size_limit: usize,
}

impl TonicParameters {
fn default_keepalive_interval() -> Duration {
Duration::from_secs(10)
}

fn default_excessive_message_size() -> usize {
fn default_message_size_limit() -> usize {
8 << 20
}
}

impl Default for TonicParameters {
fn default() -> Self {
Self {
keepalive_interval: TonicParameters::default_keepalive_interval(),
message_size_limit: TonicParameters::default_message_size_limit(),
}
}
}
4 changes: 2 additions & 2 deletions consensus/core/src/network/anemo_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
inbound_network_metrics,
self.context.parameters.anemo.excessive_message_size(),
self.context.parameters.anemo.excessive_message_size,
)))
.layer(SetResponseHeaderLayer::overriding(
EPOCH_HEADER_KEY.parse().unwrap(),
Expand All @@ -372,7 +372,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
outbound_network_metrics,
self.context.parameters.anemo.excessive_message_size(),
self.context.parameters.anemo.excessive_message_size,
)))
.layer(SetRequestHeaderLayer::overriding(
EPOCH_HEADER_KEY.parse().unwrap(),
Expand Down
30 changes: 24 additions & 6 deletions consensus/core/src/network/tonic_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ impl TonicClient {
peer: AuthorityIndex,
timeout: Duration,
) -> ConsensusResult<ConsensusServiceClient<Channel>> {
let config = &self.context.parameters.tonic;
let channel = self.channel_pool.get_channel(peer, timeout).await?;
Ok(ConsensusServiceClient::new(channel))
Ok(ConsensusServiceClient::new(channel)
.max_encoding_message_size(config.message_size_limit)
.max_decoding_message_size(config.message_size_limit))
}
}

Expand Down Expand Up @@ -238,12 +241,19 @@ impl ChannelPool {
ConsensusError::NetworkError(format!("Cannot convert address to host:port: {e:?}"))
})?;
let address = format!("http://{address}");
let config = &self.context.parameters.tonic;
let endpoint = Channel::from_shared(address.clone())
.unwrap()
.connect_timeout(timeout)
.initial_connection_window_size(64 << 20)
.initial_stream_window_size(32 << 20)
.buffer_size(64 << 20);
.buffer_size(64 << 20)
.keep_alive_while_idle(true)
.keep_alive_timeout(config.keepalive_interval)
.http2_keep_alive_interval(config.keepalive_interval)
.tcp_keepalive(Some(config.keepalive_interval))
.user_agent("mysticeti")
.unwrap();
// TODO: tune endpoint options and set TLS config.

let deadline = tokio::time::Instant::now() + timeout;
Expand Down Expand Up @@ -445,11 +455,19 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
let (tx, rx) = oneshot::channel::<()>();
self.shutdown = Some(tx);
let service = TonicServiceProxy::new(self.context.clone(), service);
let config = &self.context.parameters.tonic;

let server = Server::builder()
.initial_connection_window_size(64 << 20)
.initial_stream_window_size(32 << 20)
.add_service(ConsensusServiceServer::new(service))
.http2_keepalive_interval(Some(config.keepalive_interval))
.http2_keepalive_timeout(Some(config.keepalive_interval))
.tcp_keepalive(Some(config.keepalive_interval))
.add_service(
ConsensusServiceServer::new(service)
.max_encoding_message_size(config.message_size_limit)
.max_decoding_message_size(config.message_size_limit),
)
.serve_with_shutdown(own_address, async move {
match rx.await {
Ok(()) => {
Expand All @@ -463,13 +481,13 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {

self.server.spawn(async move {
if let Err(e) = server.await {
warn!("TonicNetwork server failed: {e:?}");
warn!("Tonic server failed: {e:?}");
} else {
info!("TonicNetwork server stopped");
info!("Tonic server stopped");
}
});

info!("TonicNetwork server started at: {own_address}");
info!("Tonic server started at: {own_address}");
}

async fn stop(&mut self) {
Expand Down
12 changes: 5 additions & 7 deletions crates/sui-core/src/consensus_manager/mysticeti_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,11 @@ impl ConsensusManagerTrait for MysticetiManager {
let epoch = epoch_store.epoch();
let protocol_config = epoch_store.protocol_config();
let network_type = match std::env::var("CONSENSUS_NETWORK") {
Ok(type_str) => {
match type_str.to_lowercase().as_str() {
"tonic" => NetworkType::Tonic,
"anemo" => NetworkType::Anemo,
_ => NetworkType::Anemo,
}
}
Ok(type_str) => match type_str.to_lowercase().as_str() {
"tonic" => NetworkType::Tonic,
"anemo" => NetworkType::Anemo,
_ => NetworkType::Anemo,
},
Err(_) => NetworkType::Anemo,
};

Expand Down

0 comments on commit 1477ca3

Please sign in to comment.