Skip to content

Commit

Permalink
indexer-alt: pass ingestion client over CLI
Browse files Browse the repository at this point in the history
## Description

Move the source of checkpoint data (remote store or local path) into its
own struct that is parsed using clap, instead of serde. This is to cater
to the case where the same indexer configuration might be used to index
different networks (mainnet, testnet, devnet, etc).

## Test plan

Run the indexer against a variety of configs:

```
sui$ cargo run -p sui-indexer-alt --release --         \
 indexer --last-checkpoint 10000                       \
 --remote-store-url https://checkpoints.mainnet.sui.io \
 --config $CONFIG
```

Where config is a file that contains one of the following:

```
[pipeline.kv_objects]
[pipeline.kv_transactions]
```

```
[consistency]
consistent-range = 1000

[pipeline.sum_obj_types]
```

```
[committer]
collect-interval-ms = 1000

[pipeline.tx_calls]

[pipeline.tx_affected_objects]
collect-interval-ms = 5000
```
  • Loading branch information
amnn committed Nov 29, 2024
1 parent 3f724bf commit 5f18e04
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 16 deletions.
4 changes: 4 additions & 0 deletions crates/sui-indexer-alt/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::PathBuf;
#[cfg(feature = "benchmark")]
use crate::benchmark::BenchmarkArgs;
use crate::db::DbArgs;
use crate::ingestion::IngestionArgs;
use crate::IndexerArgs;
use clap::Subcommand;

