From 5f18e041731c4fe95113457ffaff6cf7e59448df Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Tue, 26 Nov 2024 01:09:24 +0000 Subject: [PATCH] indexer-alt: pass ingestion client over CLI ## Description Move the source of checkpoint data (remote store or local path) into its own struct that is parsed using clap, instead of serde. This is to cater to the case where the same indexer configuration might be used to index different networks (mainnet, testnet, devnet, etc). ## Test plan Run the indexer against a variety of configs: ``` sui$ cargo run -p sui-indexer-alt --release -- \ indexer --last-checkpoint 10000 \ --remote-store-url https://checkpoints.mainnet.sui.io \ --config $CONFIG ``` Where config is a file that contains one of the following: ``` [pipeline.kv_objects] [pipeline.kv_transactions] ``` ``` [consistency] consistent-range = 1000 [pipeline.sum_obj_types] ``` ``` [committer] collect-interval-ms = 1000 [pipeline.tx_calls] [pipeline.tx_affected_objects] collect-interval-ms = 5000 ``` --- crates/sui-indexer-alt/src/args.rs | 4 ++++ crates/sui-indexer-alt/src/benchmark.rs | 10 ++++++--- crates/sui-indexer-alt/src/ingestion/mod.rs | 24 ++++++++++++++------- crates/sui-indexer-alt/src/lib.rs | 23 ++++++++++++++++---- crates/sui-indexer-alt/src/main.rs | 10 ++++++++- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/crates/sui-indexer-alt/src/args.rs b/crates/sui-indexer-alt/src/args.rs index 27def895d1df7..1fe149a3c471e 100644 --- a/crates/sui-indexer-alt/src/args.rs +++ b/crates/sui-indexer-alt/src/args.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; #[cfg(feature = "benchmark")] use crate::benchmark::BenchmarkArgs; use crate::db::DbArgs; +use crate::ingestion::IngestionArgs; use crate::IndexerArgs; use clap::Subcommand; @@ -23,6 +24,9 @@ pub struct Args { pub enum Command { /// Run the indexer. Indexer { + #[command(flatten)] + ingestion_args: IngestionArgs, + #[command(flatten)] indexer_args: IndexerArgs, diff --git a/crates/sui-indexer-alt/src/benchmark.rs b/crates/sui-indexer-alt/src/benchmark.rs index 23c6a0962af5d..4cc462f8a7084 100644 --- a/crates/sui-indexer-alt/src/benchmark.rs +++ b/crates/sui-indexer-alt/src/benchmark.rs @@ -5,6 +5,7 @@ use std::{path::PathBuf, time::Instant}; use crate::{ db::{reset_database, DbArgs}, + ingestion::IngestionArgs, start_indexer, IndexerArgs, IndexerConfig, }; use sui_synthetic_ingestion::synthetic_ingestion::read_ingestion_data; @@ -19,7 +20,7 @@ pub struct BenchmarkArgs { pub async fn run_benchmark( db_args: DbArgs, benchmark_args: BenchmarkArgs, - mut indexer_config: IndexerConfig, + indexer_config: IndexerConfig, ) -> anyhow::Result<()> { let BenchmarkArgs { ingestion_path } = benchmark_args; @@ -36,14 +37,17 @@ pub async fn run_benchmark( ..Default::default() }; - indexer_config.ingestion.remote_store_url = None; - indexer_config.ingestion.local_ingestion_path = Some(ingestion_path); + let ingestion_args = IngestionArgs { + remote_store_url: None, + local_ingestion_path: Some(ingestion_path.clone()), + }; let cur_time = Instant::now(); start_indexer( db_args, indexer_args, + ingestion_args, indexer_config, false, /* with_genesis */ ) diff --git a/crates/sui-indexer-alt/src/ingestion/mod.rs b/crates/sui-indexer-alt/src/ingestion/mod.rs index 49f2609f9f7f7..67595882fe0aa 100644 --- a/crates/sui-indexer-alt/src/ingestion/mod.rs +++ b/crates/sui-indexer-alt/src/ingestion/mod.rs @@ -29,16 +29,21 @@ mod remote_client; #[cfg(test)] mod test_utils; -#[DefaultConfig] -#[derive(Clone)] -pub struct IngestionConfig { +#[derive(clap::Args, Clone, Debug)] +pub struct IngestionArgs { /// Remote Store to fetch checkpoints from. + #[clap(long, required = true, group = "source")] pub remote_store_url: Option, /// Path to the local ingestion directory. /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used. + #[clap(long, required = true, group = "source")] pub local_ingestion_path: Option, +} +#[DefaultConfig] +#[derive(Clone)] +pub struct IngestionConfig { /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service. pub checkpoint_buffer_size: usize, @@ -66,18 +71,20 @@ impl IngestionConfig { impl IngestionService { pub fn new( + args: IngestionArgs, config: IngestionConfig, metrics: Arc, cancel: CancellationToken, ) -> Result { // TODO: Potentially support a hybrid mode where we can fetch from both local and remote. - let client = if let Some(url) = config.remote_store_url.as_ref() { + let client = if let Some(url) = args.remote_store_url.as_ref() { IngestionClient::new_remote(url.clone(), metrics.clone())? - } else if let Some(path) = config.local_ingestion_path.as_ref() { + } else if let Some(path) = args.local_ingestion_path.as_ref() { IngestionClient::new_local(path.clone(), metrics.clone()) } else { panic!("Either remote_store_url or local_ingestion_path must be provided"); }; + let subscribers = Vec::new(); let (ingest_hi_tx, ingest_hi_rx) = mpsc::unbounded_channel(); Ok(Self { @@ -166,8 +173,6 @@ impl IngestionService { impl Default for IngestionConfig { fn default() -> Self { Self { - remote_store_url: None, - local_ingestion_path: None, checkpoint_buffer_size: 5000, ingest_concurrency: 200, retry_interval_ms: 200, @@ -196,8 +201,11 @@ mod tests { cancel: CancellationToken, ) -> IngestionService { IngestionService::new( - IngestionConfig { + IngestionArgs { remote_store_url: Some(Url::parse(&uri).unwrap()), + local_ingestion_path: None, + }, + IngestionConfig { checkpoint_buffer_size, ingest_concurrency, ..Default::default() diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index d6c4b267e8b8c..f846c05009dbd 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -17,7 +17,7 @@ use handlers::{ tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, }; -use ingestion::{client::IngestionClient, IngestionConfig, IngestionService}; +use ingestion::{client::IngestionClient, IngestionArgs, IngestionConfig, IngestionService}; use metrics::{IndexerMetrics, MetricsService}; use models::watermarks::CommitterWatermark; use pipeline::{ @@ -110,6 +110,7 @@ impl Indexer { pub async fn new( db_args: DbArgs, indexer_args: IndexerArgs, + ingestion_args: IngestionArgs, ingestion_config: IngestionConfig, cancel: CancellationToken, ) -> Result { @@ -131,8 +132,13 @@ impl Indexer { let (metrics, metrics_service) = MetricsService::new(metrics_address, db.clone(), cancel.clone())?; - let ingestion_service = - IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?; + + let ingestion_service = IngestionService::new( + ingestion_args, + ingestion_config, + metrics.clone(), + cancel.clone(), + )?; Ok(Self { db, @@ -352,6 +358,7 @@ impl Default for IndexerArgs { pub async fn start_indexer( db_args: DbArgs, indexer_args: IndexerArgs, + ingestion_args: IngestionArgs, indexer_config: IndexerConfig, // If true, the indexer will bootstrap from genesis. // Otherwise it will skip the pipelines that rely on genesis data. @@ -410,7 +417,15 @@ pub async fn start_indexer( let cancel = CancellationToken::new(); let retry_interval = ingestion.retry_interval(); - let mut indexer = Indexer::new(db_args, indexer_args, ingestion, cancel.clone()).await?; + + let mut indexer = Indexer::new( + db_args, + indexer_args, + ingestion_args, + ingestion, + cancel.clone(), + ) + .await?; macro_rules! add_concurrent { ($handler:expr, $config:expr) => { diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 0b606b768bdb5..2884874438686 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -22,6 +22,7 @@ async fn main() -> Result<()> { match args.command { Command::Indexer { + ingestion_args, indexer_args, config, } => { @@ -32,7 +33,14 @@ async fn main() -> Result<()> { let indexer_config: IndexerConfig = toml::from_str(&config_contents) .context("Failed to parse configuration TOML file.")?; - start_indexer(args.db_args, indexer_args, indexer_config, true).await?; + start_indexer( + args.db_args, + indexer_args, + ingestion_args, + indexer_config, + true, + ) + .await?; } Command::GenerateConfig => {