Make max checkpoints configurable #22015

Open. Wants to merge 1 commit into base: main.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions crates/sui-data-ingestion-core/Cargo.toml
@@ -28,6 +28,7 @@ tempfile.workspace = true
tap.workspace = true
sui-protocol-config.workspace = true
sui-rpc-api.workspace = true
once_cell.workspace = true

[dev-dependencies]
rand.workspace = true
39 changes: 36 additions & 3 deletions crates/sui-data-ingestion-core/src/executor.rs
@@ -11,6 +11,7 @@ use crate::{DataIngestionMetrics, ReaderOptions};
use anyhow::Result;
use futures::Future;
use mysten_metrics::spawn_monitored_task;
use once_cell::sync::Lazy;
use prometheus::Registry;
use std::path::PathBuf;
use std::pin::Pin;
@@ -19,8 +19,40 @@ use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::info;

pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;
/// Environment variable to override the default maximum number of checkpoints that can be processed concurrently.
const MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME: &str = "MAX_CHECKPOINTS_IN_PROGRESS";

/// Default maximum number of checkpoints in progress.
const DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;

/// Maximum number of checkpoints that can be processed concurrently.
///
/// This value can be overridden by setting the `MAX_CHECKPOINTS_IN_PROGRESS` environment variable
/// before starting the process. If the variable is unset or cannot be parsed as a `usize`,
/// `DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS` is used.
///
/// This is read once at startup and cached. Changing the environment variable at runtime will not
/// have any effect.
pub static MAX_CHECKPOINTS_IN_PROGRESS: Lazy<usize> = Lazy::new(|| {
Review comment (Contributor): Can we include this as a part of config vs using an env var?

    let max_checkpoints_opt = std::env::var(MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME)
        .ok()
        .and_then(|s| s.parse().ok());
    if let Some(max_checkpoints) = max_checkpoints_opt {
        info!(
            "Using custom value for '{}' -- max checkpoints in progress: {}",
            MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME, max_checkpoints
        );
        max_checkpoints
    } else {
        info!(
            "Using default value for '{}' -- max checkpoints in progress: {}",
            MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME, DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS
        );
        DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS
    }
});

pub struct IndexerExecutor<P> {
pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
@@ -34,7 +67,7 @@ pub struct IndexerExecutor<P> {
impl<P: ProgressStore> IndexerExecutor<P> {
pub fn new(progress_store: P, number_of_jobs: usize, metrics: DataIngestionMetrics) -> Self {
let (pool_progress_sender, pool_progress_receiver) =
mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
mpsc::channel(number_of_jobs * *MAX_CHECKPOINTS_IN_PROGRESS);
Self {
pools: vec![],
pool_senders: vec![],
@@ -48,7 +81,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
/// Registers a new worker pool in the executor
pub async fn register<W: Worker + 'static>(&mut self, pool: WorkerPool<W>) -> Result<()> {
let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (sender, receiver) = mpsc::channel(*MAX_CHECKPOINTS_IN_PROGRESS);
self.pools.push(Box::pin(pool.run(
checkpoint_number,
receiver,
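For readers who want to try the new override locally, below is a minimal, self-contained sketch of the pattern this diff introduces: a `once_cell::sync::Lazy` static that reads the environment variable once at startup and falls back to the default. Only the variable name and default value mirror the diff; the `main` harness is hypothetical.

use once_cell::sync::Lazy;

const MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME: &str = "MAX_CHECKPOINTS_IN_PROGRESS";
const DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;

// Read once and cached for the lifetime of the process; later changes to the
// environment variable have no effect, matching the doc comment in the diff.
static MAX_CHECKPOINTS_IN_PROGRESS: Lazy<usize> = Lazy::new(|| {
    std::env::var(MAX_CHECKPOINTS_IN_PROGRESS_VAR_NAME)
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(DEFAULT_MAX_CHECKPOINTS_IN_PROGRESS)
});

fn main() {
    // Run as `MAX_CHECKPOINTS_IN_PROGRESS=2500 cargo run` to see the override take effect.
    println!("max checkpoints in progress: {}", *MAX_CHECKPOINTS_IN_PROGRESS);
}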
8 changes: 4 additions & 4 deletions crates/sui-data-ingestion-core/src/reader.rs
@@ -80,7 +80,7 @@ impl CheckpointReader {
/// Reads files in a local directory, validates them, and forwards `CheckpointData` to the executor.
async fn read_local_files(&self) -> Result<Vec<Arc<CheckpointData>>> {
let mut checkpoints = vec![];
for offset in 0..MAX_CHECKPOINTS_IN_PROGRESS {
for offset in 0..*MAX_CHECKPOINTS_IN_PROGRESS {
let sequence_number = self.current_checkpoint_number + offset as u64;
if self.exceeds_capacity(sequence_number) {
break;
@@ -97,7 +97,7 @@
}

fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
((*MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
|| self.data_limiter.exceeds()
}

@@ -328,8 +328,8 @@ impl CheckpointReader {
mpsc::Sender<CheckpointSequenceNumber>,
oneshot::Sender<()>,
) {
let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (checkpoint_sender, checkpoint_recv) = mpsc::channel(*MAX_CHECKPOINTS_IN_PROGRESS);
let (processed_sender, processed_receiver) = mpsc::channel(*MAX_CHECKPOINTS_IN_PROGRESS);
let (exit_sender, exit_receiver) = oneshot::channel();
let reader = Self {
path,
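To make the capacity check above concrete, here is a hedged, standalone restatement of the same arithmetic (the data-limiter half of the check is omitted and the numbers in `main` are hypothetical): reading stops once a checkpoint is `MAX_CHECKPOINTS_IN_PROGRESS` ahead of the last pruned watermark.

// Standalone restatement of the watermark half of `exceeds_capacity`.
fn exceeds_capacity(max_in_progress: u64, last_pruned_watermark: u64, checkpoint_number: u64) -> bool {
    max_in_progress + last_pruned_watermark <= checkpoint_number
}

fn main() {
    // With the default limit of 10_000 and a watermark of 100, checkpoint
    // 10_099 still fits, while 10_100 is the first one rejected.
    assert!(!exceeds_capacity(10_000, 100, 10_099));
    assert!(exceeds_capacity(10_000, 100, 10_100));
    println!("capacity check behaves as expected");
}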
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion-core/src/reducer.rs
@@ -19,7 +19,7 @@ pub(crate) async fn reduce<W: Worker>(
) -> Result<()> {
// Convert to a stream of chunks of up to MAX size so each iteration of the loop processes all currently ready messages.
let mut stream =
ReceiverStream::new(progress_receiver).ready_chunks(MAX_CHECKPOINTS_IN_PROGRESS);
ReceiverStream::new(progress_receiver).ready_chunks(*MAX_CHECKPOINTS_IN_PROGRESS);
let mut unprocessed = HashMap::new();
let mut batch = vec![];
let mut progress_update = None;
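The `ready_chunks` call above is what lets the reducer drain every already-buffered progress message in a single loop iteration. A small sketch of that behavior, assuming the `futures` and `tokio-stream` crates and a hypothetical channel of integers:

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    for i in 0..5u64 {
        tx.send(i).await.unwrap();
    }
    drop(tx);

    // ready_chunks(16) yields everything already queued as a single Vec,
    // so the five messages arrive together instead of one per iteration.
    let mut stream = ReceiverStream::new(rx).ready_chunks(16);
    while let Some(chunk) = stream.next().await {
        println!("processed {} ready messages: {:?}", chunk.len(), chunk);
    }
}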
6 changes: 3 additions & 3 deletions crates/sui-data-ingestion-core/src/worker_pool.rs
@@ -54,8 +54,8 @@
"Starting indexing pipeline {} with concurrency {}. Current watermark is {}.",
self.task_name, self.concurrency, watermark
);
let (progress_sender, mut progress_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (reducer_sender, reducer_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (progress_sender, mut progress_receiver) = mpsc::channel(*MAX_CHECKPOINTS_IN_PROGRESS);
let (reducer_sender, reducer_receiver) = mpsc::channel(*MAX_CHECKPOINTS_IN_PROGRESS);
let mut workers = vec![];
let mut idle: BTreeSet<_> = (0..self.concurrency).collect();
let mut checkpoints = VecDeque::new();
@@ -65,7 +65,7 @@
// spawn child workers
for worker_id in 0..self.concurrency {
let (worker_sender, mut worker_recv) =
mpsc::channel::<Arc<CheckpointData>>(MAX_CHECKPOINTS_IN_PROGRESS);
mpsc::channel::<Arc<CheckpointData>>(*MAX_CHECKPOINTS_IN_PROGRESS);
let (term_sender, mut term_receiver) = oneshot::channel::<()>();
let cloned_progress_sender = progress_sender.clone();
let task_name = self.task_name.clone();
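As a rough illustration of why the channel capacity matters here, the sketch below spawns a few workers, each behind its own bounded channel, so a slow worker exerts backpressure on the dispatcher instead of buffering without limit. The harness is hypothetical, with small numbers standing in for `*MAX_CHECKPOINTS_IN_PROGRESS` and plain integers standing in for `CheckpointData`.

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let max_in_progress = 4; // stand-in for *MAX_CHECKPOINTS_IN_PROGRESS
    let concurrency: usize = 2;
    let mut senders = Vec::new();

    for worker_id in 0..concurrency {
        let (tx, mut rx) = mpsc::channel::<u64>(max_in_progress);
        senders.push(tx);
        tokio::spawn(async move {
            while let Some(checkpoint) = rx.recv().await {
                println!("worker {worker_id} handling checkpoint {checkpoint}");
            }
        });
    }

    // Round-robin checkpoints across the workers; `send` waits whenever a
    // worker's queue is full, which is the backpressure the capacity provides.
    for checkpoint in 0..6u64 {
        senders[checkpoint as usize % concurrency]
            .send(checkpoint)
            .await
            .unwrap();
    }
    drop(senders);
    tokio::time::sleep(Duration::from_millis(50)).await;
}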