Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions example.pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ tls_server_ca_certificate = "relative/or/absolute/path/to/certificate.pem"
# Default: 60 seconds
shutdown_timeout = 60_000

# How long to wait for active connections to be forcibly terminated
# after shutdown_timeout expires.
#
# Default: disabled
shutdown_termination_timeout = 60_000

# OpenMetrics server port.
#
# If set, enables Prometheus-style metrics exporter.
Expand Down
11 changes: 11 additions & 0 deletions integration/complex/cancel_query/pgdog.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[general]
query_timeout = 60000
shutdown_timeout = 0
shutdown_termination_timeout = 1000

[[databases]]
name = "pgdog"
host = "127.0.0.1"

[admin]
password = "pgdog"
60 changes: 60 additions & 0 deletions integration/complex/cancel_query/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import asyncio
import sys

import asyncpg
import psycopg


SHUTDOWN_TIMEOUT = 1
SLEEP_SECONDS = 10000
APPLICATION_NAME = "pgdog_cancel_query"


async def trigger_shutdown() -> None:
conn = await asyncpg.connect(
host="127.0.0.1",
port=6432,
database="pgdog",
user="pgdog",
password="pgdog",
)

try:
await conn.execute(f"SET application_name = '{APPLICATION_NAME}'")
sleep_task = asyncio.create_task(conn.execute(f"SELECT pg_sleep({SLEEP_SECONDS})"))

# Give the backend time to register the long running query.
await asyncio.sleep(1)

admin = psycopg.connect(
"dbname=admin user=admin host=127.0.0.1 port=6432 password=pgdog"
)
admin.autocommit = True
try:
admin.execute("SHUTDOWN")
finally:
admin.close()

try:
await asyncio.wait_for(sleep_task, timeout=SHUTDOWN_TIMEOUT)
except asyncio.TimeoutError:
print("pg_sleep query did not terminate after PgDog shutdown", file=sys.stderr)
raise SystemExit(1)
except (asyncpg.exceptions.PostgresError, asyncpg.exceptions.InterfaceError):
# Expected: connection terminates as PgDog shuts down.
return
except (ConnectionError, psycopg.Error):
# psycopg errors propagate through asyncpg when connection drops.
return
else:
print("pg_sleep query completed without interruption", file=sys.stderr)
raise SystemExit(1)
finally:
try:
await conn.close()
except Exception:
pass


if __name__ == "__main__":
asyncio.run(trigger_shutdown())
30 changes: 30 additions & 0 deletions integration/complex/cancel_query/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source ${SCRIPT_DIR}/../../common.sh

APPLICATION_NAME="pgdog_cancel_query"
QUERY="SELECT COUNT(*) FROM pg_stat_activity WHERE application_name = '${APPLICATION_NAME}'"

export PGPASSWORD=pgdog

active_venv

run_pgdog "${SCRIPT_DIR}"
wait_for_pgdog

pushd ${SCRIPT_DIR}
python run.py
popd

attempts=0
until [[ "$(psql -h localhost -U pgdog -tAq -c "${QUERY}")" == "0" ]]; do
if [[ ${attempts} -ge 5 ]]; then
echo "Found lingering sessions with application_name='${APPLICATION_NAME}'" >&2
exit 1
fi
attempts=$((attempts + 1))
sleep 5
done

stop_pgdog
57 changes: 57 additions & 0 deletions integration/complex/cancel_query/users.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
[[users]]
name = "pgdog"
database = "pgdog"
password = "pgdog"

[[users]]
name = "pgdog"
database = "pgdog_sharded"
password = "pgdog"

[[users]]
name = "pgdog_session"
database = "pgdog"
password = "pgdog"
server_user = "pgdog"
pooler_mode = "session"

[[users]]
name = "pgdog_2pc"
database = "pgdog"
password = "pgdog"
server_user = "pgdog"
two_phase_commit = true
min_pool_size = 0

[[users]]
name = "pgdog_2pc"
database = "pgdog_sharded"
password = "pgdog"
server_user = "pgdog"
two_phase_commit = true
min_pool_size = 0

[[users]]
name = "pgdog_migrator"
database = "pgdog_sharded"
password = "pgdog"
server_user = "pgdog"
schema_admin = true

[[users]]
name = "pgdog"
database = "failover"
password = "pgdog"

[[users]]
name = "pgdog"
database = "single_sharded_list"
password = "pgdog"

[[users]]
name = "pgdog_no_cross_shard"
database = "single_sharded_list"
password = "pgdog"
server_user = "pgdog"
cross_shard_disabled = true
min_pool_size = 0
4 changes: 4 additions & 0 deletions integration/complex/passthrough_auth/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PGDOG_BIN_PATH="${PGDOG_BIN:-${SCRIPT_DIR}/../../../target/release/pgdog}"
"${PGDOG_BIN_PATH}" \
--config ${SCRIPT_DIR}/pgdog-enabled.toml \
--users ${SCRIPT_DIR}/users.toml &
PGDOG_PID=$!

until pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog; do
sleep 1
Expand All @@ -32,10 +33,12 @@ if [[ "$statement_timeout" != *"100ms"* ]]; then
fi

killall -TERM pgdog
wait "${PGDOG_PID}" 2> /dev/null || true

"${PGDOG_BIN_PATH}" \
--config ${SCRIPT_DIR}/pgdog-disabled.toml \
--users ${SCRIPT_DIR}/users.toml &
PGDOG_PID=$!

until pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog; do
sleep 1
Expand All @@ -49,3 +52,4 @@ fi
psql -U pgdog pgdog -c 'SELECT 1' > /dev/null

killall -TERM pgdog
wait "${PGDOG_PID}" 2> /dev/null || true
1 change: 1 addition & 0 deletions integration/complex/run.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
pushd ${SCRIPT_DIR}
bash shutdown.sh
bash passthrough_auth/run.sh
bash cancel_query/run.sh
popd
19 changes: 19 additions & 0 deletions pgdog/src/config/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub struct General {
/// Shutdown timeout.
#[serde(default = "General::default_shutdown_timeout")]
pub shutdown_timeout: u64,
/// Shutdown termination timeout (after shutdown_timeout expires, forcibly terminate).
#[serde(default = "General::default_shutdown_termination_timeout")]
pub shutdown_termination_timeout: Option<u64>,
/// Broadcast IP.
pub broadcast_address: Option<Ipv4Addr>,
/// Broadcast port.
Expand Down Expand Up @@ -176,6 +179,7 @@ impl Default for General {
tls_verify: Self::default_tls_verify(),
tls_server_ca_certificate: Self::tls_server_ca_certificate(),
shutdown_timeout: Self::default_shutdown_timeout(),
shutdown_termination_timeout: Self::default_shutdown_termination_timeout(),
broadcast_address: Self::broadcast_address(),
broadcast_port: Self::broadcast_port(),
query_log: Self::query_log(),
Expand Down Expand Up @@ -342,6 +346,10 @@ impl General {
Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000)
}

fn default_shutdown_termination_timeout() -> Option<u64> {
Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT")
}

fn default_connect_timeout() -> u64 {
Self::env_or_default("PGDOG_CONNECT_TIMEOUT", 5_000)
}
Expand Down Expand Up @@ -486,6 +494,10 @@ impl General {
Duration::from_millis(self.shutdown_timeout)
}

pub fn shutdown_termination_timeout(&self) -> Option<Duration> {
self.shutdown_termination_timeout.map(Duration::from_millis)
}

/// Get TLS config, if any.
pub fn tls(&self) -> Option<(&PathBuf, &PathBuf)> {
if let Some(cert) = &self.tls_certificate {
Expand Down Expand Up @@ -654,6 +666,7 @@ mod tests {
env::set_var("PGDOG_BAN_TIMEOUT", "600000");
env::set_var("PGDOG_ROLLBACK_TIMEOUT", "10000");
env::set_var("PGDOG_SHUTDOWN_TIMEOUT", "120000");
env::set_var("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT", "15000");
env::set_var("PGDOG_CONNECT_ATTEMPT_DELAY", "1000");
env::set_var("PGDOG_QUERY_TIMEOUT", "30000");
env::set_var("PGDOG_CLIENT_IDLE_TIMEOUT", "3600000");
Expand All @@ -663,6 +676,10 @@ mod tests {
assert_eq!(General::ban_timeout(), 600000);
assert_eq!(General::rollback_timeout(), 10000);
assert_eq!(General::default_shutdown_timeout(), 120000);
assert_eq!(
General::default_shutdown_termination_timeout(),
Some(15_000)
);
assert_eq!(General::default_connect_attempt_delay(), 1000);
assert_eq!(General::default_query_timeout(), 30000);
assert_eq!(General::default_client_idle_timeout(), 3600000);
Expand All @@ -672,6 +689,7 @@ mod tests {
env::remove_var("PGDOG_BAN_TIMEOUT");
env::remove_var("PGDOG_ROLLBACK_TIMEOUT");
env::remove_var("PGDOG_SHUTDOWN_TIMEOUT");
env::remove_var("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT");
env::remove_var("PGDOG_CONNECT_ATTEMPT_DELAY");
env::remove_var("PGDOG_QUERY_TIMEOUT");
env::remove_var("PGDOG_CLIENT_IDLE_TIMEOUT");
Expand All @@ -681,6 +699,7 @@ mod tests {
assert_eq!(General::ban_timeout(), 300000);
assert_eq!(General::rollback_timeout(), 5000);
assert_eq!(General::default_shutdown_timeout(), 60000);
assert_eq!(General::default_shutdown_termination_timeout(), None);
assert_eq!(General::default_connect_attempt_delay(), 0);
}

Expand Down
22 changes: 22 additions & 0 deletions pgdog/src/frontend/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,28 @@ impl Listener {
"terminating {} client connections due to shutdown timeout",
comms.tracker().len()
);

// If a shutdown termination timeout is configured, enforce it here.
// This will ensure that we don't wait indefinitely for databases to respond.
if let Some(termination_timeout) =
config().config.general.shutdown_termination_timeout()
{
// Shutdown timeout elapsed; cancel any still-running queries before tearing pools down.
let cancel_futures = comms.clients().into_keys().map(|id| async move {
if let Err(err) = databases().cancel(&id).await {
error!(?id, "cancel request failed during shutdown: {err}");
}
});
let cancel_all = futures::future::join_all(cancel_futures);

if timeout(termination_timeout, cancel_all).await.is_err() {
error!(
"forced shutdown: abandoning {} outstanding cancel requests after waiting {:.3}s" ,
comms.clients().len(),
termination_timeout.as_secs_f64()
);
}
}
}

self.shutdown.notify_waiters();
Expand Down