Skip to content

Commit

Permalink
indexer-alt: shared pruner configs with overrides
Browse files Browse the repository at this point in the history
## Description

Similar to the change introducing a shared committer config, this change
introduces a shared pruner config between all concurrent pipelines. The
values in this config are only used to fill out concurrent pipelines that
have already specified that they want pruning enabled (i.e. unlike the
committer override which is always inherited, this will only be
inherited if the pruner config is called for).

Additionally, unlike other fields where the last write wins when merging
configs, `PrunerConfig.retention` will be merged by `max`. This is to
deal with merging the configs of two different apps that need to index
the same table, but with different retentions.

## Test plan

New unit tests:

```
sui$ cargo nextest run -p sui-indexer-alt
```
  • Loading branch information
amnn committed Nov 29, 2024
1 parent 31253d2 commit 04c768c
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 29 deletions.
215 changes: 187 additions & 28 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
//

use serde::{Deserialize, Serialize};
use sui_default_config::DefaultConfig;

use crate::{
Expand All @@ -27,6 +25,13 @@ pub struct IndexerConfig {
/// override individual settings in their own configuration sections.
pub committer: CommitterLayer,

/// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
/// override individual settings in their own configuration sections. Concurrent pipelines
/// still need to specify a pruner configuration (although it can be empty) to indicate that
/// they want to enable pruning, but when they do, any missing values will be filled in by this
/// config.
pub pruner: PrunerLayer,

/// Per-pipeline configurations.
pub pipeline: PipelineLayer,
}
Expand Down Expand Up @@ -90,16 +95,13 @@ pub struct CommitterLayer {
watermark_interval_ms: Option<u64>,
}

/// PrunerLayer is special in that its fields are not optional -- a layer needs to specify all or
/// none of the values, this means it has the same shape as [PrunerConfig], but we define it as its
/// own type so that it can implement the deserialization logic necessary for being read from a
/// TOML file.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
pub struct PrunerLayer {
pub interval_ms: u64,
pub delay_ms: u64,
pub retention: u64,
pub max_chunk_size: u64,
pub interval_ms: Option<u64>,
pub delay_ms: Option<u64>,
pub retention: Option<u64>,
pub max_chunk_size: Option<u64>,
}

#[DefaultConfig]
Expand Down Expand Up @@ -157,6 +159,7 @@ impl IndexerConfig {
ingestion: self.ingestion.merge(other.ingestion),
consistency: self.consistency.merge(other.consistency),
committer: self.committer.merge(other.committer),
pruner: self.pruner.merge(other.pruner),
pipeline: self.pipeline.merge(other.pipeline),
}
}
Expand Down Expand Up @@ -228,19 +231,23 @@ impl ConcurrentLayer {
pub fn merge(self, other: ConcurrentLayer) -> ConcurrentLayer {
ConcurrentLayer {
committer: merge_recursive!(self.committer, other.committer),
pruner: other.pruner.or(self.pruner),
pruner: merge_recursive!(self.pruner, other.pruner),
}
}

/// Unlike other parameters, `pruner` will appear in the finished configuration only if it
/// appears in the layer *and* in the base.
pub fn finish(self, base: ConcurrentConfig) -> ConcurrentConfig {
ConcurrentConfig {
committer: if let Some(committer) = self.committer {
committer.finish(base.committer)
} else {
base.committer
},
// If the layer defines a pruner config, it takes precedence.
pruner: self.pruner.map(Into::into).or(base.pruner),
pruner: match (self.pruner, base.pruner) {
(None, _) | (_, None) => None,
(Some(pruner), Some(base)) => Some(pruner.finish(base)),
},
}
}
}
Expand All @@ -265,6 +272,32 @@ impl CommitterLayer {
}
}

impl PrunerLayer {
    /// Combines two pruner layers, treating `other` as the later write.
    ///
    /// For `interval_ms`, `delay_ms` and `max_chunk_size`, a value set in `other` wins and
    /// `self` is only consulted when `other` leaves the field unset. `retention` is the
    /// exception: when both layers set it, the larger of the two is kept, independent of
    /// the order in which the layers are merged.
    pub fn merge(self, other: PrunerLayer) -> PrunerLayer {
        let PrunerLayer {
            interval_ms,
            delay_ms,
            retention,
            max_chunk_size,
        } = other;

        PrunerLayer {
            interval_ms: interval_ms.or(self.interval_ms),
            delay_ms: delay_ms.or(self.delay_ms),
            // `Option` orders `None` below every `Some`, so taking the max of the two
            // options yields the larger retention when both are set, whichever one is set
            // when only one is, and `None` when neither is.
            retention: retention.max(self.retention),
            max_chunk_size: max_chunk_size.or(self.max_chunk_size),
        }
    }

    /// Produces a complete [PrunerConfig] from this layer, taking each field from the layer
    /// when it is set and from `base` otherwise.
    pub fn finish(self, base: PrunerConfig) -> PrunerConfig {
        let PrunerLayer {
            interval_ms,
            delay_ms,
            retention,
            max_chunk_size,
        } = self;

        PrunerConfig {
            interval_ms: interval_ms.unwrap_or(base.interval_ms),
            delay_ms: delay_ms.unwrap_or(base.delay_ms),
            retention: retention.unwrap_or(base.retention),
            max_chunk_size: max_chunk_size.unwrap_or(base.max_chunk_size),
        }
    }
}

