Skip to content

Commit

Permalink
indexer-alt: file-based configs -- add back --pipeline flag
Browse files Browse the repository at this point in the history
## Description

Add back the `--pipeline` command-line argument, layered on top of the
file-based configuration.

## Test plan

```
sui$ cargo run -p sui-indexer-alt -- generate-config > /tmp/indexer.toml
# Remove some pipelines

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml                            \
  --pipeline i_dont_exist

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml                            \
  --pipeline kv_objects --pipeline kv_transactions

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml
```
  • Loading branch information
amnn committed Nov 29, 2024
1 parent d0b4271 commit aaa1f92
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ pub struct BenchmarkArgs {
/// Path to the local ingestion directory to read checkpoints data from.
#[arg(long)]
ingestion_path: PathBuf,

/// Only run the following pipelines. If not provided, all pipelines found in the
/// configuration file will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,
}

pub async fn run_benchmark(
db_args: DbArgs,
benchmark_args: BenchmarkArgs,
indexer_config: IndexerConfig,
) -> anyhow::Result<()> {
let BenchmarkArgs { ingestion_path } = benchmark_args;
let BenchmarkArgs {
ingestion_path,
pipeline,
} = benchmark_args;

let ingestion_data = read_ingestion_data(&ingestion_path).await?;
let first_checkpoint = *ingestion_data.keys().next().unwrap();
Expand All @@ -34,6 +42,7 @@ pub async fn run_benchmark(
let indexer_args = IndexerArgs {
first_checkpoint: Some(first_checkpoint),
last_checkpoint: Some(last_checkpoint),
pipeline,
..Default::default()
};

Expand Down
32 changes: 32 additions & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ pub struct IndexerArgs {
#[arg(long)]
pub last_checkpoint: Option<u64>,

/// Only run the following pipelines. If not provided, all pipelines found in the
/// configuration file will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,

/// Don't write to the watermark tables for concurrent pipelines.
#[arg(long)]
pub skip_watermark: bool,
Expand Down Expand Up @@ -90,6 +95,11 @@ pub struct Indexer {
/// Don't write to the watermark tables for concurrent pipelines.
skip_watermark: bool,

/// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
/// run. Any pipelines that are present in this filter but not added to the indexer will cause
/// the indexer to fail with an error when it is run.
enabled_pipelines: Option<BTreeSet<String>>,

/// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
/// with the same name isn't added twice.
added_pipelines: BTreeSet<&'static str>,
Expand Down Expand Up @@ -117,6 +127,7 @@ impl Indexer {
let IndexerArgs {
first_checkpoint,
last_checkpoint,
pipeline,
skip_watermark,
metrics_address,
} = indexer_args;
Expand Down Expand Up @@ -148,6 +159,11 @@ impl Indexer {
first_checkpoint,
last_checkpoint,
skip_watermark,
enabled_pipelines: if pipeline.is_empty() {
None
} else {
Some(pipeline.into_iter().collect())
},
added_pipelines: BTreeSet::new(),
cancel,
first_checkpoint_from_watermark: u64::MAX,
Expand Down Expand Up @@ -274,6 +290,14 @@ impl Indexer {
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
/// or will continue until it tracks the tip of the network.
pub async fn run(mut self) -> Result<JoinHandle<()>> {
if let Some(enabled_pipelines) = self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
"Tried to enable pipelines that this indexer does not know about: \
{enabled_pipelines:#?}",
);
}

let metrics_handle = self
.metrics_service
.run()
Expand Down Expand Up @@ -328,6 +352,13 @@ impl Indexer {
P::NAME,
);

if let Some(enabled_pipelines) = &mut self.enabled_pipelines {
if !enabled_pipelines.remove(P::NAME) {
info!(pipeline = P::NAME, "Skipping");
return Ok(None);
}
}

let mut conn = self.db.connect().await.context("Failed DB connection")?;

let watermark = CommitterWatermark::get(&mut conn, P::NAME)
Expand All @@ -349,6 +380,7 @@ impl Default for IndexerArgs {
Self {
first_checkpoint: None,
last_checkpoint: None,
pipeline: vec![],
skip_watermark: false,
metrics_address: "0.0.0.0:9184".parse().unwrap(),
}
Expand Down

0 comments on commit aaa1f92

Please sign in to comment.