Skip to content

Adds tests and move non-admin connection rejection to startup #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 15 additions & 27 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ pub struct Client<S, T> {

/// Used to notify clients about an impending shutdown
shutdown: Receiver<()>,

// Allow only admin connections.
admin_only: bool,
}

/// Client entrypoint.
Expand Down Expand Up @@ -224,17 +221,7 @@ pub async fn client_entrypoint(
let (read, write) = split(stream);

// Continue with cancel query request.
match Client::cancel(
read,
write,
addr,
bytes,
client_server_map,
shutdown,
admin_only,
)
.await
{
match Client::cancel(read, write, addr, bytes, client_server_map, shutdown).await {
Ok(mut client) => {
info!("Client {:?} issued a cancel query request", addr);

Expand Down Expand Up @@ -384,6 +371,20 @@ where
.count()
== 1;

// Kick any client that's not admin while we're in admin-only mode.
if !admin && admin_only {
debug!(
"Rejecting non-admin connection to {} when in admin only mode",
pool_name
);
error_response_terminal(
&mut write,
&format!("terminating connection due to administrator command"),
)
.await?;
return Err(Error::ShuttingDown);
}

// Generate random backend ID and secret key
let process_id: i32 = rand::random();
let secret_key: i32 = rand::random();
Expand Down Expand Up @@ -494,7 +495,6 @@ where
username: username.clone(),
shutdown,
connected_to_server: false,
admin_only,
});
}

Expand All @@ -506,7 +506,6 @@ where
mut bytes: BytesMut, // The rest of the startup message.
client_server_map: ClientServerMap,
shutdown: Receiver<()>,
admin_only: bool,
) -> Result<Client<S, T>, Error> {
let process_id = bytes.get_i32();
let secret_key = bytes.get_i32();
Expand All @@ -529,7 +528,6 @@ where
username: String::from("undefined"),
shutdown,
connected_to_server: false,
admin_only,
});
}

Expand Down Expand Up @@ -565,16 +563,6 @@ where
return Ok(Server::cancel(&address, port, process_id, secret_key).await?);
}

// Kick any client that's not admin while we're in admin-only mode.
if !self.admin && self.admin_only {
error_response_terminal(
&mut self.write,
&format!("terminating connection due to administrator command"),
)
.await?;
return Err(Error::ShuttingDown);
}

// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
Expand Down
137 changes: 111 additions & 26 deletions tests/python/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)


def pg_cat_send_signal(signal: signal.Signals):
Expand All @@ -27,11 +28,23 @@ def pg_cat_send_signal(signal: signal.Signals):
raise Exception("pgcat not closed after SIGTERM")


def connect_normal_db(
autocommit: bool = False,
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:

if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"

conn = psycopg2.connect(
f"postgres://sharding_user:sharding_user@{PGCAT_HOST}:{PGCAT_PORT}/sharded_db?application_name=testing_pgcat"
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
Expand All @@ -45,19 +58,15 @@ def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.


def test_normal_db_access():
conn, cur = connect_normal_db()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)


def test_admin_db_access():
conn = psycopg2.connect(
f"postgres://admin_user:admin_pass@{PGCAT_HOST}:{PGCAT_PORT}/pgcat"
)
conn.autocommit = True # BEGIN/COMMIT is not supported by admin db
cur = conn.cursor()
conn, cur = connect_db(admin=True)

cur.execute("SHOW POOLS")
res = cur.fetchall()
Expand All @@ -67,15 +76,14 @@ def test_admin_db_access():

def test_shutdown_logic():

##### NO ACTIVE QUERIES SIGINT HANDLING #####
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES SIGINT HANDLING

# Start pgcat
pgcat_start()

# Wait for server to fully start up
time.sleep(2)

# Create client connection and send query (not in transaction)
conn, cur = connect_normal_db(True)
conn, cur = connect_db()

cur.execute("BEGIN;")
cur.execute("SELECT 1;")
Expand All @@ -97,17 +105,14 @@ def test_shutdown_logic():
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)

##### END #####
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT

##### HANDLE TRANSACTION WITH SIGINT #####
# Start pgcat
pgcat_start()

# Wait for server to fully start up
time.sleep(2)

# Create client connection and begin transaction
conn, cur = connect_normal_db(True)
conn, cur = connect_db()

cur.execute("BEGIN;")
cur.execute("SELECT 1;")
Expand All @@ -126,17 +131,97 @@ def test_shutdown_logic():
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)

##### END #####
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
pgcat_start()

# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()

transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")

# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)

##### HANDLE SHUTDOWN TIMEOUT WITH SIGINT #####
start = time.perf_counter()
try:
conn, cur = connect_db()
cur.execute("SELECT 1;")
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
raise Exception(
"Failed to reject connection within 0.1 seconds, got", time_taken, "seconds")
pass
else:
raise Exception("Able connect to database during shutdown")

cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)

# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
pgcat_start()

# Wait for server to fully start up
time.sleep(3)
# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()

transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")

# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)

try:
conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;")
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)

cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)

# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
pgcat_start()

# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")

admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")

# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)

try:
admin_cur.execute("SHOW DATABASES;")
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)

cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)

# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT

# Start pgcat
pgcat_start()

# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = connect_normal_db(True)
conn, cur = connect_db()

cur.execute("BEGIN;")
cur.execute("SELECT 1;")
Expand All @@ -159,7 +244,7 @@ def test_shutdown_logic():
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)

##### END #####
# - - - - - - - - - - - - - - - - - -


test_normal_db_access()
Expand Down