Skip to content

Commit

Permalink
postgres-util: aggressively offer recommended TCP connection timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Loiselle committed Aug 14, 2023
1 parent 7b4e48d commit 4b17bb3
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 133 deletions.
12 changes: 6 additions & 6 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
pub fn storage_config(config: &SystemVars) -> StorageParameters {
StorageParameters {
persist: persist_config(config),
pg_replication_timeouts: mz_postgres_util::ReplicationTimeouts {
connect_timeout: Some(config.pg_replication_connect_timeout()),
keepalives_retries: Some(config.pg_replication_keepalives_retries()),
keepalives_idle: Some(config.pg_replication_keepalives_idle()),
keepalives_interval: Some(config.pg_replication_keepalives_interval()),
tcp_user_timeout: Some(config.pg_replication_tcp_user_timeout()),
pg_source_tcp_timeouts: mz_postgres_util::TcpTimeoutConfig {
connect_timeout: Some(config.pg_connection_connect_timeout()),
keepalives_retries: Some(config.pg_connection_keepalives_retries()),
keepalives_idle: Some(config.pg_connection_keepalives_idle()),
keepalives_interval: Some(config.pg_connection_keepalives_interval()),
tcp_user_timeout: Some(config.pg_connection_tcp_user_timeout()),
},
pg_source_snapshot_statement_timeout: config.pg_source_snapshot_statement_timeout(),
keep_n_source_status_history_entries: config.keep_n_source_status_history_entries(),
Expand Down
7 changes: 3 additions & 4 deletions src/postgres-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,9 @@ pub use schemas::{get_schemas, publication_info};
pub mod tunnel;
#[cfg(feature = "tunnel")]
pub use tunnel::{
drop_replication_slots, Config, ReplicationTimeouts, TunnelConfig,
DEFAULT_REPLICATION_CONNECT_TIMEOUT, DEFAULT_REPLICATION_KEEPALIVE_IDLE,
DEFAULT_REPLICATION_KEEPALIVE_INTERVAL, DEFAULT_REPLICATION_KEEPALIVE_RETRIES,
DEFAULT_REPLICATION_TCP_USER_TIMEOUT, DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT,
drop_replication_slots, Config, TcpTimeoutConfig, TunnelConfig, DEFAULT_CONNECT_TIMEOUT,
DEFAULT_KEEPALIVE_IDLE, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_RETRIES,
DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT, DEFAULT_TCP_USER_TIMEOUT,
};

/// An error representing pg, ssh, ssl, and other failures.
Expand Down
88 changes: 42 additions & 46 deletions src/postgres-util/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,37 @@ pub enum TunnelConfig {
},
}

// Some of these defaults were recommended by @ph14
// https://github.com/MaterializeInc/materialize/pull/18644#discussion_r1160071692
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(10);
pub const DEFAULT_KEEPALIVE_IDLE: Duration = Duration::from_secs(10);
pub const DEFAULT_KEEPALIVE_RETRIES: u32 = 5;
// This is meant to be DEFAULT_KEEPALIVE_IDLE
// + DEFAULT_KEEPALIVE_RETRIES * DEFAULT_KEEPALIVE_INTERVAL
pub const DEFAULT_TCP_USER_TIMEOUT: Duration = Duration::from_secs(60);

/// Configurable TCP timeouts that are applied to every connection made through [`Config`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationTimeouts {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TcpTimeoutConfig {
// Each field is optional: `Config::tcp_timeouts` forwards a `Some` value to the
// same-named setter on the wrapped `tokio_postgres::Config` and skips `None`,
// leaving the previously configured value in place.

/// Timeout applied to socket-level connection attempts.
pub connect_timeout: Option<Duration>,
/// Maximum number of TCP keepalive probes sent before the connection is dropped.
pub keepalives_retries: Option<u32>,
/// Amount of idle time before a keepalive packet is sent on the connection.
pub keepalives_idle: Option<Duration>,
/// Time interval between TCP keepalive probes.
pub keepalives_interval: Option<Duration>,
/// TCP user timeout for the connection.
pub tcp_user_timeout: Option<Duration>,
}

// Some of these defaults were recommended by @ph14
// https://github.com/MaterializeInc/materialize/pull/18644#discussion_r1160071692
pub const DEFAULT_REPLICATION_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_REPLICATION_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(10);
pub const DEFAULT_REPLICATION_KEEPALIVE_IDLE: Duration = Duration::from_secs(10);
pub const DEFAULT_REPLICATION_KEEPALIVE_RETRIES: u32 = 5;
// This is meant to be DEFAULT_REPLICATION_KEEPALIVE_IDLE
// + DEFAULT_REPLICATION_KEEPALIVE_RETRIES * DEFAULT_REPLICATION_KEEPALIVE_INTERVAL
pub const DEFAULT_REPLICATION_TCP_USER_TIMEOUT: Duration = Duration::from_secs(60);
/// Defaults every timeout to its recommended `DEFAULT_*` constant, so a
/// default-constructed configuration has no unset (`None`) field.
impl Default for TcpTimeoutConfig {
    fn default() -> Self {
        Self {
            tcp_user_timeout: Some(DEFAULT_TCP_USER_TIMEOUT),
            keepalives_interval: Some(DEFAULT_KEEPALIVE_INTERVAL),
            keepalives_idle: Some(DEFAULT_KEEPALIVE_IDLE),
            keepalives_retries: Some(DEFAULT_KEEPALIVE_RETRIES),
            connect_timeout: Some(DEFAULT_CONNECT_TIMEOUT),
        }
    }
}

pub const DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT: Duration = Duration::ZERO;

Expand All @@ -104,16 +116,11 @@ pub const DEFAULT_SNAPSHOT_STATEMENT_TIMEOUT: Duration = Duration::ZERO;
pub struct Config {
inner: tokio_postgres::Config,
tunnel: TunnelConfig,
replication_timeouts: ReplicationTimeouts,
}

impl Config {
pub fn new(inner: tokio_postgres::Config, tunnel: TunnelConfig) -> Result<Self, PostgresError> {
let config = Self {
inner,
tunnel,
replication_timeouts: ReplicationTimeouts::default(),
};
let config = Self { inner, tunnel }.tcp_timeouts(TcpTimeoutConfig::default());

// Early validate that the configuration contains only a single TCP
// server.
Expand All @@ -122,8 +129,22 @@ impl Config {
Ok(config)
}

pub fn replication_timeouts(mut self, replication_timeouts: ReplicationTimeouts) -> Config {
self.replication_timeouts = replication_timeouts;
pub fn tcp_timeouts(mut self, tcp_timeouts: TcpTimeoutConfig) -> Config {
if let Some(connect_timeout) = tcp_timeouts.connect_timeout {
self.inner.connect_timeout(connect_timeout);
}
if let Some(keepalives_retries) = tcp_timeouts.keepalives_retries {
self.inner.keepalives_retries(keepalives_retries);
}
if let Some(keepalives_idle) = tcp_timeouts.keepalives_idle {
self.inner.keepalives_idle(keepalives_idle);
}
if let Some(keepalives_interval) = tcp_timeouts.keepalives_interval {
self.inner.keepalives_interval(keepalives_interval);
}
if let Some(tcp_user_timeout) = tcp_timeouts.tcp_user_timeout {
self.inner.tcp_user_timeout(tcp_user_timeout);
}
self
}

Expand All @@ -135,33 +156,7 @@ impl Config {
/// Starts a replication connection to the configured PostgreSQL database.
pub async fn connect_replication(&self) -> Result<Client, PostgresError> {
self.connect_traced("postgres_connect_replication", |config| {
config
.replication_mode(ReplicationMode::Logical)
.connect_timeout(
self.replication_timeouts
.connect_timeout
.unwrap_or(DEFAULT_REPLICATION_CONNECT_TIMEOUT),
)
.keepalives_interval(
self.replication_timeouts
.keepalives_interval
.unwrap_or(DEFAULT_REPLICATION_KEEPALIVE_INTERVAL),
)
.keepalives_idle(
self.replication_timeouts
.keepalives_idle
.unwrap_or(DEFAULT_REPLICATION_KEEPALIVE_IDLE),
)
.keepalives_retries(
self.replication_timeouts
.keepalives_retries
.unwrap_or(DEFAULT_REPLICATION_KEEPALIVE_RETRIES),
)
.tcp_user_timeout(
self.replication_timeouts
.tcp_user_timeout
.unwrap_or(DEFAULT_REPLICATION_TCP_USER_TIMEOUT),
);
config.replication_mode(ReplicationMode::Logical);
})
.await
}
Expand Down Expand Up @@ -211,6 +206,7 @@ impl Config {
F: FnOnce(&mut tokio_postgres::Config),
{
let mut postgres_config = self.inner.clone();

configure(&mut postgres_config);
let mut tls = make_tls(&postgres_config)?;
match &self.tunnel {
Expand Down
99 changes: 50 additions & 49 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,10 @@ mod upsert_rocksdb {
};
}

/// Controls the connect_timeout setting when connecting to PG via replication.
const PG_REPLICATION_CONNECT_TIMEOUT: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_replication_connect_timeout"),
value: &mz_postgres_util::DEFAULT_REPLICATION_CONNECT_TIMEOUT,
/// Controls the connect_timeout setting when connecting to PG via `mz_postgres_util`.
const PG_CONNECTION_CONNECT_TIMEOUT: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_connection_connect_timeout"),
value: &mz_postgres_util::DEFAULT_CONNECT_TIMEOUT,
description: "Sets the timeout applied to socket-level connection attempts for PG \
replication connections. (Materialize)",
internal: true,
Expand Down Expand Up @@ -822,41 +822,42 @@ static OPENTELEMETRY_FILTER: Lazy<ServerVar<CloneableEnvFilter>> = Lazy::new(||
});

/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection
/// when connecting to PG via replication.
const PG_REPLICATION_KEEPALIVES_RETRIES: ServerVar<u32> = ServerVar {
name: UncasedStr::new("pg_replication_keepalives_retries"),
value: &mz_postgres_util::DEFAULT_REPLICATION_KEEPALIVE_RETRIES,
/// when connecting to PG via `mz_postgres_util`.
const PG_CONNECTION_KEEPALIVES_RETRIES: ServerVar<u32> = ServerVar {
name: UncasedStr::new("pg_connection_keepalives_retries"),
value: &mz_postgres_util::DEFAULT_KEEPALIVE_RETRIES,
description:
"Sets the maximum number of TCP keepalive probes that will be sent before dropping \
a connection when connecting to PG via replication. (Materialize)",
a connection when connecting to PG via `mz_postgres_util`. (Materialize)",
internal: true,
};

/// Sets the amount of idle time before a keepalive packet is sent on the connection when connecting
/// to PG via replication.
const PG_REPLICATION_KEEPALIVES_IDLE: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_replication_keepalives_idle"),
value: &mz_postgres_util::DEFAULT_REPLICATION_KEEPALIVE_IDLE,
/// to PG via `mz_postgres_util`.
const PG_CONNECTION_KEEPALIVES_IDLE: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_connection_keepalives_idle"),
value: &mz_postgres_util::DEFAULT_KEEPALIVE_IDLE,
description:
"Sets the amount of idle time before a keepalive packet is sent on the connection \
when connecting to PG via replication. (Materialize)",
when connecting to PG via `mz_postgres_util`. (Materialize)",
internal: true,
};

/// Sets the time interval between TCP keepalive probes when connecting to PG via replication.
const PG_REPLICATION_KEEPALIVES_INTERVAL: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_replication_keepalives_interval"),
value: &mz_postgres_util::DEFAULT_REPLICATION_KEEPALIVE_INTERVAL,
/// Sets the time interval between TCP keepalive probes when connecting to PG via `mz_postgres_util`.
const PG_CONNECTION_KEEPALIVES_INTERVAL: ServerVar<Duration> = ServerVar {
    name: UncasedStr::new("pg_connection_keepalives_interval"),
    value: &mz_postgres_util::DEFAULT_KEEPALIVE_INTERVAL,
    // Worded like the other `pg_connection_*` variables: the setting is described as applying
    // to connections made via `mz_postgres_util`, not to replication connections specifically.
    description: "Sets the time interval between TCP keepalive probes when connecting to PG via \
                  `mz_postgres_util`. (Materialize)",
    internal: true,
};

/// Sets the TCP user timeout when connecting to PG via replication.
const PG_REPLICATION_TCP_USER_TIMEOUT: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_replication_tcp_user_timeout"),
value: &mz_postgres_util::DEFAULT_REPLICATION_TCP_USER_TIMEOUT,
description: "Sets the TCP user timeout when connecting to PG via replication. (Materialize)",
/// Sets the TCP user timeout when connecting to PG via `mz_postgres_util`.
const PG_CONNECTION_TCP_USER_TIMEOUT: ServerVar<Duration> = ServerVar {
name: UncasedStr::new("pg_connection_tcp_user_timeout"),
value: &mz_postgres_util::DEFAULT_TCP_USER_TIMEOUT,
description:
"Sets the TCP user timeout when connecting to PG via `mz_postgres_util`. (Materialize)",
internal: true,
};

Expand Down Expand Up @@ -2104,11 +2105,11 @@ impl SystemVars {
.with_var(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
.with_var(&ENABLE_LD_RBAC_CHECKS)
.with_var(&ENABLE_RBAC_CHECKS)
.with_var(&PG_REPLICATION_CONNECT_TIMEOUT)
.with_var(&PG_REPLICATION_KEEPALIVES_IDLE)
.with_var(&PG_REPLICATION_KEEPALIVES_INTERVAL)
.with_var(&PG_REPLICATION_KEEPALIVES_RETRIES)
.with_var(&PG_REPLICATION_TCP_USER_TIMEOUT)
.with_var(&PG_CONNECTION_CONNECT_TIMEOUT)
.with_var(&PG_CONNECTION_KEEPALIVES_IDLE)
.with_var(&PG_CONNECTION_KEEPALIVES_INTERVAL)
.with_var(&PG_CONNECTION_KEEPALIVES_RETRIES)
.with_var(&PG_CONNECTION_TCP_USER_TIMEOUT)
.with_var(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
.with_var(&ENABLE_LAUNCHDARKLY)
.with_var(&MAX_CONNECTIONS)
Expand Down Expand Up @@ -2537,32 +2538,32 @@ impl SystemVars {
*self.expect_value(&PERSIST_CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
}

/// Returns the `pg_replication_connect_timeout` configuration parameter.
pub fn pg_replication_connect_timeout(&self) -> Duration {
*self.expect_value(&PG_REPLICATION_CONNECT_TIMEOUT)
/// Returns the `pg_connection_connect_timeout` configuration parameter.
pub fn pg_connection_connect_timeout(&self) -> Duration {
*self.expect_value(&PG_CONNECTION_CONNECT_TIMEOUT)
}

/// Returns the `pg_replication_keepalives_retries` configuration parameter.
pub fn pg_replication_keepalives_retries(&self) -> u32 {
*self.expect_value(&PG_REPLICATION_KEEPALIVES_RETRIES)
/// Returns the `pg_connection_keepalives_retries` configuration parameter.
pub fn pg_connection_keepalives_retries(&self) -> u32 {
*self.expect_value(&PG_CONNECTION_KEEPALIVES_RETRIES)
}

/// Returns the `pg_replication_keepalives_idle` configuration parameter.
pub fn pg_replication_keepalives_idle(&self) -> Duration {
*self.expect_value(&PG_REPLICATION_KEEPALIVES_IDLE)
/// Returns the `pg_connection_keepalives_idle` configuration parameter.
pub fn pg_connection_keepalives_idle(&self) -> Duration {
*self.expect_value(&PG_CONNECTION_KEEPALIVES_IDLE)
}

/// Returns the `pg_replication_keepalives_interval` configuration parameter.
pub fn pg_replication_keepalives_interval(&self) -> Duration {
*self.expect_value(&PG_REPLICATION_KEEPALIVES_INTERVAL)
/// Returns the `pg_connection_keepalives_interval` configuration parameter.
pub fn pg_connection_keepalives_interval(&self) -> Duration {
*self.expect_value(&PG_CONNECTION_KEEPALIVES_INTERVAL)
}

/// Returns the `pg_replication_tcp_user_timeout` configuration parameter.
pub fn pg_replication_tcp_user_timeout(&self) -> Duration {
*self.expect_value(&PG_REPLICATION_TCP_USER_TIMEOUT)
/// Returns the `pg_connection_tcp_user_timeout` configuration parameter.
pub fn pg_connection_tcp_user_timeout(&self) -> Duration {
*self.expect_value(&PG_CONNECTION_TCP_USER_TIMEOUT)
}

/// Returns the `pg_replication_tcp_user_timeout` configuration parameter.
/// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
*self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
}
Expand Down Expand Up @@ -4106,11 +4107,11 @@ pub fn is_compute_config_var(name: &str) -> bool {

/// Returns whether the named variable is a storage configuration parameter.
pub fn is_storage_config_var(name: &str) -> bool {
name == PG_REPLICATION_CONNECT_TIMEOUT.name()
|| name == PG_REPLICATION_KEEPALIVES_IDLE.name()
|| name == PG_REPLICATION_KEEPALIVES_INTERVAL.name()
|| name == PG_REPLICATION_KEEPALIVES_RETRIES.name()
|| name == PG_REPLICATION_TCP_USER_TIMEOUT.name()
name == PG_CONNECTION_CONNECT_TIMEOUT.name()
|| name == PG_CONNECTION_KEEPALIVES_IDLE.name()
|| name == PG_CONNECTION_KEEPALIVES_INTERVAL.name()
|| name == PG_CONNECTION_KEEPALIVES_RETRIES.name()
|| name == PG_CONNECTION_TCP_USER_TIMEOUT.name()
|| name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
Expand Down
4 changes: 2 additions & 2 deletions src/storage-client/src/types/parameters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package mz_storage_client.types.parameters;
message ProtoStorageParameters {
reserved 2;
mz_persist_client.cfg.ProtoPersistParameters persist = 1;
ProtoPgReplicationTimeouts pg_replication_timeouts = 3;
ProtoPgTcpTimeouts pg_tcp_timeouts = 3;
uint64 keep_n_source_status_history_entries = 4;
mz_rocksdb_types.config.ProtoRocksDbTuningParameters upsert_rocksdb_tuning_config = 5;
bool finalize_shards = 6;
Expand All @@ -34,7 +34,7 @@ message ProtoStorageParameters {
}


message ProtoPgReplicationTimeouts {
message ProtoPgTcpTimeouts {
optional mz_proto.ProtoDuration connect_timeout = 1;
optional uint32 keepalives_retries = 2;
optional mz_proto.ProtoDuration keepalives_idle = 3;
Expand Down
Loading

0 comments on commit 4b17bb3

Please sign in to comment.