Skip to content

Add SHOW CLIENTS / SHOW SERVERS + Stats refactor and tests #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::config::{get_config, reload_config, VERSION};
use crate::errors::Error;
use crate::messages::*;
use crate::pool::get_all_pools;
use crate::stats::get_stats;
use crate::stats::{get_client_stats, get_stats};
use crate::ClientServerMap;

pub fn generate_server_info_for_admin() -> BytesMut {
Expand Down Expand Up @@ -72,6 +72,10 @@ where
trace!("SHOW POOLS");
show_pools(stream).await
}
"CLIENTS" => {
trace!("SHOW CLIENTS");
show_clients(stream).await
}
"STATS" => {
trace!("SHOW STATS");
show_stats(stream).await
Expand Down Expand Up @@ -439,3 +443,46 @@ where

write_all_half(stream, res).await
}

/// Show currently connected clients
async fn show_clients<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let columns = vec![
("client_id", DataType::Text),
("database", DataType::Text),
("user", DataType::Text),
("application_name", DataType::Text),
("transaction_count", DataType::Numeric),
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errors are pooler errors encountered by the client. In this PR I introduce two kinds

  • Checkout Failure
  • Server Connection Banned while client is talking to it

];

let new_map = get_client_stats();
let mut res = BytesMut::new();
res.put(row_description(&columns));

for (_, client) in new_map {
let row = vec![
format!("{:#08X}", client.process_id),
client.pool_name,
client.username,
client.application_name.clone(),
client.transaction_count.to_string(),
client.query_count.to_string(),
client.error_count.to_string(),
];

res.put(data_row(&row));
}

res.put(command_complete("SHOW"));

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, res).await
}
27 changes: 14 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,12 @@ where
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
self.stats.register_client(
self.process_id,
self.pool_name.clone(),
self.username.clone(),
self.application_name.clone(),
);

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
Expand Down Expand Up @@ -764,11 +770,7 @@ where
server.claim(self.process_id, self.secret_key);
self.connected_to_server = true;

// Update statistics.
if let Some(last_address_id) = self.last_address_id {
self.stats
.client_disconnecting(self.process_id, last_address_id);
}
Comment on lines -768 to -771
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client_disconnecting has no use for last_address_id (Based on my reading of the code)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I thought as well.

But it's used here, https://github.com/levkk/pgcat/blob/main/src/stats.rs#L398 to remove from the correct dict here, https://github.com/levkk/pgcat/blob/main/src/stats.rs#L456

// Update statistics
self.stats.client_active(self.process_id, address.id);

self.last_address_id = Some(address.id);
Expand Down Expand Up @@ -1091,14 +1093,13 @@ impl<S, T> Drop for Client<S, T> {

// Dirty shutdown
// TODO: refactor, this is not the best way to handle state management.
if let Some(address_id) = self.last_address_id {
self.stats.client_disconnecting(self.process_id, address_id);

if self.connected_to_server {
if let Some(process_id) = self.last_server_id {
self.stats.server_idle(process_id, address_id);
}
}
self.stats.client_disconnecting(self.process_id, 0);
if self.connected_to_server
&& self.last_server_id.is_some()
&& self.last_address_id.is_some()
{
self.stats
.server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We register any clients that connects, even if it does not perform queries. As such, we must also deregister any client that disconnects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going back and forth on this one I think, didn't we remove this call at some point?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was server_idle, I don't recall we touched client_disconnecting.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel a bit more confident about that stats after the refactor, having tracking objects for individual connections + registering all connections as early as possible makes it easier to reason about.

}
}
}
141 changes: 129 additions & 12 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use arc_swap::ArcSwap;
/// Statistics and reporting.
use log::{error, info, trace};
use log::{error, info, trace, warn};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::pool::get_number_of_addresses;

type ClientInformationLookup = HashMap<i32, ClientInformation>;

/// Latest client stats updated every second; used in SHOW CLIENTS.
static LATEST_CLIENT_STATS: Lazy<ArcSwap<ClientInformationLookup>> =
Lazy::new(|| ArcSwap::from_pointee(ClientInformationLookup::default()));

pub static REPORTER: Lazy<ArcSwap<Reporter>> =
Lazy::new(|| ArcSwap::from_pointee(Reporter::default()));

Expand All @@ -20,15 +27,39 @@ static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
/// 15 seconds.
static STAT_PERIOD: u64 = 15000;

/// The various states that a client can be in
#[derive(Debug, Clone, Copy)]
pub enum ClientState {
ClientWaiting,
ClientActive,
ClientIdle,
}

/// Information we keep track off which can be queried by SHOW CLIENTS
#[derive(Debug, Clone)]
pub struct ClientInformation {
pub state: ClientState,
pub process_id: i32,

pub application_name: String,
pub username: String,
pub pool_name: String,

pub transaction_count: u64,
pub query_count: u64,
pub error_count: u64,
}

