Skip to content

Commit

Permalink
[data ingestion] ensure current checkpoint watermark is greater or eq…
Browse files Browse the repository at this point in the history
…ual to pruning watermark… (MystenLabs#18922)

this is required for setups that use colocation setups and have multiple
processes/hosts that execute the same workflow with shared progress
store
  • Loading branch information
phoenix-o authored Aug 7, 2024
1 parent 5cb3a2e commit e4c9076
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions crates/sui-data-ingestion-core/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use notify::RecursiveMode;
use notify::Watcher;
use object_store::path::Path;
use object_store::ObjectStore;
use std::cmp::max;
use std::collections::BTreeMap;
use std::ffi::OsString;
use std::fs;
Expand Down Expand Up @@ -292,6 +293,7 @@ impl CheckpointReader {
info!("cleaning processed files, watermark is {}", watermark);
self.data_limiter.gc(watermark);
self.last_pruned_watermark = watermark;
self.current_checkpoint_number = max(self.current_checkpoint_number, watermark);
for entry in fs::read_dir(self.path.clone())? {
let entry = entry?;
let filename = entry.file_name();
Expand Down

0 comments on commit e4c9076

Please sign in to comment.