Skip to content

Prewarmer #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.0.2-alpha1"
version = "1.0.2-alpha2"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
22 changes: 22 additions & 0 deletions pgcat.minimal.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# This is an example of the most basic config
# that will mimic what PgBouncer does in transaction mode with one server.

[general]

host = "0.0.0.0"
port = 6433
admin_username = "pgcat"
admin_password = "pgcat"

[pools.pgml.users.0]
username = "postgres"
password = "postgres"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"

[pools.pgml.shards.0]
servers = [
["127.0.0.1", 28815, "primary"]
]
database = "postgres"
72 changes: 66 additions & 6 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,58 @@ admin_username = "admin_user"
# Password to access the virtual administrative database
admin_password = "admin_pass"

# Default plugins that are configured on all pools.
[plugins]

# Prewarmer plugin that runs queries on server startup, before giving the connection
# to the client.
[plugins.prewarmer]
enabled = false
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]

# Log all queries to stdout.
[plugins.query_logger]
enabled = false

# Block access to tables that Postgres does not allow us to control.
[plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]

# Intercept user queries and give a fake reply.
[plugins.intercept]
enabled = true

[plugins.intercept.queries.0]

query = "select current_database() as a, current_schemas(false) as b"
schema = [
["a", "text"],
["b", "text"],
]
result = [
["${DATABASE}", "{public}"],
]

[plugins.intercept.queries.1]

query = "select current_database(), current_schema(), current_user"
schema = [
["current_database", "text"],
["current_schema", "text"],
["current_user", "text"],
]
result = [
["${DATABASE}", "public", "${USER}"],
]


# pool configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting.
# For a pool named `sharded_db`, clients access that pool using connection string like
Expand Down Expand Up @@ -154,23 +206,31 @@ connect_timeout = 3000
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
# dns_max_ttl = 30

[plugins]
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
# so all plugins have to be configured here again.
[pool.sharded_db.plugins]

[plugins.query_logger]
[pools.sharded_db.plugins.prewarmer]
enabled = true
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]

[pools.sharded_db.plugins.query_logger]
enabled = false

[plugins.table_access]
[pools.sharded_db.plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]

[plugins.intercept]
[pools.sharded_db.plugins.intercept]
enabled = true

[plugins.intercept.queries.0]
[pools.sharded_db.plugins.intercept.queries.0]

query = "select current_database() as a, current_schemas(false) as b"
schema = [
Expand All @@ -181,7 +241,7 @@ result = [
["${DATABASE}", "{public}"],
]

[plugins.intercept.queries.1]
[pools.sharded_db.plugins.intercept.queries.1]

query = "select current_database(), current_schema(), current_user"
schema = [
Expand Down
78 changes: 73 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ impl Default for Address {
}
}

impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"[address: {}:{}][database: {}][user: {}]",
self.host, self.port, self.database, self.username
)
}
}

// We need to implement PartialEq by ourselves so we skip stats in the comparison
impl PartialEq for Address {
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -235,6 +245,8 @@ pub struct General {
pub port: u16,

pub enable_prometheus_exporter: Option<bool>,

#[serde(default = "General::default_prometheus_exporter_port")]
pub prometheus_exporter_port: i16,

#[serde(default = "General::default_connect_timeout")]
Expand Down Expand Up @@ -374,6 +386,10 @@ impl General {
pub fn default_validate_config() -> bool {
true
}

pub fn default_prometheus_exporter_port() -> i16 {
9930
}
}

impl Default for General {
Expand Down Expand Up @@ -462,6 +478,7 @@ pub struct Pool {
#[serde(default = "Pool::default_load_balancing_mode")]
pub load_balancing_mode: LoadBalancingMode,

#[serde(default = "Pool::default_default_role")]
pub default_role: String,

#[serde(default)] // False
Expand All @@ -476,6 +493,7 @@ pub struct Pool {

pub server_lifetime: Option<u64>,

#[serde(default = "Pool::default_sharding_function")]
pub sharding_function: ShardingFunction,

#[serde(default = "Pool::default_automatic_sharding_key")]
Expand All @@ -489,6 +507,7 @@ pub struct Pool {
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,

pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
Expand Down Expand Up @@ -521,6 +540,14 @@ impl Pool {
None
}

pub fn default_default_role() -> String {
"any".into()
}

pub fn default_sharding_function() -> ShardingFunction {
ShardingFunction::PgBigintHash
}

pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
Expand Down Expand Up @@ -609,6 +636,7 @@ impl Default for Pool {
auth_query_user: None,
auth_query_password: None,
server_lifetime: None,
plugins: None,
}
}
}
Expand Down Expand Up @@ -687,30 +715,50 @@ impl Default for Shard {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Plugins {
pub intercept: Option<Intercept>,
pub table_access: Option<TableAccess>,
pub query_logger: Option<QueryLogger>,
pub prewarmer: Option<Prewarmer>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
impl std::fmt::Display for Plugins {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
self.intercept.is_some(),
self.table_access.is_some(),
self.query_logger.is_some(),
self.prewarmer.is_some(),
)
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Intercept {
pub enabled: bool,
pub queries: BTreeMap<String, Query>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct TableAccess {
pub enabled: bool,
pub tables: Vec<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct QueryLogger {
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Prewarmer {
pub enabled: bool,
pub queries: Vec<String>,
}

impl Intercept {
pub fn substitute(&mut self, db: &str, user: &str) {
for (_, query) in self.queries.iter_mut() {
Expand All @@ -720,7 +768,7 @@ impl Intercept {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Query {
pub query: String,
pub schema: Vec<Vec<String>>,
Expand Down Expand Up @@ -754,8 +802,13 @@ pub struct Config {
#[serde(default = "Config::default_path")]
pub path: String,

// General and global settings.
pub general: General,

// Plugins that should run in all pools.
pub plugins: Option<Plugins>,

// Connection pools.
pub pools: HashMap<String, Pool>,
}

Expand Down Expand Up @@ -940,6 +993,13 @@ impl Config {
"Server TLS certificate verification: {}",
self.general.verify_server_certificate
);
info!(
"Plugins: {}",
match self.plugins {
Some(ref plugins) => plugins.to_string(),
None => "not configured".into(),
}
);

for (pool_name, pool_config) in &self.pools {
// TODO: Make this output prettier (maybe a table?)
Expand Down Expand Up @@ -1006,6 +1066,14 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}] Plugins: {}",
pool_name,
match pool_config.plugins {
Some(ref plugins) => plugins.to_string(),
None => "not configured".into(),
}
);

for user in &pool_config.users {
info!(
Expand Down
1 change: 1 addition & 0 deletions src/mirrors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl MirroredClient {
ClientServerMap::default(),
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
);

Pool::builder()
Expand Down
Loading