Skip to content

Commit

Permalink
Lag for group.
Browse files Browse the repository at this point in the history
  • Loading branch information
petrusbatalha committed Sep 8, 2020
1 parent 3ee68dc commit 89af2b7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 154 deletions.
33 changes: 3 additions & 30 deletions src/db_client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
use std::collections::HashMap;
extern crate bincode;
use crate::types::Group;
use async_trait::async_trait;
use rocksdb::IteratorMode;
use rocksdb::DB;

pub type Result<T> = std::result::Result<T, std::boxed::Box<bincode::ErrorKind>>;

pub struct LagDB {
pub lag_db: DB,
}

#[async_trait]
pub trait DBClient<T> {
fn put(&self, k: Vec<u8>, v: Vec<u8>) -> bool;
fn get(&self, k: Vec<u8>) -> Option<Vec<T>>;
async fn get_all(&self) -> HashMap<T, T>;
fn get(&self, k: Vec<u8>) -> Option<T>;
}

#[async_trait]
Expand All @@ -24,36 +19,14 @@ impl DBClient<Group> for LagDB {
self.lag_db.put(k, v).is_ok()
}

fn get(&self, k: Vec<u8>) -> Option<Vec<Group>> {
fn get(&self, k: Vec<u8>) -> Option<Group> {
let value = self.lag_db.get(k);
match value {
Ok(Some(v)) => {
let payload: Vec<Group> = bincode::deserialize(&v).unwrap();
let payload: Group = bincode::deserialize(&v).unwrap();
Some(payload)
}
_ => None,
}
}

async fn get_all(&self) -> HashMap<Group, Group> {
let iter = self.lag_db.iterator(IteratorMode::Start);
let mut group_map: HashMap<Group, Group> = HashMap::new();
for (group_key, group_payload) in iter {
let key: Result<Group> = bincode::deserialize(&group_key);
let payload: Result<Group> = bincode::deserialize(&group_payload);
let key: Option<Group> = match key {
Ok(k) => Some(k),
Err(_) => None,
};
let payload: Option<Group> = match payload {
Ok(p) => Some(p),
Err(_) => None,
};
match (key, payload) {
(Some(k), Some(p)) => group_map.insert(k, p),
_ => None,
};
}
group_map
}
}
131 changes: 55 additions & 76 deletions src/lag_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ extern crate slog_term;
use crate::config_reader::read;
use crate::db_client::{DBClient, LagDB};
use crate::parser::{parse_date, parse_message};
use crate::types::{Group, TopicPartition};
use futures::{join, TryStreamExt};
use crate::types::{Group, Lag};
use futures::TryStreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
Expand All @@ -17,6 +17,7 @@ use std::time::Duration;

