Skip to content

Commit

Permalink
add basic counters
Browse files Browse the repository at this point in the history
  • Loading branch information
qianyiwen2019 committed Nov 23, 2023
1 parent ed658bb commit ffef5c5
Show file tree
Hide file tree
Showing 18 changed files with 435 additions and 118 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async-mutex = "1.4.0"
project-root = "0.2.2"
strum = { version = "0.25.0", features = ["derive"] }
regex = "1.5.4"
rdkafka = {version = "0.34.0", features = ["libz-static"] }
rdkafka = { version = "0.36.0", features = ["libz-static"] }
kafka = "0.10.0"
reqwest = "0.11.16"
rusoto_core = "0.48.0"
Expand All @@ -56,4 +56,5 @@ dotenv = "0.15.0"
redis = { version = "0.23.1", features = ["tokio-comp"] }
thiserror = "1.0.44"
async-std = "1.12.0"
apache-avro = { version = "0.16.0", features = ["derive"]}
apache-avro = { version = "0.16.0", features = ["derive"]}
async-rwlock = "1.3.0"
17 changes: 7 additions & 10 deletions dt-common/src/monitor/counter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use std::time::Instant;

#[derive(Clone)]
pub struct Counter {
pub timestamp: Instant,
pub value: u64,
pub value: usize,
pub description: String,
}

