Skip to content

Commit

Permalink
feat: support rds_proxy=true on postgres connections
Browse files Browse the repository at this point in the history
  • Loading branch information
tmm1 committed Oct 9, 2024
1 parent 00f0d73 commit b37fbc8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
9 changes: 9 additions & 0 deletions quaint/src/connector/connection_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ impl ConnectionInfo {
}
}

/// Whether the rdsproxy mode is enabled.
pub fn rds_proxy(&self) -> bool {
match self {
#[cfg(all(not(target_arch = "wasm32"), feature = "postgresql"))]
ConnectionInfo::Native(NativeConnectionInfo::Postgres(url)) => url.rds_proxy(),
_ => false,
}
}

/// A string describing the database location, meant for error messages. It will be the host
/// and port on MySQL/Postgres, and the file path on SQLite.
pub fn database_location(&self) -> String {
Expand Down
14 changes: 12 additions & 2 deletions quaint/src/connector/postgres/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl Debug for PostgresClient {
pub struct PostgreSql {
client: PostgresClient,
pg_bouncer: bool,
rds_proxy: bool,
socket_timeout: Option<Duration>,
statement_cache: Mutex<StatementCache>,
is_healthy: AtomicBool,
Expand Down Expand Up @@ -164,6 +165,9 @@ impl PostgresUrl {
pub(crate) fn cache(&self) -> StatementCache {
if self.query_params.pg_bouncer {
StatementCache::new(0)
} else if self.query_params.rds_proxy {
// RDS proxy supports statement caching as of Nov 2023: https://aws.amazon.com/blogs/database/amazon-rds-proxy-multiplexing-support-for-postgresql-extended-query-protocol/
StatementCache::new(self.query_params.statement_cache_size)
} else {
StatementCache::new(self.query_params.statement_cache_size)
}
Expand Down Expand Up @@ -202,7 +206,8 @@ impl PostgresUrl {
config.host(self.host());
config.port(self.port());
config.dbname(self.dbname());
config.pgbouncer_mode(self.query_params.pg_bouncer);
// rust-postgres' pgbouncer_mode only explictly disables internal statement caching for enum and other custom types. This is necessary with PGBouncer so DEALLOCATE ALL doesn't break things. If the user asked to disable statement cache altogether, we request the same of the rust-postgres type cache.
config.pgbouncer_mode(self.query_params.pg_bouncer || self.query_params.statement_cache_size == 0);

if let Some(options) = self.options() {
config.options(options);
Expand Down Expand Up @@ -261,7 +266,7 @@ impl PostgreSql {
}
}));

// On Postgres, we set the SEARCH_PATH and client-encoding through client connection parameters to save a network roundtrip on connection.
// On Postgres, we set the SEARCH_PATH and client-encoding through client connection parameters, when possible, to save a network roundtrip on connection. In unsupported cases like PGBouncer, we fallback to a simple SET query here.
// We can't always do it for CockroachDB because it does not expect quotes for unsafe identifiers (https://github.com/cockroachdb/cockroach/issues/101328), which might change once the issue is fixed.
// To circumvent that problem, we only set the SEARCH_PATH through client connection parameters for Cockroach when the identifier is safe, so that the quoting does not matter.
// Finally, to ensure backward compatibility, we keep sending a database query in case the flavour is set to Unknown.
Expand All @@ -285,6 +290,7 @@ impl PostgreSql {
client: PostgresClient(client),
socket_timeout: url.query_params.socket_timeout,
pg_bouncer: url.query_params.pg_bouncer,
rds_proxy: url.query_params.rds_proxy,
statement_cache: Mutex::new(url.cache()),
is_healthy: AtomicBool::new(true),
is_cockroachdb,
Expand Down Expand Up @@ -744,6 +750,10 @@ impl Queryable for PostgreSql {
async fn server_reset_query(&self, tx: &dyn Transaction) -> crate::Result<()> {
if self.pg_bouncer {
tx.raw_cmd("DEALLOCATE ALL").await
} else if self.rds_proxy {
// Sending DEALLOCATE ALL to RDS proxy will force connection-pinning: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/rds-proxy-pinning.html#rds-proxy-pinning.postgres
// It is also unnecessary as RDS supports statement caching. Further, `server_reset_query = DEALLOCATE ALL` is a pgbouncer-specific hack. See https://github.com/prisma/prisma-engines/commit/ccf54b230bea8dd3e1d005f3a8b6c3150db4e62e and https://github.com/sfackler/rust-postgres/commit/100e4cf
Ok(())
} else {
Ok(())
}
Expand Down
13 changes: 13 additions & 0 deletions quaint/src/connector/postgres/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ impl PostgresUrl {
self.query_params.pg_bouncer
}

/// Whether the rdsproxy mode is enabled.
pub fn rds_proxy(&self) -> bool {
self.query_params.rds_proxy
}

/// The connection timeout.
pub fn connect_timeout(&self) -> Option<Duration> {
self.query_params.connect_timeout
Expand Down Expand Up @@ -225,6 +230,7 @@ impl PostgresUrl {
let mut connect_timeout = Some(Duration::from_secs(5));
let mut pool_timeout = Some(Duration::from_secs(10));
let mut pg_bouncer = false;
let mut rds_proxy = false;
let mut statement_cache_size = 100;
let mut max_connection_lifetime = None;
let mut max_idle_connection_lifetime = Some(Duration::from_secs(300));
Expand All @@ -237,6 +243,11 @@ impl PostgresUrl {
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;
}
"rdsproxy" => {
rds_proxy = v
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;
}
#[cfg(feature = "postgresql-native")]
"sslmode" => {
match v.as_ref() {
Expand Down Expand Up @@ -382,6 +393,7 @@ impl PostgresUrl {
pool_timeout,
socket_timeout,
pg_bouncer,
rds_proxy,
statement_cache_size,
max_connection_lifetime,
max_idle_connection_lifetime,
Expand Down Expand Up @@ -414,6 +426,7 @@ pub(crate) struct PostgresUrlQueryParams {
pub(crate) connection_limit: Option<usize>,
pub(crate) schema: Option<String>,
pub(crate) pg_bouncer: bool,
pub(crate) rds_proxy: bool,
pub(crate) host: Option<String>,
pub(crate) socket_timeout: Option<Duration>,
pub(crate) connect_timeout: Option<Duration>,
Expand Down
4 changes: 2 additions & 2 deletions quaint/src/pooled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,13 @@ impl Builder {

fn log_start(info: &ConnectionInfo, connection_limit: usize) {
let family = info.sql_family();
let pg_bouncer = if info.pg_bouncer() { " in PgBouncer mode" } else { "" };
let suffix = if info.pg_bouncer() { " in PgBouncer mode" } else if info.rds_proxy() { " in RDS Proxy mode" } else { "" };

tracing::info!(
"Starting a {} pool with {} connections{}.",
family,
connection_limit,
pg_bouncer
suffix
);
}
}
Expand Down
4 changes: 2 additions & 2 deletions quaint/src/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ impl Quaint {
#[cfg(native)]
fn log_start(info: &ConnectionInfo) {
let family = info.sql_family();
let pg_bouncer = if info.pg_bouncer() { " in PgBouncer mode" } else { "" };
let suffix = if info.pg_bouncer() { " in PgBouncer mode" } else if info.rds_proxy() { " in RDS Proxy mode" } else { "" };

tracing::info!("Starting a {} connection{}.", family, pg_bouncer);
tracing::info!("Starting a {} connection{}.", family, suffix);
}
}

Expand Down

0 comments on commit b37fbc8

Please sign in to comment.