/// The names for the events reported
/// to the statistics collector.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
enum EventName {
CheckoutTime,
Query,
Transaction,
DataSent,
DataReceived,
ClientRegistered(String, String, String),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really cool

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this either a struct or at least named parameters

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already made a change to my local branch to use anonymous struct

ClientWaiting,
ClientActive,
ClientIdle,
Expand Down Expand Up @@ -82,14 +113,15 @@ impl Reporter {

/// Send statistics to the task keeping track of stats.
fn send(&self, event: Event) {
let name = event.name;
let result = self.tx.try_send(event);
let name = event.name.clone();
let result = self.tx.try_send(event.clone());

match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
"{:?} event reported successfully, capacity: {} {:?}",
name,
self.tx.capacity()
self.tx.capacity(),
event
),

Err(err) => match err {
Expand All @@ -99,6 +131,27 @@ impl Reporter {
};
}

pub fn register_client(
&self,
process_id: i32,
pool_name: String,
username: String,
app_name: String,
) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This walks around the need to send app_name, username and pool_name to send these information with every client event.

We can do that anyway to keep the stats interface stateless

let event = Event {
name: EventName::ClientRegistered(
pool_name.clone(),
username.clone(),
app_name.clone(),
),
value: 1,
process_id: process_id,
address_id: 0,
};

self.send(event);
}

/// Report a query executed by a client against
/// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) {
Expand Down Expand Up @@ -211,7 +264,7 @@ impl Reporter {
name: EventName::ClientDisconnecting,
value: 1,
process_id: process_id,
address_id: address_id,
address_id: address_id, // No used
};

self.send(event)
Expand Down Expand Up @@ -345,6 +398,8 @@ impl Collector {
// Track which state the client and server are at any given time.
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();

let mut client_states = ClientInformationLookup::default();

// Flush stats to StatsD and calculate averages every 15 seconds.
let tx = self.tx.clone();
tokio::task::spawn(async move {
Expand Down Expand Up @@ -403,11 +458,19 @@ impl Collector {
// Some are counters, some are gauges...
match stat.name {
EventName::Query => {
match client_states.get_mut(&stat.process_id) {
Some(client) => client.query_count += stat.value as u64,
None => (),
}
let counter = stats.entry("total_query_count").or_insert(0);
*counter += stat.value;
}

EventName::Transaction => {
match client_states.get_mut(&stat.process_id) {
Some(client) => client.transaction_count += stat.value as u64,
None => (),
}
let counter = stats.entry("total_xact_count").or_insert(0);
*counter += stat.value;
}
Expand Down Expand Up @@ -442,17 +505,63 @@ impl Collector {
}
}

EventName::ClientActive
| EventName::ClientWaiting
| EventName::ClientIdle
| EventName::ServerActive
EventName::ClientRegistered(pool_name, username, app_name) => {
match client_states.get_mut(&stat.process_id) {
Some(_) => {
warn!("Client double registered!");
}

None => {
client_states.insert(
stat.process_id,
ClientInformation {
state: ClientState::ClientIdle,
process_id: stat.process_id,
pool_name: pool_name.clone(),
username: username.clone(),
application_name: app_name.clone(),
transaction_count: 0,
query_count: 0,
error_count: 0,
},
);
}
};
}

EventName::ClientActive | EventName::ClientWaiting | EventName::ClientIdle => {
client_server_states.insert(stat.process_id, stat.name.clone());
let new_state = match stat.name {
EventName::ClientActive => ClientState::ClientActive,
EventName::ClientWaiting => ClientState::ClientWaiting,
EventName::ClientIdle => ClientState::ClientIdle,
_ => unreachable!(),
};

match client_states.get_mut(&stat.process_id) {
Some(client_state) => {
client_state.state = new_state;
}

None => {
warn!("Stats on unregistered client!");
}
};
}

EventName::ClientDisconnecting => {
client_server_states.remove(&stat.process_id);
client_states.remove(&stat.process_id);
}

EventName::ServerActive
| EventName::ServerIdle
| EventName::ServerTested
| EventName::ServerLogin => {
client_server_states.insert(stat.process_id, stat.name);
}

EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
EventName::ServerDisconnecting => {
client_server_states.remove(&stat.process_id);
}

Expand Down Expand Up @@ -506,6 +615,8 @@ impl Collector {
entry.insert(key.to_string(), value.clone());
}

LATEST_CLIENT_STATS.store(Arc::new(client_states.clone()));

// These are re-calculated every iteration of the loop, so we don't want to add values
// from the last iteration.
for stat in &[
Expand Down Expand Up @@ -558,6 +669,12 @@ pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
LATEST_STATS.lock().clone()
}

/// Get a snapshot of client statistics. Updated once a second
/// by the `Collector`.
pub fn get_client_stats() -> ClientInformationLookup {
(*(*LATEST_CLIENT_STATS.load())).clone()
}

/// Get the statistics reporter used to update stats across the pools/clients.
pub fn get_reporter() -> Reporter {
(*(*REPORTER.load())).clone()
Expand Down
2 changes: 1 addition & 1 deletion tests/ruby/helpers/pgcat_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ def example_connection_string
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]

"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}?application_name=example_app"
end
end