-
Notifications
You must be signed in to change notification settings - Fork 231
Add SHOW CLIENTS / SHOW SERVERS + Stats refactor and tests #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
0561cac
d512e1d
62e486b
1de08e0
9fbdd61
75c9cff
fd69c0b
0ea0b16
fa6b0bc
3771ea7
e7d5bc3
a45ba13
6df4a5e
1269d00
2dff989
7b83c69
4669a2a
c395eec
badd914
04b7472
2e0df10
e9392d0
3059ec1
79d1e3c
b2c5ee6
f6cca64
08af06d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -577,6 +577,12 @@ where | |
// The query router determines where the query is going to go, | ||
// e.g. primary, replica, which shard. | ||
let mut query_router = QueryRouter::new(); | ||
self.stats.register_client( | ||
self.process_id, | ||
self.pool_name.clone(), | ||
self.username.clone(), | ||
self.application_name.clone(), | ||
); | ||
|
||
// Our custom protocol loop. | ||
// We expect the client to either start a transaction with regular queries | ||
|
@@ -764,11 +770,7 @@ where | |
server.claim(self.process_id, self.secret_key); | ||
self.connected_to_server = true; | ||
|
||
// Update statistics. | ||
if let Some(last_address_id) = self.last_address_id { | ||
self.stats | ||
.client_disconnecting(self.process_id, last_address_id); | ||
} | ||
Comment on lines
-768
to
-771
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's what I thought as well. But it's used here, https://github.com/levkk/pgcat/blob/main/src/stats.rs#L398 to remove from the correct dict here, https://github.com/levkk/pgcat/blob/main/src/stats.rs#L456 |
||
// Update statistics | ||
self.stats.client_active(self.process_id, address.id); | ||
|
||
self.last_address_id = Some(address.id); | ||
|
@@ -1091,14 +1093,13 @@ impl<S, T> Drop for Client<S, T> { | |
|
||
// Dirty shutdown | ||
// TODO: refactor, this is not the best way to handle state management. | ||
if let Some(address_id) = self.last_address_id { | ||
self.stats.client_disconnecting(self.process_id, address_id); | ||
|
||
if self.connected_to_server { | ||
if let Some(process_id) = self.last_server_id { | ||
self.stats.server_idle(process_id, address_id); | ||
} | ||
} | ||
self.stats.client_disconnecting(self.process_id, 0); | ||
if self.connected_to_server | ||
&& self.last_server_id.is_some() | ||
&& self.last_address_id.is_some() | ||
{ | ||
self.stats | ||
.server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We register any clients that connects, even if it does not perform queries. As such, we must also deregister any client that disconnects. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're going back and forth on this one I think, didn't we remove this call at some point? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this was There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel a bit more confident about that stats after the refactor, having tracking objects for individual connections + registering all connections as early as possible makes it easier to reason about. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,21 @@ | ||
use arc_swap::ArcSwap; | ||
/// Statistics and reporting. | ||
use log::{error, info, trace}; | ||
use log::{error, info, trace, warn}; | ||
use once_cell::sync::Lazy; | ||
use parking_lot::Mutex; | ||
use std::collections::HashMap; | ||
use std::sync::Arc; | ||
use tokio::sync::mpsc::error::TrySendError; | ||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||
|
||
use crate::pool::get_number_of_addresses; | ||
|
||
type ClientInformationLookup = HashMap<i32, ClientInformation>; | ||
|
||
/// Latest client stats updated every second; used in SHOW CLIENTS. | ||
static LATEST_CLIENT_STATS: Lazy<ArcSwap<ClientInformationLookup>> = | ||
Lazy::new(|| ArcSwap::from_pointee(ClientInformationLookup::default())); | ||
|
||
pub static REPORTER: Lazy<ArcSwap<Reporter>> = | ||
Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); | ||
|
||
|
@@ -20,15 +27,39 @@ static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> = | |
/// 15 seconds. | ||
static STAT_PERIOD: u64 = 15000; | ||
|
||
/// The various states that a client can be in | ||
#[derive(Debug, Clone, Copy)] | ||
pub enum ClientState { | ||
ClientWaiting, | ||
ClientActive, | ||
ClientIdle, | ||
} | ||
|
||
/// Information we keep track off which can be queried by SHOW CLIENTS | ||
#[derive(Debug, Clone)] | ||
pub struct ClientInformation { | ||
pub state: ClientState, | ||
pub process_id: i32, | ||
|
||
pub application_name: String, | ||
pub username: String, | ||
pub pool_name: String, | ||
|
||
pub transaction_count: u64, | ||
pub query_count: u64, | ||
pub error_count: u64, | ||
} | ||
|
||
/// The names for the events reported | ||
/// to the statistics collector. | ||
#[derive(Debug, Clone, Copy)] | ||
#[derive(Debug, Clone)] | ||
enum EventName { | ||
CheckoutTime, | ||
Query, | ||
Transaction, | ||
DataSent, | ||
DataReceived, | ||
ClientRegistered(String, String, String), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is really cool There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this either a struct or at least named parameters There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I already made a change to my local branch to use anonymous struct |
||
ClientWaiting, | ||
ClientActive, | ||
ClientIdle, | ||
|
@@ -82,14 +113,15 @@ impl Reporter { | |
|
||
/// Send statistics to the task keeping track of stats. | ||
fn send(&self, event: Event) { | ||
let name = event.name; | ||
let result = self.tx.try_send(event); | ||
let name = event.name.clone(); | ||
let result = self.tx.try_send(event.clone()); | ||
|
||
match result { | ||
Ok(_) => trace!( | ||
"{:?} event reported successfully, capacity: {}", | ||
"{:?} event reported successfully, capacity: {} {:?}", | ||
name, | ||
self.tx.capacity() | ||
self.tx.capacity(), | ||
event | ||
), | ||
|
||
Err(err) => match err { | ||
|
@@ -99,6 +131,27 @@ impl Reporter { | |
}; | ||
} | ||
|
||
pub fn register_client( | ||
&self, | ||
process_id: i32, | ||
pool_name: String, | ||
username: String, | ||
app_name: String, | ||
) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This walks around the need to send We can do that anyway to keep the stats interface stateless |
||
let event = Event { | ||
name: EventName::ClientRegistered( | ||
pool_name.clone(), | ||
username.clone(), | ||
app_name.clone(), | ||
), | ||
value: 1, | ||
process_id: process_id, | ||
address_id: 0, | ||
}; | ||
|
||
self.send(event); | ||
} | ||
|
||
/// Report a query executed by a client against | ||
/// a server identified by the `address_id`. | ||
pub fn query(&self, process_id: i32, address_id: usize) { | ||
|
@@ -211,7 +264,7 @@ impl Reporter { | |
name: EventName::ClientDisconnecting, | ||
value: 1, | ||
process_id: process_id, | ||
address_id: address_id, | ||
address_id: address_id, // No used | ||
}; | ||
|
||
self.send(event) | ||
|
@@ -345,6 +398,8 @@ impl Collector { | |
// Track which state the client and server are at any given time. | ||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new(); | ||
|
||
let mut client_states = ClientInformationLookup::default(); | ||
|
||
// Flush stats to StatsD and calculate averages every 15 seconds. | ||
let tx = self.tx.clone(); | ||
tokio::task::spawn(async move { | ||
|
@@ -403,11 +458,19 @@ impl Collector { | |
// Some are counters, some are gauges... | ||
match stat.name { | ||
EventName::Query => { | ||
match client_states.get_mut(&stat.process_id) { | ||
Some(client) => client.query_count += stat.value as u64, | ||
None => (), | ||
} | ||
let counter = stats.entry("total_query_count").or_insert(0); | ||
*counter += stat.value; | ||
} | ||
|
||
EventName::Transaction => { | ||
match client_states.get_mut(&stat.process_id) { | ||
Some(client) => client.transaction_count += stat.value as u64, | ||
None => (), | ||
} | ||
let counter = stats.entry("total_xact_count").or_insert(0); | ||
*counter += stat.value; | ||
} | ||
|
@@ -442,17 +505,63 @@ impl Collector { | |
} | ||
} | ||
|
||
EventName::ClientActive | ||
| EventName::ClientWaiting | ||
| EventName::ClientIdle | ||
| EventName::ServerActive | ||
EventName::ClientRegistered(pool_name, username, app_name) => { | ||
match client_states.get_mut(&stat.process_id) { | ||
Some(_) => { | ||
warn!("Client double registered!"); | ||
} | ||
|
||
None => { | ||
client_states.insert( | ||
stat.process_id, | ||
ClientInformation { | ||
state: ClientState::ClientIdle, | ||
process_id: stat.process_id, | ||
pool_name: pool_name.clone(), | ||
username: username.clone(), | ||
application_name: app_name.clone(), | ||
transaction_count: 0, | ||
query_count: 0, | ||
error_count: 0, | ||
}, | ||
); | ||
} | ||
}; | ||
} | ||
|
||
EventName::ClientActive | EventName::ClientWaiting | EventName::ClientIdle => { | ||
client_server_states.insert(stat.process_id, stat.name.clone()); | ||
let new_state = match stat.name { | ||
EventName::ClientActive => ClientState::ClientActive, | ||
EventName::ClientWaiting => ClientState::ClientWaiting, | ||
EventName::ClientIdle => ClientState::ClientIdle, | ||
_ => unreachable!(), | ||
}; | ||
|
||
match client_states.get_mut(&stat.process_id) { | ||
Some(client_state) => { | ||
client_state.state = new_state; | ||
} | ||
|
||
None => { | ||
warn!("Stats on unregistered client!"); | ||
} | ||
}; | ||
} | ||
|
||
EventName::ClientDisconnecting => { | ||
client_server_states.remove(&stat.process_id); | ||
client_states.remove(&stat.process_id); | ||
} | ||
|
||
EventName::ServerActive | ||
| EventName::ServerIdle | ||
| EventName::ServerTested | ||
| EventName::ServerLogin => { | ||
client_server_states.insert(stat.process_id, stat.name); | ||
} | ||
|
||
EventName::ClientDisconnecting | EventName::ServerDisconnecting => { | ||
EventName::ServerDisconnecting => { | ||
client_server_states.remove(&stat.process_id); | ||
} | ||
|
||
|
@@ -506,6 +615,8 @@ impl Collector { | |
entry.insert(key.to_string(), value.clone()); | ||
} | ||
|
||
LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); | ||
|
||
// These are re-calculated every iteration of the loop, so we don't want to add values | ||
// from the last iteration. | ||
for stat in &[ | ||
|
@@ -558,6 +669,12 @@ pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> { | |
LATEST_STATS.lock().clone() | ||
} | ||
|
||
/// Get a snapshot of client statistics. Updated once a second | ||
/// by the `Collector`. | ||
pub fn get_client_stats() -> ClientInformationLookup { | ||
(*(*LATEST_CLIENT_STATS.load())).clone() | ||
} | ||
|
||
/// Get the statistics reporter used to update stats across the pools/clients. | ||
pub fn get_reporter() -> Reporter { | ||
(*(*REPORTER.load())).clone() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errors are pooler errors encountered by the client. In this PR I introduce two kinds