diff --git a/src/db_client.rs b/src/db_client.rs index e55aed4..3f3bd25 100644 --- a/src/db_client.rs +++ b/src/db_client.rs @@ -1,12 +1,8 @@ -use std::collections::HashMap; extern crate bincode; use crate::types::Group; use async_trait::async_trait; -use rocksdb::IteratorMode; use rocksdb::DB; -pub type Result = std::result::Result>; - pub struct LagDB { pub lag_db: DB, } @@ -14,8 +10,7 @@ pub struct LagDB { #[async_trait] pub trait DBClient { fn put(&self, k: Vec, v: Vec) -> bool; - fn get(&self, k: Vec) -> Option>; - async fn get_all(&self) -> HashMap; + fn get(&self, k: Vec) -> Option; } #[async_trait] @@ -24,36 +19,14 @@ impl DBClient for LagDB { self.lag_db.put(k, v).is_ok() } - fn get(&self, k: Vec) -> Option> { + fn get(&self, k: Vec) -> Option { let value = self.lag_db.get(k); match value { Ok(Some(v)) => { - let payload: Vec = bincode::deserialize(&v).unwrap(); + let payload: Group = bincode::deserialize(&v).unwrap(); Some(payload) } _ => None, } } - - async fn get_all(&self) -> HashMap { - let iter = self.lag_db.iterator(IteratorMode::Start); - let mut group_map: HashMap = HashMap::new(); - for (group_key, group_payload) in iter { - let key: Result = bincode::deserialize(&group_key); - let payload: Result = bincode::deserialize(&group_payload); - let key: Option = match key { - Ok(k) => Some(k), - Err(_) => None, - }; - let payload: Option = match payload { - Ok(p) => Some(p), - Err(_) => None, - }; - match (key, payload) { - (Some(k), Some(p)) => group_map.insert(k, p), - _ => None, - }; - } - group_map - } } diff --git a/src/lag_consumer.rs b/src/lag_consumer.rs index b3bb35c..e6c5827 100644 --- a/src/lag_consumer.rs +++ b/src/lag_consumer.rs @@ -4,8 +4,8 @@ extern crate slog_term; use crate::config_reader::read; use crate::db_client::{DBClient, LagDB}; use crate::parser::{parse_date, parse_message}; -use crate::types::{Group, TopicPartition}; -use futures::{join, TryStreamExt}; +use crate::types::{Group, Lag}; +use futures::TryStreamExt; use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::StreamConsumer; use rdkafka::consumer::Consumer; @@ -17,6 +17,7 @@ use std::time::Duration; lazy_static! { static ref LOG: slog::Logger = create_log(); + static ref WATERMARK_CONSUMER: StreamConsumer = read().create().unwrap(); pub static ref LAG_CONSUMER: LagConsumer = LagConsumer { lag_db: Arc::new(LagDB { lag_db: rocksdb::DB::open_default("/tmp/rocksdb".to_string()).unwrap() @@ -41,7 +42,7 @@ impl LagConsumer { let stream_processor = consumer .start() .try_for_each(|borrowed_message| async move { - info!(LOG, "Fetching watermarks..."); + info!(LOG, "Consuming messages..."); let owned_message = borrowed_message.detach(); tokio::spawn(async move { tokio::task::spawn_blocking(move || self.push_group_data(owned_message)) @@ -66,91 +67,69 @@ impl LagConsumer { partition, offset, }) => { - let group_key = Group::GroupKey { - group: group, - topic_partition: TopicPartition { topic, partition }, + let group_key = Group::GroupKey { group: group }; + let serialized_group_key = bincode::serialize(&group_key).unwrap(); + let group_payload: Option = match self.lag_db.get(serialized_group_key) { + Some(Group::GroupPayload { mut payload, topic }) => { + payload.insert(partition, (offset, parse_date(timestamp))); + Some(Group::GroupPayload { topic, payload }) + } + None => { + let mut map: HashMap = HashMap::new(); + map.insert(partition, (offset, parse_date(timestamp))); + Some(Group::GroupPayload { + topic: topic, + payload: map, + }) + } + _ => None, }; - let group_payload = Group::GroupPayload { - group_offset: offset, - commit_timestamp: parse_date(timestamp), - }; - self.lag_db.put( - bincode::serialize(&group_key).unwrap(), - bincode::serialize(&group_payload).unwrap(), - ); + match group_payload { + Some(payload) => { + self.lag_db.put( + bincode::serialize(&group_key).unwrap(), + bincode::serialize(&payload).unwrap(), + ); + } + _ => (), + } } - Err(e) => warn!(LOG, "Error to process High Topic Watermarks, {}", e), _ => (), } } - async fn calculate_lag(&self) { - info!(LOG, "CALCULATING LAG..."); - let groups_future = self.lag_db.get_all(); - let hwms_future = self.get_hwms(); - let result = join!(hwms_future, groups_future); - let hwms = result.0; - let groups = result.1; - let mut all_lag: Vec = Vec::new(); - for (k, v) in groups { - match (k, v) { - ( - Group::GroupKey { - group, - topic_partition, - }, - Group::GroupPayload { - group_offset, - commit_timestamp, - }, - ) => { - let topic_hwms = hwms.get(&topic_partition); - let group_lag = Group::GroupLag { - topic: topic_partition.topic, - partition: topic_partition.partition, - group: group, - lag: topic_hwms.unwrap() - group_offset, - last_commit: commit_timestamp, + pub fn get_lag(&self, group: &str) -> Option { + match self.lag_db.get( + bincode::serialize(&Group::GroupKey { + group: group.to_string(), + }) + .unwrap(), + ) { + Some(Group::GroupPayload { payload, topic }) => { + let mut partitions_lag: Vec = Vec::new(); + for (partition, value) in payload { + let hwms = self.get_hwms(topic.clone(), partition); + let partition_lag = Lag { + partition: partition, + lag: hwms.1 - value.0, + timestamp: value.1, }; - all_lag.push(group_lag); - self.lag_db.put( - b"last_calculated_lag".to_vec(), - bincode::serialize(&all_lag).unwrap(), - ); + partitions_lag.push(partition_lag); } - _ => {} + Some(Group::GroupLag { + group: group.to_string(), + topic: topic, + lag: partitions_lag, + }) } + _ => None, } } - async fn get_hwms(&self) -> HashMap { - let watermark_consumer: StreamConsumer = read().create().unwrap(); - let metadata = &watermark_consumer - .fetch_metadata(None, Duration::from_secs(1)) - .expect("errou"); - let mut topics: HashMap = HashMap::new(); - for topic in metadata.topics() { - let partitions_count: i32 = topic.partitions().len() as i32; - for p in 0..partitions_count { - let hwms = watermark_consumer - .fetch_watermarks(&topic.name(), p, Duration::from_millis(100)) - .unwrap(); - topics.insert(TopicPartition::new(topic.name().to_string(), p), hwms.1); - } - } - topics - } - - pub async fn lag_calc_update(&self) { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); - loop { - interval.tick().await; - self.calculate_lag().await; - } - } - - pub fn get_lag(&self) -> Option> { - self.lag_db.get(b"last_calculated_lag".to_vec()) + fn get_hwms(&self, topic: String, partition: i32) -> (i64, i64) { + WATERMARK_CONSUMER + .fetch_watermarks(&topic, partition, Duration::from_millis(100)) + .unwrap() } } diff --git a/src/main.rs b/src/main.rs index 5f78a35..b51745a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,12 +24,12 @@ fn create_log() -> slog::Logger { #[tokio::main] async fn main() { tokio::task::spawn(LAG_CONSUMER.consume()); - tokio::task::spawn(LAG_CONSUMER.lag_calc_update()); let cors = warp::cors().allow_any_origin(); let lag = warp::path("lag") - .map(move || match LAG_CONSUMER.get_lag() { + .and(warp::path::param()) + .map(|group: String| match LAG_CONSUMER.get_lag(&group) { Some(v) => warp::reply::with_status(warp::reply::json(&v), StatusCode::OK), None => { warp::reply::with_status(warp::reply::json(&"Lag not found"), StatusCode::NOT_FOUND) diff --git a/src/types.rs b/src/types.rs index 31b61c9..a0f2484 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,22 +1,26 @@ use serde::{Deserialize, Serialize}; -use std::fmt; +use std::collections::HashMap; -#[derive(Eq, Hash, Serialize, Deserialize, PartialEq, Debug)] +#[derive(Eq, Serialize, Deserialize, PartialEq, Debug)] +pub struct Lag { + pub partition: i32, + pub lag: i64, + pub timestamp: String, +} + +#[derive(Eq, Serialize, Deserialize, PartialEq, Debug)] pub enum Group { GroupKey { group: String, - topic_partition: TopicPartition, }, GroupPayload { - group_offset: i64, - commit_timestamp: String, + topic: String, + payload: HashMap, }, GroupLag { topic: String, - partition: i32, group: String, - lag: i64, - last_commit: String, + lag: Vec, }, OffsetCommit { group: String, @@ -26,41 +30,3 @@ pub enum Group { }, None, } - -impl fmt::Display for Group { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Group::GroupLag { - topic, - partition, - group, - lag, - last_commit, - } => write!( - f, - "Topic: {}, - Partition: {}, - Group: {}, - Lag: {}, - Last Commit: {}", - topic, partition, group, lag, last_commit - ), - _ => Ok(()), - } - } -} - -#[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Debug)] -pub struct TopicPartition { - pub topic: String, - pub partition: i32, -} - -impl TopicPartition { - pub fn new(topic: String, partition: i32) -> Self { - TopicPartition { - topic: topic, - partition: partition, - } - } -}