impl Counter {
pub fn new() -> Self {
pub fn new(value: usize) -> Self {
Self {
timestamp: Instant::now(),
value: 0,
value,
description: String::new(),
}
}

#[inline(always)]
pub fn add(&mut self, value: u64) {
pub fn add(&mut self, value: usize) {
self.value += value;
}
}

impl Default for Counter {
fn default() -> Self {
Self::new()
}
}
3 changes: 2 additions & 1 deletion dt-common/src/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod counter;
pub mod statistic_counter;
pub mod monitor;
pub mod time_window_counter;
80 changes: 80 additions & 0 deletions dt-common/src/monitor/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::collections::HashMap;

use strum::{Display, EnumString, IntoStaticStr};

use super::counter::Counter;
use super::time_window_counter::TimeWindowCounter;

#[derive(Clone)]
pub struct Monitor {
pub accumulate_counters: HashMap<CounterType, Counter>,
pub time_window_counters: HashMap<CounterType, TimeWindowCounter>,
pub time_window_secs: usize,
}

#[derive(EnumString, IntoStaticStr, Display, PartialEq, Eq, Hash, Clone)]
pub enum CounterType {
// time window counter, aggregate by: sum by interval
#[strum(serialize = "batch_write_failures")]
BatchWriteFailures,
#[strum(serialize = "serial_writes")]
SerialWrites,

// time window counter, aggregate by: avg by interval
#[strum(serialize = "rps")]
Records,

// time window counter, aggregate by: avg by count
#[strum(serialize = "bytes_per_query")]
BytesPerQuery,
#[strum(serialize = "records_per_query")]
RecordsPerQuery,
#[strum(serialize = "rt_per_query")]
RtPerQuery,
#[strum(serialize = "buffer_size")]
BufferSize,

// accumulate counter
#[strum(serialize = "sinked_count")]
SinkedCount,
}

const DEFAULT_INTERVAL_SECS: usize = 5;

impl Monitor {
pub fn new_default() -> Self {
Self::new(DEFAULT_INTERVAL_SECS)
}

pub fn new(interval_secs: usize) -> Self {
Self {
accumulate_counters: HashMap::new(),
time_window_counters: HashMap::new(),
time_window_secs: interval_secs,
}
}

pub fn add_counter(&mut self, counter_type: CounterType, value: usize) -> &mut Self {
match counter_type {
CounterType::SinkedCount => {
if let Some(counter) = self.accumulate_counters.get_mut(&counter_type) {
counter.add(value)
} else {
self.accumulate_counters
.insert(counter_type, Counter::new(value));
}
}

_ => {
if let Some(counter) = self.time_window_counters.get_mut(&counter_type) {
counter.add(value)
} else {
let mut counter = TimeWindowCounter::new(self.time_window_secs);
counter.add(value);
self.time_window_counters.insert(counter_type, counter);
}
}
}
self
}
}
63 changes: 0 additions & 63 deletions dt-common/src/monitor/statistic_counter.rs

This file was deleted.

69 changes: 69 additions & 0 deletions dt-common/src/monitor/time_window_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::collections::LinkedList;

use super::counter::Counter;

#[derive(Clone)]
pub struct TimeWindowCounter {
pub time_window_secs: usize,
pub counters: LinkedList<Counter>,
pub description: String,
}

impl TimeWindowCounter {
pub fn new(time_window_secs: usize) -> Self {
Self {
time_window_secs,
counters: LinkedList::new(),
description: String::new(),
}
}

#[inline(always)]
pub fn add(&mut self, value: usize) {
self.counters.push_back(Counter::new(value));
}

#[inline(always)]
pub fn sum(&mut self) -> usize {
let mut sum = 0;
for counter in self.counters.iter() {
sum += counter.value;
}
sum
}

#[inline(always)]
pub fn count(&mut self) -> usize {
self.counters.len()
}

#[inline(always)]
pub fn avg_by_interval(&mut self) -> usize {
self.sum() / self.time_window_secs
}

#[inline(always)]
pub fn avg_by_count(&mut self) -> usize {
if self.counters.len() > 0 {
self.sum() / self.counters.len()
} else {
0
}
}

#[inline(always)]
pub fn refresh_window(&mut self) {
let mut outdate_count = 0;
for counter in self.counters.iter() {
if counter.timestamp.elapsed().as_secs() > self.time_window_secs as u64 {
outdate_count += 1;
} else {
break;
}
}

for _ in 0..outdate_count {
self.counters.pop_front();
}
}
}
3 changes: 2 additions & 1 deletion dt-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ redis = { workspace = true }
thiserror = { workspace = true }
async-std = { workspace = true }
chrono = { workspace = true }
apache-avro = {workspace = true}
apache-avro = {workspace = true}
async-rwlock = { workspace = true }
8 changes: 7 additions & 1 deletion dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};

use async_rwlock::RwLock;
use async_trait::async_trait;
use concurrent_queue::ConcurrentQueue;
use dt_meta::{
Expand All @@ -17,7 +18,7 @@ use futures::TryStreamExt;

use sqlx::{MySql, Pool};

use dt_common::{config::config_enums::DbType, error::Error, log_info};
use dt_common::{config::config_enums::DbType, error::Error, log_info, monitor::monitor::Monitor};

use crate::{
extractor::{base_extractor::BaseExtractor, snapshot_resumer::SnapshotResumer},
Expand All @@ -33,6 +34,7 @@ pub struct MysqlSnapshotExtractor {
pub db: String,
pub tb: String,
pub shut_down: Arc<AtomicBool>,
pub monitor: Arc<RwLock<Monitor>>,
}

#[async_trait]
Expand All @@ -54,6 +56,10 @@ impl Extractor for MysqlSnapshotExtractor {
self.conn_pool.close().await;
Ok(())
}

fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
Some(self.monitor.clone())
}
}

impl MysqlSnapshotExtractor {
Expand Down
13 changes: 12 additions & 1 deletion dt-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ pub mod meta_fetcher;
pub mod rdb_query_builder;
pub mod sinker;

use std::sync::Arc;

use async_rwlock::RwLock;
use async_trait::async_trait;
use check_log::check_log::CheckLog;
use dt_common::error::Error;
use dt_common::{error::Error, monitor::monitor::Monitor};
use dt_meta::{ddl_data::DdlData, dt_data::DtData, row_data::RowData};

#[async_trait]
Expand All @@ -31,6 +34,10 @@ pub trait Sinker {
async fn refresh_meta(&mut self, _data: Vec<DdlData>) -> Result<(), Error> {
Ok(())
}

fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
None
}
}

#[async_trait]
Expand All @@ -40,6 +47,10 @@ pub trait Extractor {
async fn close(&mut self) -> Result<(), Error> {
Ok(())
}

fn get_monitor(&self) -> Option<Arc<RwLock<Monitor>>> {
None
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit ffef5c5

Please sign in to comment.