Skip to content

Live reloading entire config and bug fixes #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ default_role = "any"
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = false
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
Expand Down
12 changes: 10 additions & 2 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null

# Query cancellation test
(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(5)' || true) &
(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) &
sleep 1
killall psql -s SIGINT

# Reload pool (closing unused server connections)
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD'

(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) &
sleep 1
killall psql -s SIGINT

# Sharding insert
Expand Down Expand Up @@ -94,7 +102,7 @@ toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
start_pgcat "info"

# Test session mode (and config reload)
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' pgcat.toml
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml

# Reload config test
kill -SIGHUP $(pgrep pgcat)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "0.2.1-beta1"
version = "0.4.0-beta1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down Expand Up @@ -28,4 +28,4 @@ parking_lot = "0.11"
hmac = "0.12"
sha2 = "0.10"
base64 = "0.13"
stringprep = "0.1"
stringprep = "0.1"
2 changes: 1 addition & 1 deletion pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ default_role = "any"
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = false
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
Expand Down
33 changes: 13 additions & 20 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ use log::{info, trace};
use std::collections::HashMap;
use tokio::net::tcp::OwnedWriteHalf;

use crate::config::{get_config, parse};
use crate::config::{get_config, reload_config};
use crate::errors::Error;
use crate::messages::*;
use crate::pool::ConnectionPool;
use crate::stats::get_stats;
use crate::ClientServerMap;

/// Handle admin client.
pub async fn handle_admin(
stream: &mut OwnedWriteHalf,
mut query: BytesMut,
pool: ConnectionPool,
client_server_map: ClientServerMap,
) -> Result<(), Error> {
let code = query.get_u8() as char;

Expand All @@ -34,7 +36,7 @@ pub async fn handle_admin(
show_stats(stream, &pool).await
} else if query.starts_with("RELOAD") {
trace!("RELOAD");
reload(stream).await
reload(stream, client_server_map).await
} else if query.starts_with("SHOW CONFIG") {
trace!("SHOW CONFIG");
show_config(stream).await
Expand Down Expand Up @@ -143,10 +145,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
/// Show utilization of connection pools for each shard and replicas.
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats();
let config = {
let guard = get_config();
&*guard.clone()
};
let config = get_config();

let columns = vec![
("database", DataType::Text),
Expand Down Expand Up @@ -199,9 +198,7 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul

/// Show shards and replicas.
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
drop(guard);
let config = get_config();

// Columns
let columns = vec![
Expand Down Expand Up @@ -266,17 +263,15 @@ async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
}

/// Reload the configuration file without restarting the process.
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
async fn reload(
stream: &mut OwnedWriteHalf,
client_server_map: ClientServerMap,
) -> Result<(), Error> {
info!("Reloading config");

let config = get_config();
let path = config.path.clone().unwrap();

parse(&path).await?;

let config = get_config();
reload_config(client_server_map).await?;

config.show();
get_config().show();

let mut res = BytesMut::new();

Expand All @@ -292,10 +287,8 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {

/// Shows current configuration.
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
let config = &get_config();
let config: HashMap<String, String> = config.into();
drop(guard);

// Configs that cannot be changed without restarting.
let immutables = ["host", "port", "connect_timeout"];
Expand Down
106 changes: 71 additions & 35 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::config::get_config;
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::pool::{ClientServerMap, ConnectionPool};
use crate::pool::{get_pool, ClientServerMap};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::Reporter;
use crate::stats::{get_reporter, Reporter};

/// The client state. One of these is created per client.
pub struct Client {
Expand Down Expand Up @@ -69,12 +69,11 @@ impl Client {
pub async fn startup(
mut stream: TcpStream,
client_server_map: ClientServerMap,
server_info: BytesMut,
stats: Reporter,
) -> Result<Client, Error> {
let config = get_config().clone();
let transaction_mode = config.general.pool_mode.starts_with("t");
// drop(config);
let config = get_config();
let transaction_mode = config.general.pool_mode == "transaction";
let stats = get_reporter();

loop {
trace!("Waiting for StartupMessage");

Expand Down Expand Up @@ -154,9 +153,10 @@ impl Client {
debug!("Password authentication successful");

auth_ok(&mut stream).await?;
write_all(&mut stream, server_info).await?;
write_all(&mut stream, get_pool().server_info()).await?;
backend_key_data(&mut stream, process_id, secret_key).await?;
ready_for_query(&mut stream).await?;

trace!("Startup OK");

let database = parameters
Expand Down Expand Up @@ -221,7 +221,7 @@ impl Client {
}

/// Handle a connected and authenticated client.
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
pub async fn handle(&mut self) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
trace!("Sending CancelRequest");
Expand Down Expand Up @@ -252,13 +252,19 @@ impl Client {
return Ok(Server::cancel(&address, &port, process_id, secret_key).await?);
}

// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut round_robin = 0;

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
loop {
trace!("Client idle, waiting for message");
trace!(
"Client idle, waiting for message, transaction mode: {}",
self.transaction_mode
);

// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
Expand All @@ -267,32 +273,63 @@ impl Client {
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = get_pool();

// Avoid taking a server if the client just wants to disconnect.
if message[0] as char == 'X' {
trace!("Client disconnecting");
debug!("Client disconnecting");
return Ok(());
}

// Handle admin database queries.
if self.admin {
trace!("Handling admin command");
handle_admin(&mut self.write, message, pool.clone()).await?;
debug!("Handling admin command");
handle_admin(
&mut self.write,
message,
pool.clone(),
self.client_server_map.clone(),
)
.await?;
continue;
}

let current_shard = query_router.shard();

// Handle all custom protocol commands, if any.
match query_router.try_execute_command(message.clone()) {
// Normal query, not a custom command.
None => {
// Attempt to infer which server we want to query, i.e. primary or replica.
if query_router.query_parser_enabled() && query_router.role() == None {
query_router.infer_role(message.clone());
}
}
None => (),

// SET SHARD TO
Some((Command::SetShard, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
// Selected shard is not configured.
if query_router.shard() >= pool.shards() {
// Set the shard back to what it was.
query_router.set_shard(current_shard);

error_response(
&mut self.write,
&format!(
"shard {} is more than configured {}, staying on shard {}",
query_router.shard(),
pool.shards(),
current_shard,
),
)
.await?;
} else {
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
}
continue;
}

// SET PRIMARY READS TO
Some((Command::SetPrimaryReads, _)) => {
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
continue;
}

Expand All @@ -319,27 +356,24 @@ impl Client {
show_response(&mut self.write, "shard", &value).await?;
continue;
}
};

// Make sure we selected a valid shard.
if query_router.shard() >= pool.shards() {
error_response(
&mut self.write,
&format!(
"shard {} is more than configured {}",
query_router.shard(),
pool.shards()
),
)
.await?;
continue;
}
// SHOW PRIMARY READS
Some((Command::ShowPrimaryReads, value)) => {
show_response(&mut self.write, "primary reads", &value).await?;
continue;
}
};

debug!("Waiting for connection from pool");

// Grab a server from the pool.
let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.get(
query_router.shard(),
query_router.role(),
self.process_id,
round_robin,
)
.await
{
Ok(conn) => {
Expand All @@ -358,6 +392,8 @@ impl Client {
let address = connection.1;
let server = &mut *reference;

round_robin += 1;

// Server is assigned to the client in case the client wants to
// cancel a query later.
server.claim(self.process_id, self.secret_key);
Expand Down
Loading