Expand All @@ -23,6 +24,9 @@ pub struct Args {
pub enum Command {
/// Run the indexer.
Indexer {
#[command(flatten)]
ingestion_args: IngestionArgs,

#[command(flatten)]
indexer_args: IndexerArgs,

Expand Down
10 changes: 7 additions & 3 deletions crates/sui-indexer-alt/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{path::PathBuf, time::Instant};

use crate::{
db::{reset_database, DbArgs},
ingestion::IngestionArgs,
start_indexer, IndexerArgs, IndexerConfig,
};
use sui_synthetic_ingestion::synthetic_ingestion::read_ingestion_data;
Expand All @@ -19,7 +20,7 @@ pub struct BenchmarkArgs {
pub async fn run_benchmark(
db_args: DbArgs,
benchmark_args: BenchmarkArgs,
mut indexer_config: IndexerConfig,
indexer_config: IndexerConfig,
) -> anyhow::Result<()> {
let BenchmarkArgs { ingestion_path } = benchmark_args;

Expand All @@ -36,14 +37,17 @@ pub async fn run_benchmark(
..Default::default()
};

indexer_config.ingestion.remote_store_url = None;
indexer_config.ingestion.local_ingestion_path = Some(ingestion_path);
let ingestion_args = IngestionArgs {
remote_store_url: None,
local_ingestion_path: Some(ingestion_path.clone()),
};

let cur_time = Instant::now();

start_indexer(
db_args,
indexer_args,
ingestion_args,
indexer_config,
false, /* with_genesis */
)
Expand Down
24 changes: 16 additions & 8 deletions crates/sui-indexer-alt/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@ mod remote_client;
#[cfg(test)]
mod test_utils;

#[DefaultConfig]
#[derive(Clone)]
pub struct IngestionConfig {
#[derive(clap::Args, Clone, Debug)]
pub struct IngestionArgs {
/// Remote Store to fetch checkpoints from.
#[clap(long, required = true, group = "source")]
pub remote_store_url: Option<Url>,

/// Path to the local ingestion directory.
/// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
#[clap(long, required = true, group = "source")]
pub local_ingestion_path: Option<PathBuf>,
}

#[DefaultConfig]
#[derive(Clone)]
pub struct IngestionConfig {
/// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
pub checkpoint_buffer_size: usize,

Expand Down Expand Up @@ -66,18 +71,20 @@ impl IngestionConfig {

impl IngestionService {
pub fn new(
args: IngestionArgs,
config: IngestionConfig,
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> Result<Self> {
// TODO: Potentially support a hybrid mode where we can fetch from both local and remote.
let client = if let Some(url) = config.remote_store_url.as_ref() {
let client = if let Some(url) = args.remote_store_url.as_ref() {
IngestionClient::new_remote(url.clone(), metrics.clone())?
} else if let Some(path) = config.local_ingestion_path.as_ref() {
} else if let Some(path) = args.local_ingestion_path.as_ref() {
IngestionClient::new_local(path.clone(), metrics.clone())
} else {
panic!("Either remote_store_url or local_ingestion_path must be provided");
};

let subscribers = Vec::new();
let (ingest_hi_tx, ingest_hi_rx) = mpsc::unbounded_channel();
Ok(Self {
Expand Down Expand Up @@ -166,8 +173,6 @@ impl IngestionService {
impl Default for IngestionConfig {
fn default() -> Self {
Self {
remote_store_url: None,
local_ingestion_path: None,
checkpoint_buffer_size: 5000,
ingest_concurrency: 200,
retry_interval_ms: 200,
Expand Down Expand Up @@ -196,8 +201,11 @@ mod tests {
cancel: CancellationToken,
) -> IngestionService {
IngestionService::new(
IngestionConfig {
IngestionArgs {
remote_store_url: Some(Url::parse(&uri).unwrap()),
local_ingestion_path: None,
},
IngestionConfig {
checkpoint_buffer_size,
ingest_concurrency,
..Default::default()
Expand Down
23 changes: 19 additions & 4 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use handlers::{
tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests,
tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
};
use ingestion::{client::IngestionClient, IngestionConfig, IngestionService};
use ingestion::{client::IngestionClient, IngestionArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use models::watermarks::CommitterWatermark;
use pipeline::{
Expand Down Expand Up @@ -110,6 +110,7 @@ impl Indexer {
pub async fn new(
db_args: DbArgs,
indexer_args: IndexerArgs,
ingestion_args: IngestionArgs,
ingestion_config: IngestionConfig,
cancel: CancellationToken,
) -> Result<Self> {
Expand All @@ -131,8 +132,13 @@ impl Indexer {

let (metrics, metrics_service) =
MetricsService::new(metrics_address, db.clone(), cancel.clone())?;
let ingestion_service =
IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?;

let ingestion_service = IngestionService::new(
ingestion_args,
ingestion_config,
metrics.clone(),
cancel.clone(),
)?;

Ok(Self {
db,
Expand Down Expand Up @@ -352,6 +358,7 @@ impl Default for IndexerArgs {
pub async fn start_indexer(
db_args: DbArgs,
indexer_args: IndexerArgs,
ingestion_args: IngestionArgs,
indexer_config: IndexerConfig,
// If true, the indexer will bootstrap from genesis.
// Otherwise it will skip the pipelines that rely on genesis data.
Expand Down Expand Up @@ -410,7 +417,15 @@ pub async fn start_indexer(

let cancel = CancellationToken::new();
let retry_interval = ingestion.retry_interval();
let mut indexer = Indexer::new(db_args, indexer_args, ingestion, cancel.clone()).await?;

let mut indexer = Indexer::new(
db_args,
indexer_args,
ingestion_args,
ingestion,
cancel.clone(),
)
.await?;

macro_rules! add_concurrent {
($handler:expr, $config:expr) => {
Expand Down
10 changes: 9 additions & 1 deletion crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async fn main() -> Result<()> {

match args.command {
Command::Indexer {
ingestion_args,
indexer_args,
config,
} => {
Expand All @@ -32,7 +33,14 @@ async fn main() -> Result<()> {
let indexer_config: IndexerConfig = toml::from_str(&config_contents)
.context("Failed to parse configuration TOML file.")?;

start_indexer(args.db_args, indexer_args, indexer_config, true).await?;
start_indexer(
args.db_args,
indexer_args,
ingestion_args,
indexer_config,
true,
)
.await?;
}

Command::GenerateConfig => {
Expand Down

0 comments on commit 5f18e04

Please sign in to comment.