impl PipelineLayer {
pub fn merge(self, other: PipelineLayer) -> PipelineLayer {
PipelineLayer {
Expand Down Expand Up @@ -318,20 +351,6 @@ impl Default for ConsistencyConfig {
}
}

// Planning for these types to be in different crates from each other in the long-run, so use
// `Into` rather than `From`.
#[allow(clippy::from_over_into)]
impl Into<PrunerConfig> for PrunerLayer {
fn into(self) -> PrunerConfig {
PrunerConfig {
interval_ms: self.interval_ms,
delay_ms: self.delay_ms,
retention: self.retention,
max_chunk_size: self.max_chunk_size,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -474,4 +493,144 @@ mod tests {
},
);
}

/// Merging pruner layers is last-write-wins for every field except `retention`, which keeps
/// the larger value no matter which layer is merged into which.
#[test]
fn merge_pruner() {
    let first = PrunerLayer {
        interval_ms: None,
        delay_ms: Some(100),
        retention: Some(200),
        max_chunk_size: Some(300),
    };

    let second = PrunerLayer {
        interval_ms: Some(400),
        delay_ms: None,
        retention: Some(500),
        max_chunk_size: Some(600),
    };

    let first_then_second = first.clone().merge(second.clone());
    let second_then_first = second.merge(first);

    // `second` is the later write, so its `max_chunk_size` wins.
    assert_matches!(
        first_then_second,
        PrunerLayer {
            interval_ms: Some(400),
            delay_ms: Some(100),
            retention: Some(500),
            max_chunk_size: Some(600),
        },
    );

    // With the order flipped `max_chunk_size` comes from `first`, but `retention` is still
    // the max of the two.
    assert_matches!(
        second_then_first,
        PrunerLayer {
            interval_ms: Some(400),
            delay_ms: Some(100),
            retention: Some(500),
            max_chunk_size: Some(300),
        },
    );
}

/// A layer that does not ask for pruning stays unpruned after `finish`, even when the base
/// config carries a pruner.
#[test]
fn finish_concurrent_unpruned_override() {
    let base = ConcurrentConfig {
        committer: CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
        },
        pruner: Some(PrunerConfig::default()),
    };

    let layer = ConcurrentLayer {
        committer: None,
        pruner: None,
    };

    let finished = layer.finish(base);

    assert_matches!(
        finished,
        ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: None,
        },
    );
}

/// When neither the layer nor the base has a pruner, the finished config has none either and
/// the committer settings are taken from the base.
#[test]
fn finish_concurrent_no_pruner() {
    let base = ConcurrentConfig {
        committer: CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
        },
        pruner: None,
    };

    let layer = ConcurrentLayer {
        committer: None,
        pruner: None,
    };

    let finished = layer.finish(base);

    assert_matches!(
        finished,
        ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: None,
        },
    );
}

/// When both the layer and the base have a pruner, fields set in the layer override the base
/// and the remaining fields are filled in from the base.
#[test]
fn finish_concurrent_pruner() {
    let base = ConcurrentConfig {
        committer: CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
        },
        pruner: Some(PrunerConfig {
            interval_ms: 100,
            delay_ms: 200,
            retention: 300,
            max_chunk_size: 400,
        }),
    };

    // Only `interval_ms` is overridden; everything else is left for the base to supply.
    let layer = ConcurrentLayer {
        committer: None,
        pruner: Some(PrunerLayer {
            interval_ms: Some(1000),
            ..Default::default()
        }),
    };

    let finished = layer.finish(base);

    assert_matches!(
        finished,
        ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig {
                interval_ms: 1000,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
            }),
        },
    );
}
}
4 changes: 3 additions & 1 deletion crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ pub async fn start_indexer(
ingestion,
consistency,
committer,
pruner,
pipeline:
PipelineLayer {
sum_coin_balances,
Expand Down Expand Up @@ -439,6 +440,7 @@ pub async fn start_indexer(
} = consistency.finish(ConsistencyConfig::default());

let committer = committer.finish(CommitterConfig::default());
let pruner = pruner.finish(PrunerConfig::default());

// Pipelines that are split up into a summary table, and a write-ahead log prune their
// write-ahead log so it contains just enough information to overlap with the summary table.
Expand Down Expand Up @@ -493,7 +495,7 @@ pub async fn start_indexer(
$handler,
layer.finish(ConcurrentConfig {
committer: committer.clone(),
..Default::default()
pruner: Some(pruner.clone()),
}),
)
.await?
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ impl<H: Handler> Batched<H> {
}
}

impl Default for PrunerConfig {
fn default() -> Self {
Self {
interval_ms: 300_000,
delay_ms: 120_000,
retention: 4_000_000,
max_chunk_size: 2_000,
}
}
}

/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
/// strictly after the `watermark` (or from the beginning if no watermark was provided).
///
Expand Down

0 comments on commit 04c768c

Please sign in to comment.