diff --git a/Cargo.toml b/Cargo.toml
index 5c19a6f5..614c3b94 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,7 +42,7 @@ async-mutex = "1.4.0"
 project-root = "0.2.2"
 strum = { version = "0.25.0", features = ["derive"] }
 regex = "1.5.4"
-rdkafka = {version = "0.34.0", features = ["libz-static"] }
+rdkafka = { version = "0.36.0", features = ["libz-static"] }
 kafka = "0.10.0"
 reqwest = "0.11.16"
 rusoto_core = "0.48.0"
@@ -56,4 +56,5 @@ dotenv = "0.15.0"
 redis = { version = "0.23.1", features = ["tokio-comp"] }
 thiserror = "1.0.44"
 async-std = "1.12.0"
-apache-avro = { version = "0.16.0", features = ["derive"]}
\ No newline at end of file
+apache-avro = { version = "0.16.0", features = ["derive"]}
+async-rwlock = "1.3.0"
\ No newline at end of file
diff --git a/dt-common/src/monitor/counter.rs b/dt-common/src/monitor/counter.rs
index 4a74e94c..c631e900 100644
--- a/dt-common/src/monitor/counter.rs
+++ b/dt-common/src/monitor/counter.rs
@@ -1,26 +1,23 @@
 use std::time::Instant;
 
+#[derive(Clone)]
 pub struct Counter {
     pub timestamp: Instant,
-    pub value: u64,
+    pub value: usize,
+    pub description: String,
 }
 
 impl Counter {
-    pub fn new() -> Self {
+    pub fn new(value: usize) -> Self {
         Self {
             timestamp: Instant::now(),
-            value: 0,
+            value,
+            description: String::new(),
         }
     }
 
     #[inline(always)]
-    pub fn add(&mut self, value: u64) {
+    pub fn add(&mut self, value: usize) {
         self.value += value;
     }
 }
-
-impl Default for Counter {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/dt-common/src/monitor/mod.rs b/dt-common/src/monitor/mod.rs
index 7554230c..c1fca68f 100644
--- a/dt-common/src/monitor/mod.rs
+++ b/dt-common/src/monitor/mod.rs
@@ -1,2 +1,3 @@
 pub mod counter;
-pub mod statistic_counter;
+pub mod monitor;
+pub mod time_window_counter;
diff --git a/dt-common/src/monitor/monitor.rs b/dt-common/src/monitor/monitor.rs
new file mode 100644
index 00000000..66b9e383
--- /dev/null
+++ b/dt-common/src/monitor/monitor.rs
@@ -0,0 +1,80 @@
+use std::collections::HashMap;
+
+use strum::{Display, EnumString, IntoStaticStr};
+
+use super::counter::Counter;
+use super::time_window_counter::TimeWindowCounter;
+
+#[derive(Clone)]
+pub struct Monitor {
+    pub accumulate_counters: HashMap<CounterType, Counter>,
+    pub time_window_counters: HashMap<CounterType, TimeWindowCounter>,
+    pub time_window_secs: usize,
+}
+
+#[derive(EnumString, IntoStaticStr, Display, PartialEq, Eq, Hash, Clone)]
+pub enum CounterType {
+    // time window counter, aggregated by: sum by interval
+    #[strum(serialize = "batch_write_failures")]
+    BatchWriteFailures,
+    #[strum(serialize = "serial_writes")]
+    SerialWrites,
+
+    // time window counter, aggregated by: avg by interval
+    #[strum(serialize = "rps")]
+    Records,
+
+    // time window counter, aggregated by: avg by count
+    #[strum(serialize = "bytes_per_query")]
+    BytesPerQuery,
+    #[strum(serialize = "records_per_query")]
+    RecordsPerQuery,
+    #[strum(serialize = "rt_per_query")]
+    RtPerQuery,
+    #[strum(serialize = "buffer_size")]
+    BufferSize,
+
+    // accumulate counter
+    #[strum(serialize = "sinked_count")]
+    SinkedCount,
+}
+
+const DEFAULT_INTERVAL_SECS: usize = 5;
+
+impl Monitor {
+    pub fn new_default() -> Self {
+        Self::new(DEFAULT_INTERVAL_SECS)
+    }
+
+    pub fn new(interval_secs: usize) -> Self {
+        Self {
+            accumulate_counters: HashMap::new(),
+            time_window_counters: HashMap::new(),
+            time_window_secs: interval_secs,
+        }
+    }
+
+    pub fn add_counter(&mut self, counter_type: CounterType, value: usize) -> &mut Self {
+        match counter_type {
+            CounterType::SinkedCount => {
+                if let Some(counter) = self.accumulate_counters.get_mut(&counter_type)
+                {
+                    counter.add(value)
+                } else {
+                    self.accumulate_counters
+                        .insert(counter_type, Counter::new(value));
+                }
+            }
+
+            _ => {
+                if let Some(counter) = self.time_window_counters.get_mut(&counter_type) {
+                    counter.add(value)
+                } else {
+                    let mut counter = TimeWindowCounter::new(self.time_window_secs);
+                    counter.add(value);
+                    self.time_window_counters.insert(counter_type, counter);
+                }
+            }
+        }
+        self
+    }
+}
diff --git a/dt-common/src/monitor/statistic_counter.rs b/dt-common/src/monitor/statistic_counter.rs
deleted file mode 100644
index f126ceb9..00000000
--- a/dt-common/src/monitor/statistic_counter.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use std::{collections::LinkedList, time::Instant};
-
-use super::counter::Counter;
-
-pub struct StatisticCounter {
-    pub interval_secs: u64,
-    pub counters: LinkedList<Counter>,
-}
-
-impl StatisticCounter {
-    pub fn new(interval_secs: u64) -> Self {
-        Self {
-            interval_secs,
-            counters: LinkedList::new(),
-        }
-    }
-
-    #[inline(always)]
-    pub fn add(&mut self, value: u64) {
-        self.counters.push_back(Counter {
-            value,
-            timestamp: Instant::now(),
-        });
-        self.remove_outdated_counters();
-    }
-
-    #[inline(always)]
-    pub fn sum(&mut self) -> u64 {
-        self.remove_outdated_counters();
-
-        let mut sum = 0;
-        for counter in self.counters.iter() {
-            sum += counter.value;
-        }
-        sum
-    }
-
-    #[inline(always)]
-    pub fn avg(&mut self) -> u64 {
-        self.sum() / self.interval_secs
-    }
-
-    #[inline(always)]
-    fn remove_outdated_counters(&mut self) {
-        let mut outdate_count = 0;
-        for counter in self.counters.iter() {
-            if self.check_outdate(counter) {
-                outdate_count += 1;
-            } else {
-                break;
-            }
-        }
-
-        for _ in 0..outdate_count {
-            self.counters.pop_front();
-        }
-    }
-
-    #[inline(always)]
-    fn check_outdate(&self, counter: &Counter) -> bool {
-        counter.timestamp.elapsed().as_secs() > self.interval_secs
-    }
-}
diff --git a/dt-common/src/monitor/time_window_counter.rs b/dt-common/src/monitor/time_window_counter.rs
new file mode 100644
index 00000000..5a988879
--- /dev/null
+++ b/dt-common/src/monitor/time_window_counter.rs
@@ -0,0 +1,69 @@
+use std::collections::LinkedList;
+
+use super::counter::Counter;
+
+#[derive(Clone)]
+pub struct TimeWindowCounter {
+    pub time_window_secs: usize,
+    pub counters: LinkedList<Counter>,
+    pub description: String,
+}
+
+impl TimeWindowCounter {
+    pub fn new(time_window_secs: usize) -> Self {
+        Self {
+            time_window_secs,
+            counters: LinkedList::new(),
+            description: String::new(),
+        }
+    }
+
+    #[inline(always)]
+    pub fn add(&mut self, value: usize) {
+        self.counters.push_back(Counter::new(value));
+    }
+
+    #[inline(always)]
+    pub fn sum(&mut self) -> usize {
+        let mut sum = 0;
+        for counter in self.counters.iter() {
+            sum += counter.value;
+        }
+        sum
+    }
+
+    #[inline(always)]
+    pub fn count(&mut self) -> usize {
+        self.counters.len()
+    }
+
+    #[inline(always)]
+    pub fn avg_by_interval(&mut self) -> usize {
+        self.sum() / self.time_window_secs
+    }
+
+    #[inline(always)]
+    pub fn avg_by_count(&mut self) -> usize {
+        if !self.counters.is_empty() {
+            self.sum() / self.counters.len()
+        } else {
+            0
+        }
+    }
+
+    #[inline(always)]
+    pub fn refresh_window(&mut self) {
+        let mut outdated_count = 0;
+        for counter in self.counters.iter() {
+            if counter.timestamp.elapsed().as_secs() > self.time_window_secs as u64 {
+                outdated_count += 1;
+            } else {
+                break;
+            }
+        }
+
+        for _ in 0..outdated_count {
+            self.counters.pop_front();
+        }
+    }
+}
diff --git a/dt-connector/Cargo.toml b/dt-connector/Cargo.toml
index 6c2b8268..872b29bb 100644
--- a/dt-connector/Cargo.toml
+++ b/dt-connector/Cargo.toml
@@ -40,4 +40,5 @@ redis = { workspace = true }
 thiserror = { workspace = true }
 async-std = { workspace = true }
 chrono = { workspace = true }
-apache-avro = {workspace = true}
\ No newline at end of file
+apache-avro = {workspace = true}
+async-rwlock = { workspace = true }
\ No newline at end of file
diff --git a/dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs b/dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs
index 5625db34..1a1f17a2 100644
--- a/dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs
+++ b/dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs
@@ -1,5 +1,6 @@
 use std::sync::{atomic::AtomicBool, Arc};
 
+use async_rwlock::RwLock;
 use async_trait::async_trait;
 use concurrent_queue::ConcurrentQueue;
 use dt_meta::{
@@ -17,7 +18,7 @@
 use futures::TryStreamExt;
 use sqlx::{MySql, Pool};
 
-use dt_common::{config::config_enums::DbType, error::Error, log_info};
+use dt_common::{config::config_enums::DbType, error::Error, log_info, monitor::monitor::Monitor};
 
 use crate::{
     extractor::{base_extractor::BaseExtractor, snapshot_resumer::SnapshotResumer},
@@ -33,6 +34,7 @@ pub struct MysqlSnapshotExtractor {
     pub db: String,
     pub tb: String,
     pub shut_down: Arc<AtomicBool>,
+    pub monitor: Arc<RwLock<Monitor>>,
 }
 
 #[async_trait]
@@ -54,6 +56,10 @@ impl Extractor for MysqlSnapshotExtractor {
         self.conn_pool.close().await;
         Ok(())
     }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        Some(self.monitor.clone())
+    }
 }
 
 impl MysqlSnapshotExtractor {
diff --git a/dt-connector/src/lib.rs b/dt-connector/src/lib.rs
index eecf162e..d9e72271 100644
--- a/dt-connector/src/lib.rs
+++ b/dt-connector/src/lib.rs
@@ -5,9 +5,12 @@ pub mod meta_fetcher;
 pub mod rdb_query_builder;
 pub mod sinker;
 
+use std::sync::Arc;
+
+use async_rwlock::RwLock;
 use async_trait::async_trait;
 use check_log::check_log::CheckLog;
-use dt_common::error::Error;
+use dt_common::{error::Error, monitor::monitor::Monitor};
 use dt_meta::{ddl_data::DdlData, dt_data::DtData, row_data::RowData};
 
 #[async_trait]
@@ -31,6 +34,10 @@ pub trait Sinker {
     async fn refresh_meta(&mut self, _data: Vec<DdlData>) -> Result<(), Error> {
         Ok(())
     }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        None
+    }
 }
 
 #[async_trait]
@@ -40,6 +47,10 @@ pub trait Extractor {
     async fn close(&mut self) -> Result<(), Error> {
         Ok(())
     }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        None
+    }
 }
 
 #[async_trait]
diff --git a/dt-connector/src/sinker/mysql/mysql_sinker.rs b/dt-connector/src/sinker/mysql/mysql_sinker.rs
index c42c3ebe..0b9955d9 100644
--- a/dt-connector/src/sinker/mysql/mysql_sinker.rs
+++ b/dt-connector/src/sinker/mysql/mysql_sinker.rs
@@ -1,4 +1,4 @@
-use std::str::FromStr;
+use std::{str::FromStr, sync::Arc, time::Instant};
 
 use crate::{
     call_batch_fn, close_conn_pool,
@@ -7,7 +7,12 @@ use crate::{
     Sinker,
 };
 
-use dt_common::{error::Error, log_error, log_info};
+use async_rwlock::RwLock;
+use dt_common::{
+    error::Error,
+    log_error, log_info,
+    monitor::monitor::{CounterType, Monitor},
+};
 
 use dt_meta::{
     ddl_data::DdlData,
@@ -31,7 +36,7 @@ pub struct MysqlSinker {
     pub meta_manager: MysqlMetaManager,
     pub router: RdbRouter,
     pub batch_size: usize,
-
+    pub monitor: Arc<RwLock<Monitor>>,
     pub transaction_command: String,
 }
 
@@ -91,6 +96,10 @@ impl Sinker for MysqlSinker {
         }
         Ok(())
     }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        Some(self.monitor.clone())
+    }
 }
 
 impl MysqlSinker {
@@ -147,6 +156,8 @@
             query_builder.get_batch_delete_query(data, start_index, batch_size)?;
         let query = query_builder.create_mysql_query(&sql, &cols, &binds);
 
+        let start_time = Instant::now();
+
         if self.is_transaction_enable() {
             let mut transaction = self.conn_pool.begin().await.unwrap();
@@ -159,6 +170,8 @@
             query.execute(&self.conn_pool).await.unwrap();
         }
 
+        self.update_monitor(batch_size, start_time.elapsed().as_micros())
+            .await;
         Ok(())
     }
 
@@ -176,6 +189,7 @@
         sql = self.handle_dialect(&sql);
         let query = query_builder.create_mysql_query(&sql, &cols, &binds);
 
+        let start_time = Instant::now();
         let execute_error: Option<Error>;
 
         if self.is_transaction_enable() {
@@ -207,6 +221,9 @@
             let sub_data = &data[start_index..start_index + batch_size];
             self.serial_sink(sub_data.to_vec()).await.unwrap();
         }
+
+        self.update_monitor(batch_size, start_time.elapsed().as_micros())
+            .await;
         Ok(())
     }
 
@@ -230,4 +247,13 @@
     fn is_transaction_enable(&self) -> bool {
         !self.transaction_command.is_empty()
     }
+
+    async fn update_monitor(&mut self, record_count: usize, rt: u128) {
+        self.monitor
+            .write()
+            .await
+            .add_counter(CounterType::RecordsPerQuery, record_count)
+            .add_counter(CounterType::Records, record_count)
+            .add_counter(CounterType::RtPerQuery, rt as usize);
+    }
 }
diff --git a/dt-pipeline/Cargo.toml b/dt-pipeline/Cargo.toml
index 292fc6ee..5ababe44 100644
--- a/dt-pipeline/Cargo.toml
+++ b/dt-pipeline/Cargo.toml
@@ -16,4 +16,6 @@ concurrent-queue = { workspace = true }
 log = { workspace = true }
 log4rs = { workspace = true }
 futures = { workspace = true }
-regex = { workspace = true }
\ No newline at end of file
+regex = { workspace = true }
+async-std = { workspace = true }
+async-rwlock = { workspace = true }
\ No newline at end of file
diff --git a/dt-pipeline/src/base_pipeline.rs b/dt-pipeline/src/base_pipeline.rs
index fedfb0d1..91fb113c 100644
--- a/dt-pipeline/src/base_pipeline.rs
+++ b/dt-pipeline/src/base_pipeline.rs
@@ -6,13 +6,14 @@ use std::{
     time::Instant,
 };
 
+use async_rwlock::RwLock;
 use async_trait::async_trait;
 use concurrent_queue::ConcurrentQueue;
 use dt_common::{
     config::sinker_config::SinkerBasicConfig,
     error::Error,
-    log_info, log_monitor, log_position,
-    monitor::{counter::Counter, statistic_counter::StatisticCounter},
+    log_info, log_position,
+    monitor::monitor::{CounterType, Monitor},
     utils::time_util::TimeUtil,
 };
 use dt_connector::Sinker;
@@ -36,6 +37,7 @@ pub struct BasePipeline {
     pub checkpoint_interval_secs: u64,
     pub batch_sink_interval_secs: u64,
     pub syncer: Arc<Mutex<Syncer>>,
+    pub monitor: Arc<RwLock<Monitor>>,
 }
 
 enum SinkMethod {
@@ -63,12 +65,15 @@ impl Pipeline for BasePipeline {
         let mut last_sink_time = Instant::now();
         let mut last_checkpoint_time = Instant::now();
-        let mut count_counter = Counter::new();
-        let mut tps_counter = StatisticCounter::new(self.checkpoint_interval_secs);
         let mut last_received_position = Option::None;
         let mut last_commit_position = Option::None;
 
         while !self.shut_down.load(Ordering::Acquire) || !self.buffer.is_empty() {
+            self.monitor
+                .write()
+                .await
+                .add_counter(CounterType::BufferSize, self.buffer.len());
+
             // some sinkers (foxlake) need to accumulate data to a big batch and sink
             let data = if last_sink_time.elapsed().as_secs() < self.batch_sink_interval_secs
                 && !self.buffer.is_full()
             {
@@ -80,7 +85,7 @@
             };
 
             // process all row_datas in buffer at a time
-            let mut sink_count = 0;
+            let mut sinked_count = 0;
             if !data.is_empty() {
                 let (count, last_received, last_commit) = match Self::get_sink_method(&data) {
                     SinkMethod::Ddl => self.sink_ddl(data).await.unwrap(),
                     SinkMethod::Dml => self.sink_dml(data).await.unwrap(),
                     SinkMethod::Raw =>
                        self.sink_raw(data).await.unwrap(),
                };
-                sink_count = count;
+                sinked_count = count;
                last_received_position = last_received;
                if last_commit.is_some() {
                    last_commit_position = last_commit;
                }
            }

@@ -99,17 +104,23 @@
                last_checkpoint_time,
                &last_received_position,
                &last_commit_position,
-                &mut tps_counter,
-                &mut count_counter,
-                sink_count as u64,
            );

+            self.monitor
+                .write()
+                .await
+                .add_counter(CounterType::SinkedCount, sinked_count);
+
            // sleep 1 millis for data preparing
            TimeUtil::sleep_millis(1).await;
        }

        Ok(())
    }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        Some(self.monitor.clone())
+    }
 }
 
 impl BasePipeline {
@@ -267,13 +278,7 @@
        last_checkpoint_time: Instant,
        last_received_position: &Option<Position>,
        last_commit_position: &Option<Position>,
-        tps_counter: &mut StatisticCounter,
-        count_counter: &mut Counter,
-        count: u64,
    ) -> Instant {
-        tps_counter.add(count);
-        count_counter.add(count);
-
        if last_checkpoint_time.elapsed().as_secs() < self.checkpoint_interval_secs {
            return last_checkpoint_time;
        }
@@ -286,10 +291,6 @@
            log_position!("checkpoint_position | {}", position.to_string());
            self.syncer.lock().unwrap().checkpoint_position = position.clone();
        }
-
-        log_monitor!("avg tps: {}", tps_counter.avg(),);
-        log_monitor!("sinked count: {}", count_counter.value);
-
        Instant::now()
    }
 }
diff --git a/dt-pipeline/src/lib.rs b/dt-pipeline/src/lib.rs
index 59a5e242..fd7119e9 100644
--- a/dt-pipeline/src/lib.rs
+++ b/dt-pipeline/src/lib.rs
@@ -1,7 +1,10 @@
 pub mod base_pipeline;
 
+use std::sync::Arc;
+
+use async_rwlock::RwLock;
 use async_trait::async_trait;
-use dt_common::error::Error;
+use dt_common::{error::Error, monitor::monitor::Monitor};
 
 #[async_trait]
 pub trait Pipeline {
@@ -12,4 +15,8 @@
     async fn stop(&mut self) -> Result<(), Error> {
         Ok(())
     }
+
+    fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
+        None
+    }
 }
diff --git a/dt-task/Cargo.toml b/dt-task/Cargo.toml
index 9c2e16fd..23c81a40 100644
--- a/dt-task/Cargo.toml
+++ b/dt-task/Cargo.toml
@@ -33,4 +33,5 @@ project-root = { workspace = true }
 regex = { workspace = true }
 strum = { workspace = true }
 serde_json = { workspace = true }
-redis = { workspace = true }
\ No newline at end of file
+redis = { workspace = true }
+async-rwlock = { workspace = true }
\ No newline at end of file
diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs
index 66477506..9ae028c3 100644
--- a/dt-task/src/extractor_util.rs
+++ b/dt-task/src/extractor_util.rs
@@ -4,6 +4,7 @@ use std::{
     sync::{atomic::AtomicBool, Arc, Mutex},
 };
 
+use async_rwlock::RwLock;
 use concurrent_queue::ConcurrentQueue;
 use dt_common::{
     config::{
@@ -12,6 +13,7 @@ use dt_common::{
     },
     datamarker::transaction_control::TransactionWorker,
     error::Error,
+    monitor::monitor::Monitor,
     utils::rdb_filter::RdbFilter,
 };
 use dt_connector::{
@@ -247,6 +249,7 @@ impl ExtractorUtil {
             tb: tb.to_string(),
             slice_size,
             shut_down,
+            monitor: Arc::new(RwLock::new(Monitor::new_default())),
         })
     }
diff --git a/dt-task/src/sinker_util.rs b/dt-task/src/sinker_util.rs
index 4894c0ec..6cbd9ed6 100644
--- a/dt-task/src/sinker_util.rs
+++ b/dt-task/src/sinker_util.rs
@@ -1,5 +1,6 @@
 use std::{str::FromStr, sync::Arc, time::Duration};
 
+use async_rwlock::RwLock;
 use dt_common::{
     config::{
         config_enums::{ConflictPolicyEnum, DbType},
@@ -7,6 +8,7 @@ use dt_common::{
         task_config::TaskConfig,
     },
     error::Error,
+    monitor::monitor::Monitor,
 };
 use dt_connector::{
     sinker::{
@@ -240,6 +242,9 @@ impl SinkerUtil {
         let meta_manager =
            MysqlMetaManager::new(conn_pool.clone()).init().await?;

        let mut sub_sinkers: Vec<Arc<async_mutex::Mutex<Box<dyn Sinker + Send>>>> = Vec::new();
+        // To avoid contention on the monitor's write lock between sinker threads,
+        // create a monitor for each sinker instead of sharing a single monitor across all sinkers;
+        // with a shared global monitor, a sinker may wait several milliseconds for the write lock.
        for _ in 0..parallel_size {
            let sinker = MysqlSinker {
                url: url.to_string(),
@@ -248,6 +253,7 @@
                router: router.clone(),
                batch_size,
                transaction_command: transaction_command.to_owned(),
+                monitor: Arc::new(RwLock::new(Monitor::new_default())),
            };
            sub_sinkers.push(Arc::new(async_mutex::Mutex::new(Box::new(sinker))));
        }
diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs
index 9d197f9d..40cd7f7a 100644
--- a/dt-task/src/task_runner.rs
+++ b/dt-task/src/task_runner.rs
@@ -1,9 +1,14 @@
 use std::{
+    collections::HashMap,
     fs::{self, File},
     io::Read,
-    sync::{atomic::AtomicBool, Arc, Mutex},
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Mutex,
+    },
 };
 
+use async_rwlock::RwLock;
 use concurrent_queue::ConcurrentQueue;
 use dt_common::{
     config::{
@@ -11,9 +16,11 @@ use dt_common::{
         extractor_config::ExtractorConfig, sinker_config::SinkerConfig, task_config::TaskConfig,
     },
     error::Error,
-    utils::rdb_filter::RdbFilter,
+    log_monitor,
+    monitor::monitor::{CounterType, Monitor},
+    utils::{rdb_filter::RdbFilter, time_util::TimeUtil},
 };
-use dt_connector::{extractor::snapshot_resumer::SnapshotResumer, Extractor};
+use dt_connector::{extractor::snapshot_resumer::SnapshotResumer, Extractor, Sinker};
 use dt_meta::{dt_data::DtItem, position::Position, row_type::RowType, syncer::Syncer};
 use dt_pipeline::{base_pipeline::BasePipeline, Pipeline};
 
@@ -131,6 +138,7 @@ impl TaskRunner {
            checkpoint_position: Position::None,
        }));

+        // extractor
        let mut extractor = self
            .create_extractor(
                extractor_config,
@@ -139,20 +147,46 @@
                syncer.clone(),
            )
            .await?;
+        let extractor_monitor = extractor.get_monitor();
+
+        // sinkers
+        let transaction_command = self.fetch_transaction_command();
+        let sinkers = SinkerUtil::create_sinkers(&self.config, transaction_command).await?;
+        let mut sinker_monitors = Vec::new();
+        for sinker in sinkers.iter() {
+            sinker_monitors.push(sinker.lock().await.get_monitor())
+        }
+
+        // pipeline
        let mut pipeline = self
-            .create_pipeline(buffer, shut_down.clone(), syncer)
+            .create_pipeline(buffer, shut_down.clone(), syncer, sinkers)
            .await?;
+        let pipeline_monitor = pipeline.get_monitor();

+        // start threads
        let f1 = tokio::spawn(async move {
            extractor.extract().await.unwrap();
            extractor.close().await.unwrap();
        });
+
        let f2 = tokio::spawn(async move {
            pipeline.start().await.unwrap();
            pipeline.stop().await.unwrap();
        });
-        let _ = try_join!(f1, f2);
+
+        let interval_secs = self.config.pipeline.checkpoint_interval_secs as usize;
+        let f3 = tokio::spawn(async move {
+            Self::flush_monitors(
+                interval_secs,
+                shut_down,
+                extractor_monitor,
+                pipeline_monitor,
+                sinker_monitors,
+            )
+            .await
+        });
+        let _ = try_join!(f1, f2, f3);
        Ok(())
    }

@@ -161,12 +195,9 @@
        buffer: Arc<ConcurrentQueue<DtItem>>,
        shut_down: Arc<AtomicBool>,
        syncer: Arc<Mutex<Syncer>>,
+        sinkers: Vec<Arc<async_mutex::Mutex<Box<dyn Sinker + Send>>>>,
    ) -> Result<Box<dyn Pipeline + Send>, Error> {
-        let transaction_command = self.fetch_transaction_command();
-
        let parallelizer = ParallelizerUtil::create_parallelizer(&self.config).await?;
-        let sinkers = SinkerUtil::create_sinkers(&self.config, transaction_command).await?;
-
        let pipeline = BasePipeline {
            buffer,
            parallelizer,
@@ -175,7 +206,8 @@
            shut_down,
            checkpoint_interval_secs: self.config.pipeline.checkpoint_interval_secs,
            batch_sink_interval_secs: self.config.pipeline.batch_sink_interval_secs,
-            syncer: syncer.to_owned(),
+            syncer,
+            monitor: Arc::new(RwLock::new(Monitor::new_default())),
        };

        Ok(Box::new(pipeline))
    }
@@ -476,4 +508,140 @@
        log4rs::init_raw_config(config).unwrap();
        Ok(())
    }
+
+    async fn flush_monitors(
+        interval_secs: usize,
+        shut_down: Arc<AtomicBool>,
+        extractor_monitor: Option<Arc<RwLock<Monitor>>>,
+        pipeline_monitor: Option<Arc<RwLock<Monitor>>>,
+        mut sinker_monitors: Vec<Option<Arc<RwLock<Monitor>>>>,
+    ) {
+        // override the interval_secs
+        if let Some(monitor) = &extractor_monitor {
+            monitor.write().await.time_window_secs = interval_secs;
+        }
+        if let Some(monitor) = &pipeline_monitor {
+            monitor.write().await.time_window_secs = interval_secs;
+        }
+        for monitor in sinker_monitors.iter_mut() {
+            if let Some(monitor) = monitor {
+                monitor.write().await.time_window_secs = interval_secs;
+            }
+        }
+
+        loop {
+            // do an extra flush before exiting if the task has finished
+            let finished = shut_down.load(Ordering::Acquire);
+            if !finished {
+                TimeUtil::sleep_millis(interval_secs as u64 * 1000).await;
+            }
+
+            // aggregate extractor counters
+            if let Some(monitor) = &extractor_monitor {
+                for (counter_type, counter) in monitor.write().await.time_window_counters.iter_mut()
+                {
+                    counter.refresh_window();
+                    let aggregate = match counter_type {
+                        _ => 0,
+                    };
+                    log_monitor!("extractor | {} | {}", counter_type.to_string(), aggregate)
+                }
+
+                for (counter_type, counter) in monitor.read().await.accumulate_counters.iter() {
+                    log_monitor!(
+                        "extractor | {} | {}",
+                        counter_type.to_string(),
+                        counter.value
+                    )
+                }
+            }
+
+            // aggregate pipeline counters
+            if let Some(monitor) = &pipeline_monitor {
+                for (counter_type, counter) in monitor.write().await.time_window_counters.iter_mut()
+                {
+                    counter.refresh_window();
+                    let aggregate = match counter_type {
+                        CounterType::BufferSize => counter.avg_by_count(),
+                        _ => 0,
+                    };
+                    log_monitor!("pipeline | {} | {}", counter_type.to_string(), aggregate)
+                }
+
+                for (counter_type, counter) in monitor.read().await.accumulate_counters.iter() {
+                    log_monitor!(
+                        "pipeline | {} | {}",
+                        counter_type.to_string(),
+                        counter.value
+                    )
+                }
+            }
+
+            // aggregate sinker monitors
+            let mut time_window_aggregates = HashMap::new();
+            let mut accumulate_aggregates = HashMap::new();
+            for monitor in sinker_monitors.iter_mut() {
+                if monitor.is_none() {
+                    continue;
+                }
+
+                if let Some(monitor) = monitor {
+                    // time window counters
+                    for (counter_type, counter) in
+                        monitor.write().await.time_window_counters.iter_mut()
+                    {
+                        counter.refresh_window();
+                        let (sum, count) =
+                            if let Some((sum, count)) = time_window_aggregates.get(counter_type) {
+                                (*sum, *count)
+                            } else {
+                                (0, 0)
+                            };
+                        time_window_aggregates.insert(
+                            counter_type.clone(),
+                            (counter.sum() + sum, counter.count() + count),
+                        );
+                    }
+
+                    // accumulate counters
+                    for (counter_type, counter) in monitor.read().await.accumulate_counters.iter() {
+                        let sum = if let Some(sum) = accumulate_aggregates.get(counter_type) {
+                            *sum
+                        } else {
+                            0
+                        };
+                        accumulate_aggregates.insert(counter_type.clone(), counter.value + sum);
+                    }
+                }
+            }
+
+            for (counter_type, (sum, count)) in time_window_aggregates {
+                let aggregate = match counter_type {
+                    CounterType::BatchWriteFailures | CounterType::SerialWrites => sum,
+
+                    CounterType::Records => sum / interval_secs,
+
+                    CounterType::BytesPerQuery
+                    | CounterType::RecordsPerQuery
+                    | CounterType::RtPerQuery => {
+                        if count > 0 {
+                            sum / count
+                        } else {
+                            0
+                        }
+                    }
+                    _ => 0,
+                };
+                log_monitor!("sinker | {} | {}", counter_type.to_string(), aggregate);
+            }
+
+            for (counter_type, sum) in accumulate_aggregates {
+                log_monitor!("sinker | {} | {}", counter_type.to_string(), sum);
+            }
+
+            if finished {
+                break;
+            }
+        }
+    }
 }
diff --git a/log4rs.yaml b/log4rs.yaml
index 398bc502..842abd4a 100644
--- a/log4rs.yaml
+++ b/log4rs.yaml
@@ -7,7 +7,7 @@ appenders:
     append: true
     path: "LOG_DIR_PLACEHODLER/default.log"
     encoder:
-      pattern: "{h({d(%Y-%m-%d %H:%M:%S)})} - {level} - {tid} - {m}{n}"
+      pattern: "{d(%Y-%m-%d %H:%M:%S.%6f)(utc)} - {level} - [{i}] - {m}{n}"
     policy:
       kind: compound
       trigger:
@@ -24,7 +24,7 @@
     append: true
     path: "LOG_DIR_PLACEHODLER/commit.log"
     encoder:
-      pattern: "{h({d(%Y-%m-%d %H:%M:%S)})} - {level} - {tid} - {m}{n}"
+      pattern: "{d(%Y-%m-%d %H:%M:%S.%6f)(utc)} - {level} - [{i}] - {m}{n}"
     policy:
       kind: compound
       trigger:
@@ -41,7 +41,7 @@
     append: true
     path: "LOG_DIR_PLACEHODLER/position.log"
     encoder:
-      pattern: "{h({d(%Y-%m-%d %H:%M:%S)})} | {m}{n}"
+      pattern: "{d(%Y-%m-%d %H:%M:%S.%6f)(utc)} | {m}{n}"
     policy:
       kind: compound
       trigger:
@@ -58,7 +58,7 @@
     append: true
     path: "LOG_DIR_PLACEHODLER/monitor.log"
     encoder:
-      pattern: "{h({d(%Y-%m-%d %H:%M:%S)})} | {m}{n}"
+      pattern: "{d(%Y-%m-%d %H:%M:%S.%6f)(utc)} | {m}{n}"
     policy:
       kind: compound
       trigger:
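
Note on the monitoring model this diff introduces: a Monitor holds two kinds of counters — accumulate counters that only ever grow (sinked_count), and time-window counters whose samples expire after time_window_secs. flush_monitors in task_runner.rs then reduces each time-window counter in one of three ways: plain sum (batch_write_failures, serial_writes), sum divided by the interval length (rps), or sum divided by the sample count (bytes_per_query, records_per_query, rt_per_query, buffer_size). Below is a minimal standalone sketch of those window semantics, using only std; `WindowCounter` and its names are illustrative stand-ins, not dt-common's actual API.

```rust
use std::collections::VecDeque;
use std::time::Instant;

struct WindowCounter {
    window_secs: u64,
    // (when the sample was recorded, sampled value)
    samples: VecDeque<(Instant, usize)>,
}

impl WindowCounter {
    fn new(window_secs: u64) -> Self {
        Self {
            window_secs,
            samples: VecDeque::new(),
        }
    }

    fn add(&mut self, value: usize) {
        self.samples.push_back((Instant::now(), value));
    }

    // drop samples older than the window, mirroring TimeWindowCounter::refresh_window;
    // samples are pushed in timestamp order, so only the front can be stale
    fn refresh_window(&mut self) {
        while self
            .samples
            .front()
            .map_or(false, |(ts, _)| ts.elapsed().as_secs() > self.window_secs)
        {
            self.samples.pop_front();
        }
    }

    fn sum(&self) -> usize {
        self.samples.iter().map(|(_, v)| *v).sum()
    }

    // rps-style aggregation: total over the window divided by the window length
    fn avg_by_interval(&self) -> usize {
        self.sum() / self.window_secs as usize
    }

    // per-query-style aggregation: total divided by the number of samples
    fn avg_by_count(&self) -> usize {
        if self.samples.is_empty() {
            0
        } else {
            self.sum() / self.samples.len()
        }
    }
}

fn main() {
    // e.g. rt_per_query: two queries observed inside a 5s window, taking 800us and 1200us
    let mut rt_per_query = WindowCounter::new(5);
    rt_per_query.add(800);
    rt_per_query.add(1200);
    rt_per_query.refresh_window();

    assert_eq!(rt_per_query.sum(), 2000);
    assert_eq!(rt_per_query.avg_by_count(), 1000); // 2000 / 2 samples: avg rt per query
    assert_eq!(rt_per_query.avg_by_interval(), 400); // 2000 / 5s window: per-second rate
}
```

This also illustrates the design choice behind per-sinker monitors (see the comment in sinker_util.rs): aggregation cost moves into the periodic flush_monitors pass, while the hot write path of each sinker never competes with other sinkers for a shared write lock.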