Skip to content

Commit 42b2830

Browse files
closes #70
Signed-off-by: Valerian Saliou <valerian@valeriansaliou.name>
1 parent 4f6ac1f commit 42b2830

File tree

9 files changed

+132
-2
lines changed

9 files changed

+132
-2
lines changed

PROTOCOL.md

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ _The Sonic Channel Control mode is used for administration purposes. Once in thi
170170
**➡️ Available commands:**
171171

172172
* `TRIGGER`: trigger an action (syntax: `TRIGGER [<action>]?`; time complexity: `O(1)`)
173+
* `INFO`: get server information (syntax: `INFO`; time complexity: `O(1)`)
173174
* `PING`: ping server (syntax: `PING`; time complexity: `O(1)`)
174175
* `HELP`: show help (syntax: `HELP [<manual>]?`; time complexity: `O(1)`)
175176
* `QUIT`: stop connection (syntax: `QUIT`; time complexity: `O(1)`)

src/channel/command.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::str::{self, SplitWhitespace};
1111
use std::vec::Vec;
1212

1313
use super::format::unescape;
14+
use super::statistics::ChannelStatistics;
1415
use crate::query::builder::{QueryBuilder, QueryBuilderResult};
1516
use crate::query::types::{QueryGenericLang, QuerySearchLimit, QuerySearchOffset};
1617
use crate::store::fst::StoreFSTPool;
@@ -62,7 +63,7 @@ lazy_static! {
6263
pub static ref COMMANDS_MODE_INGEST: Vec<&'static str> =
6364
vec!["PUSH", "POP", "COUNT", "FLUSHC", "FLUSHB", "FLUSHO", "PING", "HELP", "QUIT"];
6465
pub static ref COMMANDS_MODE_CONTROL: Vec<&'static str> =
65-
vec!["TRIGGER", "PING", "HELP", "QUIT"];
66+
vec!["TRIGGER", "INFO", "PING", "HELP", "QUIT"];
6667
pub static ref CONTROL_TRIGGER_ACTIONS: Vec<&'static str> = vec!["consolidate"];
6768
static ref MANUAL_MODE_SEARCH: HashMap<&'static str, &'static Vec<&'static str>> =
6869
[("commands", &*COMMANDS_MODE_SEARCH)]
@@ -794,12 +795,34 @@ impl ChannelCommandControl {
794795
_ => Err(ChannelCommandError::NotFound),
795796
}
796797
} else {
797-
Err(ChannelCommandError::InvalidFormat("HELP [<action>]?"))
798+
Err(ChannelCommandError::InvalidFormat("TRIGGER [<action>]?"))
798799
}
799800
}
800801
}
801802
}
802803

804+
pub fn dispatch_info(_parts: SplitWhitespace) -> ChannelResult {
805+
if let Ok(statistics) = ChannelStatistics::gather() {
806+
let statistics_result = format!(
807+
"uptime({}) clients_connected({}) commands_total({}) \
808+
command_latency_best({}) command_latency_worst({}) \
809+
kv_open_count({}) fst_open_count({}) fst_consolidate_count({})",
810+
statistics.uptime,
811+
statistics.clients_connected,
812+
statistics.commands_total,
813+
statistics.command_latency_best,
814+
statistics.command_latency_worst,
815+
statistics.kv_open_count,
816+
statistics.fst_open_count,
817+
statistics.fst_consolidate_count
818+
);
819+
820+
Ok(vec![ChannelCommandResponse::Result(statistics_result)])
821+
} else {
822+
Err(ChannelCommandError::InternalError)
823+
}
824+
}
825+
803826
pub fn dispatch_help(parts: SplitWhitespace) -> ChannelResult {
804827
ChannelCommandBase::generic_dispatch_help(parts, &*MANUAL_MODE_CONTROL)
805828
}

src/channel/handle.rs