lazy_static! {
static ref LOG: slog::Logger = create_log();
static ref WATERMARK_CONSUMER: StreamConsumer = read().create().unwrap();
pub static ref LAG_CONSUMER: LagConsumer = LagConsumer {
lag_db: Arc::new(LagDB {
lag_db: rocksdb::DB::open_default("/tmp/rocksdb".to_string()).unwrap()
Expand All @@ -41,7 +42,7 @@ impl LagConsumer {
let stream_processor = consumer
.start()
.try_for_each(|borrowed_message| async move {
info!(LOG, "Fetching watermarks...");
info!(LOG, "Consuming messages...");
let owned_message = borrowed_message.detach();
tokio::spawn(async move {
tokio::task::spawn_blocking(move || self.push_group_data(owned_message))
Expand All @@ -66,91 +67,69 @@ impl LagConsumer {
partition,
offset,
}) => {
let group_key = Group::GroupKey {
group: group,
topic_partition: TopicPartition { topic, partition },
let group_key = Group::GroupKey { group: group };
let serialized_group_key = bincode::serialize(&group_key).unwrap();
let group_payload: Option<Group> = match self.lag_db.get(serialized_group_key) {
Some(Group::GroupPayload { mut payload, topic }) => {
payload.insert(partition, (offset, parse_date(timestamp)));
Some(Group::GroupPayload { topic, payload })
}
None => {
let mut map: HashMap<i32, (i64, String)> = HashMap::new();
map.insert(partition, (offset, parse_date(timestamp)));
Some(Group::GroupPayload {
topic: topic,
payload: map,
})
}
_ => None,
};
let group_payload = Group::GroupPayload {
group_offset: offset,
commit_timestamp: parse_date(timestamp),
};
self.lag_db.put(
bincode::serialize(&group_key).unwrap(),
bincode::serialize(&group_payload).unwrap(),
);
match group_payload {
Some(payload) => {
self.lag_db.put(
bincode::serialize(&group_key).unwrap(),
bincode::serialize(&payload).unwrap(),
);
}
_ => (),
}
}
Err(e) => warn!(LOG, "Error to process High Topic Watermarks, {}", e),
_ => (),
}
}

async fn calculate_lag(&self) {
info!(LOG, "CALCULATING LAG...");
let groups_future = self.lag_db.get_all();
let hwms_future = self.get_hwms();
let result = join!(hwms_future, groups_future);
let hwms = result.0;
let groups = result.1;
let mut all_lag: Vec<Group> = Vec::new();
for (k, v) in groups {
match (k, v) {
(
Group::GroupKey {
group,
topic_partition,
},
Group::GroupPayload {
group_offset,
commit_timestamp,
},
) => {
let topic_hwms = hwms.get(&topic_partition);
let group_lag = Group::GroupLag {
topic: topic_partition.topic,
partition: topic_partition.partition,
group: group,
lag: topic_hwms.unwrap() - group_offset,
last_commit: commit_timestamp,
pub fn get_lag(&self, group: &str) -> Option<Group> {
match self.lag_db.get(
bincode::serialize(&Group::GroupKey {
group: group.to_string(),
})
.unwrap(),
) {
Some(Group::GroupPayload { payload, topic }) => {
let mut partitions_lag: Vec<Lag> = Vec::new();
for (partition, value) in payload {
let hwms = self.get_hwms(topic.clone(), partition);
let partition_lag = Lag {
partition: partition,
lag: hwms.1 - value.0,
timestamp: value.1,
};
all_lag.push(group_lag);
self.lag_db.put(
b"last_calculated_lag".to_vec(),
bincode::serialize(&all_lag).unwrap(),
);
partitions_lag.push(partition_lag);
}
_ => {}
Some(Group::GroupLag {
group: group.to_string(),
topic: topic,
lag: partitions_lag,
})
}
_ => None,
}
}

async fn get_hwms(&self) -> HashMap<TopicPartition, i64> {
let watermark_consumer: StreamConsumer = read().create().unwrap();
let metadata = &watermark_consumer
.fetch_metadata(None, Duration::from_secs(1))
.expect("errou");
let mut topics: HashMap<TopicPartition, i64> = HashMap::new();
for topic in metadata.topics() {
let partitions_count: i32 = topic.partitions().len() as i32;
for p in 0..partitions_count {
let hwms = watermark_consumer
.fetch_watermarks(&topic.name(), p, Duration::from_millis(100))
.unwrap();
topics.insert(TopicPartition::new(topic.name().to_string(), p), hwms.1);
}
}
topics
}

pub async fn lag_calc_update(&self) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
loop {
interval.tick().await;
self.calculate_lag().await;
}
}

pub fn get_lag(&self) -> Option<Vec<Group>> {
self.lag_db.get(b"last_calculated_lag".to_vec())
fn get_hwms(&self, topic: String, partition: i32) -> (i64, i64) {
WATERMARK_CONSUMER
.fetch_watermarks(&topic, partition, Duration::from_millis(100))
.unwrap()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ fn create_log() -> slog::Logger {
#[tokio::main]
async fn main() {
tokio::task::spawn(LAG_CONSUMER.consume());
tokio::task::spawn(LAG_CONSUMER.lag_calc_update());

let cors = warp::cors().allow_any_origin();

let lag = warp::path("lag")
.map(move || match LAG_CONSUMER.get_lag() {
.and(warp::path::param())
.map(|group: String| match LAG_CONSUMER.get_lag(&group) {
Some(v) => warp::reply::with_status(warp::reply::json(&v), StatusCode::OK),
None => {
warp::reply::with_status(warp::reply::json(&"Lag not found"), StatusCode::NOT_FOUND)
Expand Down
58 changes: 12 additions & 46 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::collections::HashMap;

#[derive(Eq, Hash, Serialize, Deserialize, PartialEq, Debug)]
#[derive(Eq, Serialize, Deserialize, PartialEq, Debug)]
pub struct Lag {
pub partition: i32,
pub lag: i64,
pub timestamp: String,
}

#[derive(Eq, Serialize, Deserialize, PartialEq, Debug)]
pub enum Group {
GroupKey {
group: String,
topic_partition: TopicPartition,
},
GroupPayload {
group_offset: i64,
commit_timestamp: String,
topic: String,
payload: HashMap<i32, (i64, String)>,
},
GroupLag {
topic: String,
partition: i32,
group: String,
lag: i64,
last_commit: String,
lag: Vec<Lag>,
},
OffsetCommit {
group: String,
Expand All @@ -26,41 +30,3 @@ pub enum Group {
},
None,
}

impl fmt::Display for Group {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Group::GroupLag {
topic,
partition,
group,
lag,
last_commit,
} => write!(
f,
"Topic: {},
Partition: {},
Group: {},
Lag: {},
Last Commit: {}",
topic, partition, group, lag, last_commit
),
_ => Ok(()),
}
}
}

#[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Debug)]
pub struct TopicPartition {
pub topic: String,
pub partition: i32,
}

impl TopicPartition {
pub fn new(topic: String, partition: i32) -> Self {
TopicPartition {
topic: topic,
partition: partition,
}
}
}

0 comments on commit 89af2b7

Please sign in to comment.