Skip to content

Don't drop connections if DB hasn't changed #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::{error, info};
use once_cell::sync::Lazy;
use serde_derive::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::hash::{Hash, Hasher};
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl Address {
}

/// PostgreSQL user.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Serialize, Deserialize, Debug)]
#[derive(Clone, PartialEq, Hash, Eq, Serialize, Deserialize, Debug)]
pub struct User {
pub username: String,
pub password: String,
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Default for General {
/// Pool mode:
/// - transaction: server serves one transaction,
/// - session: server is attached to the client.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Copy)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum PoolMode {
#[serde(alias = "transaction", alias = "Transaction")]
Transaction,
Expand All @@ -250,7 +250,7 @@ impl ToString for PoolMode {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Pool {
#[serde(default = "Pool::default_pool_mode")]
pub pool_mode: PoolMode,
Expand All @@ -263,11 +263,35 @@ pub struct Pool {
#[serde(default)] // False
pub primary_reads_enabled: bool,

#[serde(default = "General::default_connect_timeout")]
pub connect_timeout: u64,

pub sharding_function: String,
pub shards: HashMap<String, Shard>,
pub users: HashMap<String, User>,
}

impl Hash for Pool {
fn hash<H: Hasher>(&self, state: &mut H) {
self.pool_mode.hash(state);
self.default_role.hash(state);
self.query_parser_enabled.hash(state);
self.primary_reads_enabled.hash(state);
self.sharding_function.hash(state);
self.connect_timeout.hash(state);

for (key, value) in &self.shards {
key.hash(state);
value.hash(state);
}

for (key, value) in &self.users {
key.hash(state);
value.hash(state);
}
}
}

impl Pool {
fn default_pool_mode() -> PoolMode {
PoolMode::Transaction
Expand All @@ -284,6 +308,7 @@ impl Default for Pool {
query_parser_enabled: false,
primary_reads_enabled: false,
sharding_function: "pg_bigint_hash".to_string(),
connect_timeout: General::default_connect_timeout(),
}
}
}
Expand All @@ -296,7 +321,7 @@ pub struct ServerConfig {
}

/// Shard configuration.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)]
pub struct Shard {
pub database: String,
pub servers: Vec<ServerConfig>,
Expand Down Expand Up @@ -575,7 +600,10 @@ pub async fn parse(path: &str) -> Result<(), Error> {
None => (),
};

for (pool_name, pool) in &config.pools {
for (pool_name, mut pool) in &mut config.pools {
// Copy the connect timeout over for hashing.
pool.connect_timeout = config.general.connect_timeout;

match pool.sharding_function.as_ref() {
"pg_bigint_hash" => (),
"sha1" => (),
Expand Down Expand Up @@ -666,7 +694,7 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
let new_config = get_config();

if old_config.pools != new_config.pools {
info!("Pool configuration changed, re-creating server pools");
info!("Pool configuration changed");
ConnectionPool::from_config(client_server_map).await?;
Ok(true)
} else if old_config != new_config {
Expand Down
21 changes: 19 additions & 2 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -26,6 +26,8 @@ pub type PoolMap = HashMap<(String, String), ConnectionPool>;
/// This is atomic and safe and read-optimized.
/// The pool is recreated dynamically when the config is reloaded.
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
static POOLS_HASH: Lazy<ArcSwap<HashSet<crate::config::Pool>>> =
Lazy::new(|| ArcSwap::from_pointee(HashSet::default()));

/// Pool settings.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -101,9 +103,23 @@ impl ConnectionPool {
let mut new_pools = HashMap::new();
let mut address_id = 0;

let mut pools_hash = (*(*POOLS_HASH.load())).clone();

for (pool_name, pool_config) in &config.pools {
let changed = pools_hash.insert(pool_config.clone());

if !changed {
info!("[db: {}] has not changed", pool_name);
continue;
}

// There is one pool per database/user pair.
for (_, user) in &pool_config.users {
info!(
"[pool: {}][user: {}] creating new pool",
pool_name, user.username
);

let mut shards = Vec::new();
let mut addresses = Vec::new();
let mut banlist = Vec::new();
Expand Down Expand Up @@ -156,7 +172,7 @@ impl ConnectionPool {
let pool = Pool::builder()
.max_size(user.pool_size)
.connection_timeout(std::time::Duration::from_millis(
config.general.connect_timeout,
pool_config.connect_timeout,
))
.test_on_check_out(false)
.build(manager)
Expand Down Expand Up @@ -217,6 +233,7 @@ impl ConnectionPool {
}

POOLS.store(Arc::new(new_pools.clone()));
POOLS_HASH.store(Arc::new(pools_hash.clone()));

Ok(())
}
Expand Down