+7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use super::message::{
1616
ChannelMessageResult,
1717
};
1818
use super::mode::ChannelMode;
19+
use super::statistics::CLIENTS_CONNECTED;
1920
use crate::APP_CONF;
2021
use crate::LINE_FEED;
2122

@@ -72,6 +73,9 @@ impl ChannelHandle {
7273
// Send connected banner
7374
write!(stream, "{}{}", *CONNECTED_BANNER, LINE_FEED).expect("write failed");
7475

76+
// Increment connected clients count
77+
*CLIENTS_CONNECTED.write().unwrap() += 1;
78+
7579
// Ensure channel mode is set
7680
match Self::ensure_start(&stream) {
7781
Ok(mode) => {
@@ -95,6 +99,9 @@ impl ChannelHandle {
9599
write!(stream, "ENDED {}{}", err.to_str(), LINE_FEED).expect("write failed");
96100
}
97101
}
102+
103+
// Decrement connected clients count
104+
*CLIENTS_CONNECTED.write().unwrap() -= 1;
98105
}
99106

100107
fn configure_stream(stream: &TcpStream, is_established: bool) {

src/channel/message.rs

+23
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use super::command::{
1414
ChannelCommandResponse, ChannelCommandSearch, COMMANDS_MODE_CONTROL, COMMANDS_MODE_INGEST,
1515
COMMANDS_MODE_SEARCH,
1616
};
17+
use super::statistics::{COMMANDS_TOTAL, COMMAND_LATENCY_BEST, COMMAND_LATENCY_WORST};
1718
use crate::LINE_FEED;
1819

1920
pub struct ChannelMessage;
@@ -107,6 +108,27 @@ impl ChannelMessage {
107108
);
108109
}
109110

111+
// Update command statistics
112+
{
113+
// Update performance measures
114+
// Notice: commands that take 0ms are not accounted for there (ie. those are usually \
115+
// commands that do no work or I/O; they would make statistics less accurate)
116+
let command_took_millis = command_took.as_millis() as u32;
117+
118+
if command_took_millis > *COMMAND_LATENCY_WORST.read().unwrap() {
119+
*COMMAND_LATENCY_WORST.write().unwrap() = command_took_millis;
120+
}
121+
if command_took_millis > 0
122+
&& (*COMMAND_LATENCY_BEST.read().unwrap() == 0
123+
|| command_took_millis < *COMMAND_LATENCY_BEST.read().unwrap())
124+
{
125+
*COMMAND_LATENCY_BEST.write().unwrap() = command_took_millis;
126+
}
127+
128+
// Increment total commands
129+
*COMMANDS_TOTAL.write().unwrap() += 1;
130+
}
131+
110132
result
111133
}
112134

