Skip to content

Commit ed8582b

Browse files
committed
Refactor stats to use atomics
When we are dealing with a high number of connections, generated stats cannot be consumed fast enough by the stats collector loop. This makes the stats subsystem inconsistent, and a lot of warning messages are thrown due to unregistered servers/clients. This change refactors the stats subsystem so it uses atomics: - Counters are now handled using U64 atomics. - The event system is dropped and averages are calculated using a loop every 15 seconds. - Instead of snapshots being generated every second, we now keep track of servers/clients that have registered. Each pool/server/client has its own instance of the counter and makes changes directly, instead of adding an event that gets processed later.
1 parent 7dd9614 commit ed8582b

File tree

16 files changed

+1212
-1176
lines changed

16 files changed

+1212
-1176
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ exitcode = "1.1.2"
3737
futures = "0.3"
3838
socket2 = { version = "0.4.7", features = ["all"] }
3939
nix = "0.26.2"
40+
atomic_enum = "0.2.0"
4041

4142
[target.'cfg(not(target_env = "msvc"))'.dependencies]
4243
jemallocator = "0.5.0"

src/admin.rs

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
use crate::pool::BanReason;
2-
/// Admin database.
32
use bytes::{Buf, BufMut, BytesMut};
43
use log::{error, info, trace};
54
use nix::sys::signal::{self, Signal};
65
use nix::unistd::Pid;
76
use std::collections::HashMap;
7+
/// Admin database.
8+
use std::sync::atomic::Ordering;
89
use std::time::{SystemTime, UNIX_EPOCH};
910
use tokio::time::Instant;
1011

1112
use crate::config::{get_config, reload_config, VERSION};
1213
use crate::errors::Error;
1314
use crate::messages::*;
1415
use crate::pool::{get_all_pools, get_pool};
15-
use crate::stats::{
16-
get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState,
17-
};
16+
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
1817
use crate::ClientServerMap;
1918

2019
pub fn generate_server_info_for_admin() -> BytesMut {
@@ -158,15 +157,29 @@ where
158157
"free_clients".to_string(),
159158
client_stats
160159
.keys()
161-
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle)
160+
.filter(|client_id| {
161+
client_stats
162+
.get(client_id)
163+
.unwrap()
164+
.state
165+
.load(Ordering::Relaxed)
166+
== ClientState::Idle
167+
})
162168
.count()
163169
.to_string(),
164170
]));
165171
res.put(data_row(&vec![
166172
"used_clients".to_string(),
167173
client_stats
168174
.keys()
169-
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active)
175+
.filter(|client_id| {
176+
client_stats
177+
.get(client_id)
178+
.unwrap()
179+
.state
180+
.load(Ordering::Relaxed)
181+
== ClientState::Active
182+
})
170183
.count()
171184
.to_string(),
172185
]));
@@ -178,15 +191,29 @@ where
178191
"free_servers".to_string(),
179192
server_stats
180193
.keys()
181-
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle)
194+
.filter(|server_id| {
195+
server_stats
196+
.get(server_id)
197+
.unwrap()
198+
.state
199+
.load(Ordering::Relaxed)
200+
== ServerState::Idle
201+
})
182202
.count()
183203
.to_string(),
184204
]));
185205
res.put(data_row(&vec![
186206
"used_servers".to_string(),
187207
server_stats
188208
.keys()
189-
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active)
209+
.filter(|server_id| {
210+
server_stats
211+
.get(server_id)
212+
.unwrap()
213+
.state
214+
.load(Ordering::Relaxed)
215+
== ServerState::Active
216+
})
190217
.count()
191218
.to_string(),
192219
]));
@@ -248,28 +275,15 @@ where
248275

