Commit f5fa18b

add journal

1 parent c61a4f8 commit f5fa18b

6 files changed: +195 -6 lines changed

kafka-json-processor-core/src/config.rs

Lines changed: 5 additions & 0 deletions

@@ -18,6 +18,7 @@ pub struct InternalConfig {
     pub channel_capacity: usize,
     pub queue_slowdown_ms: usize,
     pub queue_size: usize,
+    pub journal_path: String,
 }
 
 impl Default for InternalConfig {
@@ -27,6 +28,7 @@ impl Default for InternalConfig {
             channel_capacity: 50,
             queue_slowdown_ms: 10_000, // 10 s
             queue_size: 100_000,
+            journal_path: "./kjp_journal".to_string(),
         }
     }
 }
@@ -102,6 +104,9 @@ fn set_internal(key: &str, value: &str, config: &mut InternalConfig) -> Result<(
             "processor.queue.slowdown.ms" =>
                 config.queue_slowdown_ms = value.parse()?,
 
+            "processor.journal.path" =>
+                config.journal_path = value.parse()?,
+
             _ => {
                 warn!("Unknown config option: {key}={value}. Ignoring.")
             }
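
The new key follows the same pattern as the existing options in set_internal. A minimal crate-internal sketch of how the value reaches InternalConfig (set_internal's exact return type is truncated in the hunk header above, so the error type below is an assumption):

// Sketch only: exercises the new "processor.journal.path" branch.
// Assumes set_internal's error type converts into Box<dyn std::error::Error>.
fn journal_path_is_configurable() -> Result<(), Box<dyn std::error::Error>> {
    let mut config = InternalConfig::default();
    assert_eq!(config.journal_path, "./kjp_journal"); // the new default

    set_internal("processor.journal.path", "/var/lib/kjp/journal", &mut config)?;
    assert_eq!(config.journal_path, "/var/lib/kjp/journal");
    Ok(())
}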

kafka-json-processor-core/src/consumer.rs

Lines changed: 9 additions & 3 deletions

@@ -4,7 +4,7 @@ use log::{debug, error, trace, warn};
 use rdkafka::consumer::StreamConsumer;
 use rdkafka::Message;
 use tokio::runtime::Runtime;
-use crate::{PendingMessage, Stream};
+use crate::{MessageOffset, PendingMessage, Stream};
 use crate::processor::{process_payload, ProcessingResult};
 
 pub async fn consumer_loop(consumer: StreamConsumer, tx: Sender<PendingMessage>, runtime: &Runtime, streams: HashMap<String, Stream>)
@@ -22,12 +22,17 @@ pub async fn consumer_loop(consumer: StreamConsumer, tx: Sender<PendingMessage>,
             message.offset(),
             message.timestamp().to_millis().unwrap_or(0)
         );
+        let message_offset = MessageOffset {
+            topic: message.topic().to_string(),
+            partition: message.partition(),
+            offset: message.offset(),
+        };
 
         debug!("[{key}] Received message.");
         trace!("[{key}] Message: {}", String::from_utf8_lossy(&payload));
 
         if let Some(stream) = streams.get(message.topic()) {
-            spawn_task(runtime, tx.clone(), key, payload, stream.clone());
+            spawn_task(runtime, tx.clone(), key, payload, stream.clone(), message_offset);
         } else {
             warn!("[{key}] Topic {} is unsupported! Ignoring message.", message.topic());
         }
@@ -39,14 +44,15 @@ pub async fn consumer_loop(consumer: StreamConsumer, tx: Sender<PendingMessage>,
     }
 }
 
-fn spawn_task(runtime: &Runtime, tx: Sender<PendingMessage>, key: String, payload: Vec<u8>, stream: Stream) {
+fn spawn_task(runtime: &Runtime, tx: Sender<PendingMessage>, key: String, payload: Vec<u8>, stream: Stream, message_offset: MessageOffset) {
     runtime.spawn(async move {
         match process_payload(key.clone(), &payload, stream.processors) {
            Ok(processed) => {
                trace!("[{key}] Output: {}", processed.message);
                tx.send(PendingMessage::Processed {
                    id: key,
                    topic: stream.target_topic.clone(),
+                   offset: message_offset,
                    message: processed,
                }).unwrap();
            }
kafka-json-processor-core/src/journal.rs

Lines changed: 133 additions & 0 deletions

@@ -0,0 +1,133 @@
+use std::collections::HashMap;
+use std::error::Error;
+use std::ffi::OsStr;
+use std::fs;
+use std::ops::Deref;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+use log::{debug, error};
+use crate::MessageOffset;
+
+pub struct MessageOffsetHolder {
+    journal_dir: String,
+    inner: Arc<Mutex<HashMap<OffsetKey, Offset>>>
+}
+
+type Offset = i64;
+
+#[derive(Clone, Eq, PartialEq, Hash, Debug)]
+pub struct OffsetKey(pub String, pub i32);
+
+impl MessageOffsetHolder {
+    pub fn with_offsets_in(journal_dir: String) -> Result<MessageOffsetHolder, Box<dyn Error>> {
+        let offsets = read_offsets_from(&journal_dir)?;
+        Ok(MessageOffsetHolder {
+            journal_dir,
+            inner: Arc::new(Mutex::new(offsets)),
+        })
+    }
+
+    pub fn update(&self, offset: MessageOffset) {
+        let mut guard = self.inner.lock().unwrap();
+        (*guard).insert(OffsetKey(offset.topic, offset.partition), offset.offset);
+    }
+
+    pub fn flush(&self) {
+        let offsets = {
+            let guard = self.inner.lock().unwrap();
+            guard.clone()
+        };
+
+        save_offsets_in(offsets, &self.journal_dir);
+    }
+}
+
+fn read_offsets_from<P: AsRef<Path>>(directory: P) -> Result<HashMap<OffsetKey, Offset>, Box<dyn Error>> {
+    ensure_dir_exists(&directory)?;
+
+    let map = fs::read_dir(directory.as_ref())?
+        .filter_map(|file| {
+            file.map_err(|e| {
+                error!("Error reading file: {e}")
+            }).ok()
+        })
+        .filter_map(|file| {
+            let path = file.path();
+
+            fs::read_to_string(&path)
+                .map_err(|e| {
+                    error!("Error reading file {path:?}! Reason: {e}")
+                })
+                .ok()
+                .and_then(|content| {
+                    let offset_key = file_name_to_offset_key(path.file_name(), path.file_stem())?;
+                    Some((offset_key, content.parse().unwrap()))
+                })
+        })
+        .collect();
+
+    Ok(map)
+}
+
+/// Parses file_name as "$topic.$partition" (e.g. sampletopic.1) and returns OffsetKey if successful
+fn file_name_to_offset_key(file_name: Option<&OsStr>, file_stem: Option<&OsStr>) -> Option<OffsetKey> {
+    let file_name = file_name?.to_str()?;
+    let file_stem = file_stem?.to_str()?;
+
+    // file_stem is the "$topic" part; whatever follows "$topic." is the partition number
+    let partition = file_name.strip_prefix(file_stem)?.strip_prefix('.')?;
+
+    Some(OffsetKey(file_stem.to_string(), partition.parse().ok()?))
+}
+
+fn save_offsets_in<P: AsRef<Path>>(offsets: HashMap<OffsetKey, Offset>, directory: P) {
+    if let Err(e) = ensure_dir_exists(&directory) {
+        error!("Cannot save offsets! {e}");
+        return;
+    }
+
+    offsets.into_iter()
+        .for_each(|(key, offset)| {
+            save_offset(&directory, key, offset);
+        });
+}
+
+fn save_offset<P: AsRef<Path>>(base_path: P, offset_key: OffsetKey, offset: Offset) {
+    let base_path = PathBuf::from(base_path.as_ref());
+    let file_path = base_path.join(format!("{}.{}", &offset_key.0, &offset_key.1));
+
+    debug!("Saving offset [Topic: {}] [Partition: {}] [Offset: {}] to {:?}", &offset_key.0, &offset_key.1, offset, &file_path);
+
+    // save offset to file with name "$topic.$partition" (e.g. sampletopic.1)
+    if let Err(e) = fs::write(&file_path, format!("{}", offset)) {
+        error!("Failed to write journal to file {file_path:?}. Topic: [{}], Partition: [{}], Offset: [{}]. Reason: {e}", offset_key.0, offset_key.1, offset);
+    }
+}
+
+fn ensure_dir_exists<P: AsRef<Path>>(directory: P) -> Result<(), String> {
+    let dir = directory.as_ref();
+    if !dir.exists() {
+        if let Err(e) = fs::create_dir_all(dir) {
+            return Err(format!("Cannot create directory {dir:?}! Journal will not be saved. Reason: {e}"));
+        }
+    }
+    if dir.is_file() {
+        return Err(format!("Directory is a file: {dir:?}! Cannot use as journal."));
+    }
+
+    Ok(())
+}
+
+impl Deref for MessageOffsetHolder {
+    type Target = Mutex<HashMap<OffsetKey, Offset>>;
+
+    fn deref(&self) -> &Self::Target {
+        self.inner.deref()
+    }
+}
+
+impl Drop for MessageOffsetHolder {
+    fn drop(&mut self) {
+        // when json-processor finishes, we should save current offsets
+        self.flush()
+    }
+}
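
Taken together, the journal directory is a small key-value store: one file per (topic, partition), named "$topic.$partition", containing the latest offset as plain text. A minimal crate-internal sketch of the round trip, assuming a writable temp directory (MessageOffset's fields are private, so this has to live inside the crate):

// Sketch only: write offsets, then read them back as a fresh holder would on restart.
fn journal_round_trip() -> Result<(), Box<dyn std::error::Error>> {
    let dir = std::env::temp_dir().join("kjp_journal_demo");
    let dir_str = dir.to_string_lossy().into_owned();

    let holder = MessageOffsetHolder::with_offsets_in(dir_str.clone())?;
    holder.update(MessageOffset { topic: "sampletopic".to_string(), partition: 1, offset: 42 });
    holder.flush(); // writes kjp_journal_demo/sampletopic.1 containing "42"

    // A fresh holder (e.g. after a crash) reads the same offsets back from disk.
    let restored = MessageOffsetHolder::with_offsets_in(dir_str)?;
    let offsets = restored.lock().unwrap(); // lock() is reachable via the Deref impl above
    assert_eq!(offsets.get(&OffsetKey("sampletopic".to_string(), 1)), Some(&42));
    Ok(())
}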

kafka-json-processor-core/src/lib.rs

Lines changed: 35 additions & 0 deletions

@@ -1,13 +1,16 @@
 use std::collections::HashMap;
 use std::error::Error;
+use std::sync::Arc;
 use std::time::Duration;
 use crossbeam_channel::bounded;
 use log::{info, warn, error, debug};
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::producer::BaseProducer;
 use tokio::runtime::{Builder, Runtime};
+use tokio::time::interval;
 use crate::config::Config;
 use crate::consumer::consumer_loop;
+use crate::journal::MessageOffsetHolder;
 use crate::processor::{Processor, SerializedOutputMessage};
 use crate::producer::producer_loop;
 
@@ -18,6 +21,7 @@ pub mod processor;
 pub mod formatters;
 pub mod simulation;
 pub mod error;
+pub mod journal;
 
 #[derive(Clone)]
 pub struct Stream {
@@ -31,10 +35,17 @@ pub enum PendingMessage {
     Processed {
         id: String,
         topic: String,
+        offset: MessageOffset,
         message: SerializedOutputMessage,
     },
 }
 
+pub struct MessageOffset {
+    topic: String,
+    partition: i32,
+    offset: i64,
+}
+
 pub fn run_processor(streams: HashMap<String, Stream>) {
     info!("Starting kafka-json-processor...");
 
@@ -93,6 +104,10 @@ async fn run_processing_tasks(
     let consumer: StreamConsumer = exec_or_retry_in_10s!(config.consumer_config.create());
     let producer: BaseProducer = exec_or_retry_in_10s!(config.producer_config.create());
 
+    let offset_holder = MessageOffsetHolder::with_offsets_in(config.internal_config.journal_path)?;
+    let offset_holder = Arc::new(offset_holder);
+    let producer_offset_holder = offset_holder.clone();
+
     show_streams_and_subscribe(&consumer, &streams)?;
 
     let (tx, rx) = bounded(config.internal_config.channel_capacity);
@@ -102,9 +117,14 @@
             rx,
             config.internal_config.queue_size,
             Duration::from_millis(config.internal_config.queue_slowdown_ms as u64),
+            producer_offset_holder,
         ).await;
     });
 
+    runtime.spawn(async move {
+        journal_flush_loop(offset_holder).await;
+    });
+
     consumer_loop(consumer, tx, runtime, streams).await
 }
 
@@ -121,4 +141,19 @@ fn show_streams_and_subscribe(consumer: &StreamConsumer, streams: &HashMap<Strin
     consumer.subscribe(&topics)?;
 
     Ok(())
+}
+
+/// Runs a loop that flushes offsets every 30 seconds.
+///
+/// Flushing saves current offsets to a journal on disk.
+/// It is used to subscribe to topics from given offsets in case of a crash or client id change
+/// (when we cannot be sure where we finished during the last run).
+async fn journal_flush_loop(offset_holder: Arc<MessageOffsetHolder>) {
+    let mut journal_interval = interval(Duration::from_secs(30));
+
+    loop {
+        journal_interval.tick().await;
+
+        offset_holder.flush();
+    }
 }
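
The doc comment above explains why the journal exists, but this commit only writes it; the rewind itself is not implemented here. As a purely hypothetical illustration of what consuming the journal could look like, rdkafka's Consumer::seek can reposition a consumer per partition once partitions are assigned (the function below and its wiring are assumptions, not part of this commit):

// Hypothetical recovery sketch (NOT part of this commit): once partitions are
// assigned, seek each journaled (topic, partition) to one past the saved offset,
// since the saved offset was already produced successfully.
use rdkafka::Offset;
use crate::journal::OffsetKey;

fn seek_to_journaled_offsets(consumer: &StreamConsumer, offsets: &HashMap<OffsetKey, i64>) {
    for (OffsetKey(topic, partition), offset) in offsets {
        if let Err(e) = consumer.seek(topic, *partition, Offset::Offset(offset + 1), Duration::from_secs(5)) {
            warn!("Could not seek {topic}/{partition} to offset {}: {e}", offset + 1);
        }
    }
}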

kafka-json-processor-core/src/producer.rs

Lines changed: 8 additions & 3 deletions

@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::time::Duration;
 use crossbeam_channel::Receiver;
 use log::{debug, error, info, trace, warn};
@@ -6,8 +7,9 @@ use rdkafka::error::RDKafkaErrorCode::{InvalidTopic, QueueFull, UnknownTopic, Un
 use rdkafka::producer::{Producer, BaseProducer, BaseRecord};
 use rdkafka::util::Timeout;
 use crate::{PendingMessage, SerializedOutputMessage};
+use crate::journal::MessageOffsetHolder;
 
-pub async fn producer_loop(producer: BaseProducer, rx: Receiver<PendingMessage>, queue_size: usize, queue_slowdown_time: Duration) {
+pub async fn producer_loop(producer: BaseProducer, rx: Receiver<PendingMessage>, queue_size: usize, queue_slowdown_time: Duration, offset_holder: Arc<MessageOffsetHolder>) {
     while let Ok(pending) = rx.recv() {
         match pending {
             PendingMessage::Received => {
@@ -16,15 +18,18 @@ pub async fn producer_loop(producer: BaseProducer, rx: Receiver<PendingMessage>,
                 // Sender<PendingMessage> uses a bounded channel, so it will block if it's full (that is,
                 // when Receiver<PendingMessage> did not read objects from queue).
             }
-            PendingMessage::Processed { id, topic, message } => {
+            PendingMessage::Processed { id, topic, message, offset } => {
                 debug!("[{id}] Producing message [{}]", message.key);
                 trace!("[{id}] Produced: {}", message.message);
 
                 if let SentMessage::ShouldSkipMessage = send_loop(&producer, &topic, &id, message, queue_size, queue_slowdown_time) {
                     // Message not sent, so skip it.
                     continue;
                 }
-
+
+                // offset needed in case of recovery from crash
+                offset_holder.update(offset);
+
                 while producer.in_flight_count() as f64 >= 0.95f64 * queue_size as f64 {
                     warn!("Producer queue is almost full ({}/{}). Halting production for {}s.", producer.in_flight_count(), queue_size, queue_slowdown_time.as_secs());
                     // This flush will block current thread for queue_slowdown_time or less.
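
Since offset_holder.update(offset) runs only after send_loop reports success, the journal records a high-water mark of successfully produced messages; anything processed after the last 30-second flush is simply reprocessed on restart, giving roughly at-least-once delivery. Continuing the crate-internal journal sketch from above, update keeps only the most recent offset per (topic, partition):

// Sketch only, reusing `holder` from the journal round-trip sketch above:
// later updates overwrite earlier ones, so the journal stores one number
// per (topic, partition) rather than one entry per message.
holder.update(MessageOffset { topic: "sampletopic".to_string(), partition: 1, offset: 41 });
holder.update(MessageOffset { topic: "sampletopic".to_string(), partition: 1, offset: 42 });
assert_eq!(holder.lock().unwrap().get(&OffsetKey("sampletopic".to_string(), 1)), Some(&42));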

processor.properties

Lines changed: 5 additions & 0 deletions

@@ -6,6 +6,11 @@
 
 ### kafka-json-processor config ###
 
+# Journal path. Current offsets are saved in this directory; the journal is needed in case of a crash or a client id change.
+# If either of those happens, kafka-json-processor will rewind to these offsets, so it will not read everything from the start again.
+# Default: ./kjp_journal
+processor.journal.path=./kjp_journal
+
 # Worker threads - how many threads to use for processing.
 # Default: 4
 processor.worker.threads=4