@@ -149,6 +171,7 @@ impl ChannelMessageMode for ChannelMessageModeControl {
149171
fn handle(message: &str) -> Result<Vec<ChannelCommandResponse>, ChannelCommandError> {
150172
gen_channel_message_mode_handle!(message, COMMANDS_MODE_CONTROL, {
151173
"TRIGGER" => ChannelCommandControl::dispatch_trigger,
174+
"INFO" => ChannelCommandControl::dispatch_info,
152175
"HELP" => ChannelCommandControl::dispatch_help,
153176
})
154177
}

src/channel/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ mod message;
1414
mod mode;
1515

1616
pub mod listen;
17+
pub mod statistics;

src/channel/statistics.rs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Sonic
2+
//
3+
// Fast, lightweight and schema-less search backend
4+
// Copyright: 2019, Valerian Saliou <valerian@valeriansaliou.name>
5+
// License: Mozilla Public License v2.0 (MPL v2.0)
6+
7+
use std::ops::Deref;
8+
use std::sync::RwLock;
9+
use std::time::Instant;
10+
11+
use crate::store::fst::StoreFSTPool;
12+
use crate::store::kv::StoreKVPool;
13+
14+
lazy_static! {
15+
static ref START_TIME: Instant = Instant::now();
16+
pub static ref CLIENTS_CONNECTED: RwLock<u32> = RwLock::new(0);
17+
pub static ref COMMANDS_TOTAL: RwLock<u64> = RwLock::new(0);
18+
pub static ref COMMAND_LATENCY_BEST: RwLock<u32> = RwLock::new(0);
19+
pub static ref COMMAND_LATENCY_WORST: RwLock<u32> = RwLock::new(0);
20+
}
21+
22+
#[derive(Default)]
23+
pub struct ChannelStatistics {
24+
pub uptime: u64,
25+
pub clients_connected: u32,
26+
pub commands_total: u64,
27+
pub command_latency_best: u32,
28+
pub command_latency_worst: u32,
29+
pub kv_open_count: usize,
30+
pub fst_open_count: usize,
31+
pub fst_consolidate_count: usize,
32+
}
33+
34+
pub fn ensure_states() {
35+
// Ensure all statics are initialized (a `deref` is enough to lazily initialize them)
36+
let (_, _, _, _, _) = (
37+
START_TIME.deref(),
38+
CLIENTS_CONNECTED.deref(),
39+
COMMANDS_TOTAL.deref(),
40+
COMMAND_LATENCY_BEST.deref(),
41+
COMMAND_LATENCY_WORST.deref(),
42+
);
43+
}
44+
45+
impl ChannelStatistics {
46+
pub fn gather() -> Result<ChannelStatistics, ()> {
47+
let (kv_count, fst_count) = (StoreKVPool::count(), StoreFSTPool::count());
48+
49+
Ok(ChannelStatistics {
50+
uptime: START_TIME.elapsed().as_secs(),
51+
clients_connected: *CLIENTS_CONNECTED.read().unwrap(),
52+
commands_total: *COMMANDS_TOTAL.read().unwrap(),
53+
command_latency_best: *COMMAND_LATENCY_BEST.read().unwrap(),
54+
command_latency_worst: *COMMAND_LATENCY_WORST.read().unwrap(),
55+
kv_open_count: kv_count,
56+
fst_open_count: fst_count.0,
57+
fst_consolidate_count: fst_count.1,
58+
})
59+
}
60+
}

src/main.rs

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use graceful::SignalGuard;
5151
use log::LevelFilter;
5252

5353
use channel::listen::ChannelListenBuilder;
54+
use channel::statistics::ensure_states as ensure_states_channel_statistics;
5455
use config::config::Config;
5556
use config::logger::ConfigLogger;
5657
use config::reader::ConfigReader;
@@ -137,6 +138,9 @@ fn make_app_args() -> AppArgs {
137138
fn ensure_states() {
138139
// Ensure all statics are valid (a `deref` is enough to lazily initialize them)
139140
let (_, _) = (APP_ARGS.deref(), APP_CONF.deref());
141+
142+
// Ensure per-module states
143+
ensure_states_channel_statistics();
140144
}
141145

142146
fn main() {

src/store/fst.rs

+7
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ impl StoreFSTPathMode {
9393
}
9494

9595
impl StoreFSTPool {
96+
pub fn count() -> (usize, usize) {
97+
(
98+
GRAPH_POOL.read().unwrap().len(),
99+
GRAPH_CONSOLIDATE.read().unwrap().len(),
100+
)
101+
}
102+
96103
pub fn acquire<'a, T: Into<&'a str>>(collection: T, bucket: T) -> Result<StoreFSTBox, ()> {
97104
let (collection_str, bucket_str) = (collection.into(), bucket.into());
98105

src/store/kv.rs

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ lazy_static! {
6464
}
6565

6666
impl StoreKVPool {
67+
pub fn count() -> usize {
68+
STORE_POOL.read().unwrap().len()
69+
}
70+
6771
pub fn acquire<'a, T: Into<&'a str>>(
6872
mode: StoreKVAcquireMode,
6973
collection: T,

0 commit comments

Comments
 (0)