249276
let mut res = BytesMut::new();
250277
res.put(row_description(&columns));
251-
for (user_pool, pool) in get_all_pools() {
252-
let def = HashMap::default();
253-
let pool_stats = all_pool_stats
254-
.get(&(user_pool.db.clone(), user_pool.user.clone()))
255-
.unwrap_or(&def);
256278

257-
let pool_config = &pool.settings;
279+
for ((_user_pool, _pool), pool_stats) in all_pool_stats {
258280
let mut row = vec![
259-
user_pool.db.clone(),
260-
user_pool.user.clone(),
261-
pool_config.pool_mode.to_string(),
281+
pool_stats.database(),
282+
pool_stats.user(),
283+
pool_stats.pool_mode().to_string(),
262284
];
263-
for column in &columns[3..columns.len()] {
264-
let value = match column.0 {
265-
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
266-
"maxwait_us" => {
267-
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
268-
}
269-
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
270-
};
271-
row.push(value);
272-
}
285+
pool_stats.populate_row(&mut row);
286+
pool_stats.clear_maxwait();
273287
res.put(data_row(&row));
274288
}
275289

@@ -400,7 +414,7 @@ where
400414
for (id, pool) in get_all_pools().iter() {
401415
for address in pool.get_addresses_from_host(host) {
402416
if !pool.is_banned(&address) {
403-
pool.ban(&address, BanReason::AdminBan(duration_seconds), -1);
417+
pool.ban(&address, BanReason::AdminBan(duration_seconds), None);
404418
res.put(data_row(&vec![
405419
id.db.clone(),
406420
id.user.clone(),
@@ -617,23 +631,17 @@ where
617631
("avg_wait_time", DataType::Numeric),
618632
];
619633

620-
let all_stats = get_address_stats();
621634
let mut res = BytesMut::new();
622635
res.put(row_description(&columns));
623636

624637
for (user_pool, pool) in get_all_pools() {
625638
for shard in 0..pool.shards() {
626639
for server in 0..pool.servers(shard) {
627640
let address = pool.address(shard, server);
628-
let stats = match all_stats.get(&address.id) {
629-
Some(stats) => stats.clone(),
630-
None => HashMap::new(),
631-
};
632641

633642
let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()];
634-
for column in &columns[3..] {
635-
row.push(stats.get(column.0).unwrap_or(&0).to_string());
636-
}
643+
let stats = address.stats.clone();
644+
stats.populate_row(&mut row);
637645

638646
res.put(data_row(&row));
639647
}
@@ -673,16 +681,16 @@ where
673681

674682
for (_, client) in new_map {
675683
let row = vec![
676-
format!("{:#010X}", client.client_id),
677-
client.pool_name,
678-
client.username,
679-
client.application_name.clone(),
680-
client.state.to_string(),
681-
client.transaction_count.to_string(),
682-
client.query_count.to_string(),
683-
client.error_count.to_string(),
684+
format!("{:#010X}", client.client_id()),
685+
client.pool_name(),
686+
client.username(),
687+
client.application_name(),
688+
client.state.load(Ordering::Relaxed).to_string(),
689+
client.transaction_count.load(Ordering::Relaxed).to_string(),
690+
client.query_count.load(Ordering::Relaxed).to_string(),
691+
client.error_count.load(Ordering::Relaxed).to_string(),
684692
Instant::now()
685-
.duration_since(client.connect_time)
693+
.duration_since(client.connect_time())
686694
.as_secs()
687695
.to_string(),
688696
];
@@ -724,19 +732,20 @@ where
724732
res.put(row_description(&columns));
725733

726734
for (_, server) in new_map {
735+
let application_name = server.application_name.read();
727736
let row = vec![
728-
format!("{:#010X}", server.server_id),
729-
server.pool_name,
730-
server.username,
731-
server.address_name,
732-
server.application_name,
733-
server.state.to_string(),
734-
server.transaction_count.to_string(),
735-
server.query_count.to_string(),
736-
server.bytes_sent.to_string(),
737-
server.bytes_received.to_string(),
737+
format!("{:#010X}", server.server_id()),
738+
server.pool_name(),
739+
server.username(),
740+
server.address_name(),
741+
application_name.clone(),
742+
server.state.load(Ordering::Relaxed).to_string(),
743+
server.transaction_count.load(Ordering::Relaxed).to_string(),
744+
server.query_count.load(Ordering::Relaxed).to_string(),
745+
server.bytes_sent.load(Ordering::Relaxed).to_string(),
746+
server.bytes_received.load(Ordering::Relaxed).to_string(),
738747
Instant::now()
739-
.duration_since(server.connect_time)
748+
.duration_since(server.connect_time())
740749
.as_secs()
741750
.to_string(),
742751
];

0 commit comments

Comments
 (0)