Skip to content

Reloadable config #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /
# Replica/primary selection & more sharding tests
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null

# Test reload config
kill -SIGHUP $(pgrep pgcat)

#
# ActiveRecord tests!
#
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ once_cell = "1"
statsd = "0.15"
sqlparser = "0.14"
log = "0.4"
arc-swap = "1"
71 changes: 47 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
**Alpha**: don't use in production just yet.

## Features

| **Feature** | **Status** | **Comments** |
|--------------------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. |
| Session pooling | :heavy_check_mark: | Identical to PgBouncer. |
| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. |
| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
| Live configuration reloading | :x: :wrench: | On the roadmap; currently config changes require restart. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
| **Feature** | **Status** | **Comments** |
|--------------------------------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. |
| Session pooling | :heavy_check_mark: | Identical to PgBouncer. |
| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. |
| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
| Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |

## Deployment

Expand All @@ -48,17 +47,17 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended

See [sharding README](./tests/sharding/README.md) for sharding logic testing.

| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. |
| Session pooling | :x: | :heavy_check_mark: | Easiest way to test is to enable it and run pgbench - results will be better than transaction pooling as expected. |
| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. |
| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |

| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. |
| Session pooling | :x: | :heavy_check_mark: | Easiest way to test is to enable it and run pgbench - results will be better than transaction pooling as expected. |
| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. |
| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
| Live config reloading | :heavy_check_mark: | :heavy_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |

## Usage

Expand Down Expand Up @@ -173,6 +172,30 @@ SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query sh
SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; we are reading from the primary
```

### Statistics reporting

Stats are reported using StatsD every 15 seconds. The address is configurable with `statsd_address`, the default is `127.0.0.1:8125`. The stats are very similar to what Pgbouncer reports and the names are kept to be comparable.

### Live configuration reloading

The config can be reloaded by sending a `kill -s SIGHUP` to the process. Not all settings are currently supported by live reload:

| **Config** | **Requires restart** |
|-------------------------|----------------------|
| `host` | yes |
| `port` | yes |
| `pool_mode` | no |
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `ban_time` | no |
| `statsd_address` | yes |
| `user` | yes |
| `shards` | yes |
| `default_role` | no |
| `primary_reads_enabled` | no |
| `query_parser_enabled` | no |


## Benchmarks

You can setup PgBench locally through PgCat:
Expand Down
3 changes: 3 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ healthcheck_timeout = 1000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds

# Stats will be sent here
statsd_address = "127.0.0.1:8125"

#
# User to use for authentication against the server.
[user]
Expand Down
13 changes: 7 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::net::{

use std::collections::HashMap;

use crate::config::get_config;
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
Expand Down Expand Up @@ -61,10 +62,12 @@ impl Client {
pub async fn startup(
mut stream: TcpStream,
client_server_map: ClientServerMap,
transaction_mode: bool,
server_info: BytesMut,
stats: Reporter,
) -> Result<Client, Error> {
let config = get_config();
let transaction_mode = config.general.pool_mode.starts_with("t");
drop(config);
loop {
// Could be StartupMessage or SSLRequest
// which makes this variable length.
Expand Down Expand Up @@ -154,11 +157,7 @@ impl Client {
}

/// Client loop. We handle all messages between the client and the database here.
pub async fn handle(
&mut self,
mut pool: ConnectionPool,
mut query_router: QueryRouter,
) -> Result<(), Error> {
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
let (process_id, secret_key, address, port) = {
Expand Down Expand Up @@ -187,6 +186,8 @@ impl Client {
return Ok(Server::cancel(&address, &port, process_id, secret_key).await?);
}

let mut query_router = QueryRouter::new();

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocols.
Expand Down
106 changes: 98 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use arc_swap::{ArcSwap, Guard};
use once_cell::sync::Lazy;
use serde_derive::Deserialize;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use toml;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::errors::Error;

static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config::default()));

#[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
pub enum Role {
Primary,
Expand Down Expand Up @@ -39,12 +44,32 @@ pub struct Address {
pub role: Role,
}

impl Default for Address {
fn default() -> Address {
Address {
host: String::from("127.0.0.1"),
port: String::from("5432"),
shard: 0,
role: Role::Replica,
}
}
}

#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
pub struct User {
pub name: String,
pub password: String,
}

impl Default for User {
fn default() -> User {
User {
name: String::from("postgres"),
password: String::new(),
}
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct General {
pub host: String,
Expand All @@ -54,6 +79,22 @@ pub struct General {
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub ban_time: i64,
pub statsd_address: String,
}

impl Default for General {
fn default() -> General {
General {
host: String::from("localhost"),
port: 5432,
pool_size: 15,
pool_mode: String::from("transaction"),
connect_timeout: 5000,
healthcheck_timeout: 1000,
ban_time: 60,
statsd_address: String::from("127.0.0.1:8125"),
}
}
}

#[derive(Deserialize, Debug, Clone)]
Expand All @@ -62,13 +103,32 @@ pub struct Shard {
pub database: String,
}

impl Default for Shard {
fn default() -> Shard {
Shard {
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
database: String::from("postgres"),
}
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct QueryRouter {
pub default_role: String,
pub query_parser_enabled: bool,
pub primary_reads_enabled: bool,
}

impl Default for QueryRouter {
fn default() -> QueryRouter {
QueryRouter {
default_role: String::from("any"),
query_parser_enabled: false,
primary_reads_enabled: true,
}
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub general: General,
Expand All @@ -77,8 +137,36 @@ pub struct Config {
pub query_router: QueryRouter,
}

impl Default for Config {
fn default() -> Config {
Config {
general: General::default(),
user: User::default(),
shards: HashMap::from([(String::from("1"), Shard::default())]),
query_router: QueryRouter::default(),
}
}
}

impl Config {
pub fn show(&self) {
println!("> Pool size: {}", self.general.pool_size);
println!("> Pool mode: {}", self.general.pool_mode);
println!("> Ban time: {}s", self.general.ban_time);
println!(
"> Healthcheck timeout: {}ms",
self.general.healthcheck_timeout
);
println!("> Connection timeout: {}ms", self.general.connect_timeout);
}
}

pub fn get_config() -> Guard<Arc<Config>> {
CONFIG.load()
}

/// Parse the config.
pub async fn parse(path: &str) -> Result<Config, Error> {
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();
let mut file = match File::open(path).await {
Ok(file) => file,
Expand Down Expand Up @@ -163,7 +251,9 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
}
};

Ok(config)
CONFIG.store(Arc::new(config.clone()));

Ok(())
}

#[cfg(test)]
Expand All @@ -172,11 +262,11 @@ mod test {

#[tokio::test]
async fn test_config() {
let config = parse("pgcat.toml").await.unwrap();
assert_eq!(config.general.pool_size, 15);
assert_eq!(config.shards.len(), 3);
assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1");
assert_eq!(config.shards["0"].servers[0].2, "primary");
assert_eq!(config.query_router.default_role, "any");
parse("pgcat.toml").await.unwrap();
assert_eq!(get_config().general.pool_size, 15);
assert_eq!(get_config().shards.len(), 3);
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
assert_eq!(get_config().query_router.default_role, "any");
}
}
Loading