Skip to content

Commit

Permalink
[indexer-alt] Fix bad rebase in obj_info_pruner (MystenLabs#20567)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Dec 10, 2024
1 parent b8b4916 commit 4ac0808
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
18 changes: 12 additions & 6 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct SequentialLayer {
pub struct ConcurrentLayer {
committer: Option<CommitterLayer>,
pruner: Option<PrunerLayer>,
checkpoint_lag: Option<u64>,

#[serde(flatten)]
pub extra: toml::Table,
Expand Down Expand Up @@ -148,7 +149,7 @@ pub struct PipelineLayer {

// Concurrent pipelines with a lagged consistent pruner which is also a concurrent pipeline.
pub obj_info: Option<CommitterLayer>,
pub obj_info_pruner: Option<CommitterLayer>,
pub obj_info_pruner: Option<ConcurrentLayer>,

// All concurrent pipelines
pub ev_emit_mod: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -248,7 +249,7 @@ impl ConcurrentLayer {
(None, _) | (_, None) => None,
(Some(pruner), Some(base)) => Some(pruner.finish(base)),
},
checkpoint_lag: base.checkpoint_lag,
checkpoint_lag: self.checkpoint_lag.or(base.checkpoint_lag),
}
}
}
Expand Down Expand Up @@ -378,6 +379,7 @@ impl Merge for ConcurrentLayer {
ConcurrentLayer {
committer: self.committer.merge(other.committer),
pruner: self.pruner.merge(other.pruner),
checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
extra: Default::default(),
}
}
Expand Down Expand Up @@ -509,6 +511,7 @@ impl From<ConcurrentConfig> for ConcurrentLayer {
Self {
committer: Some(config.committer.into()),
pruner: config.pruner.map(Into::into),
checkpoint_lag: config.checkpoint_lag,
extra: Default::default(),
}
}
Expand Down Expand Up @@ -759,6 +762,7 @@ mod tests {
let layer = ConcurrentLayer {
committer: None,
pruner: None,
checkpoint_lag: None,
extra: Default::default(),
};

Expand All @@ -769,7 +773,7 @@ mod tests {
watermark_interval_ms: 500,
},
pruner: Some(PrunerConfig::default()),
checkpoint_lag: None,
checkpoint_lag: Some(100),
};

assert_matches!(
Expand All @@ -781,7 +785,7 @@ mod tests {
watermark_interval_ms: 500,
},
pruner: None,
checkpoint_lag: None,
checkpoint_lag: Some(100),
},
);
}
Expand All @@ -791,6 +795,7 @@ mod tests {
let layer = ConcurrentLayer {
committer: None,
pruner: None,
checkpoint_lag: None,
extra: Default::default(),
};

Expand All @@ -801,7 +806,7 @@ mod tests {
watermark_interval_ms: 500,
},
pruner: None,
checkpoint_lag: None,
checkpoint_lag: Some(100),
};

assert_matches!(
Expand All @@ -813,7 +818,7 @@ mod tests {
watermark_interval_ms: 500,
},
pruner: None,
checkpoint_lag: None,
checkpoint_lag: Some(100),
},
);
}
Expand All @@ -826,6 +831,7 @@ mod tests {
interval_ms: Some(1000),
..Default::default()
}),
checkpoint_lag: None,
extra: Default::default(),
};

Expand Down
8 changes: 4 additions & 4 deletions crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ impl Handler for ObjInfoPruner {
// For each object_id, we first get the highest cp_sequence_number_exclusive.
// TODO: We could consider make this more efficient by doing some grouping in the collector
// so that we could merge as many objects as possible across checkpoints.
let to_prune = values.iter().fold(BTreeMap::new(), |mut acc, v| {
let mut to_prune = BTreeMap::new();
for v in values {
let object_id = v.object_id();
let cp_sequence_number_exclusive = match v.update {
ProcessedObjInfoUpdate::Insert(_) => v.cp_sequence_number,
ProcessedObjInfoUpdate::Delete(_) => v.cp_sequence_number + 1,
} as i64;
let cp = acc.entry(object_id).or_default();
let cp = to_prune.entry(object_id).or_default();
*cp = std::cmp::max(*cp, cp_sequence_number_exclusive);
acc
});
}
let mut committed_rows = 0;
for (object_id, cp_sequence_number_exclusive) in to_prune {
committed_rows += diesel::delete(obj_info::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ pub async fn start_indexer(
indexer
.concurrent_pipeline(
$lagged_handler,
ConcurrentConfig {
committer: $lagged_config.unwrap_or_default().finish(committer.clone()),
$lagged_config.unwrap_or_default().finish(ConcurrentConfig {
committer: committer.clone(),
pruner: None,
checkpoint_lag: Some(consistent_range),
},
}),
)
.await?;
}
Expand Down

0 comments on commit 4ac0808

Please sign in to comment.