From a6a80846d7a48ce6c602991221bcc03581d8da33 Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Tue, 10 Dec 2024 23:56:16 -0800 Subject: [PATCH] [indexer alt] rename a few structs to make their units more clear (#20419) ## Description While reading/testing this code I found that it can be tough to reason about the limits of channels, because each channel may count in different units: checkpoints, batches, or rows. This PR therefore renames some structs so that their names state the unit explicitly (according to my understanding of the code), and adds comments to make the channel limits clearer. ## Test plan Existing tests. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../src/pipeline/concurrent/collector.rs | 56 ++--- .../src/pipeline/concurrent/committer.rs | 211 +++++++++--------- .../src/pipeline/concurrent/mod.rs | 7 +- .../src/pipeline/mod.rs | 4 +- .../src/pipeline/processor.rs | 6 +- .../src/pipeline/sequential/committer.rs | 6 +- 6 files changed, 151 insertions(+), 139 deletions(-) diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs index 96d3e276ec9d6..f3a935efd8dfd 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs @@ -13,21 +13,21 @@ use tracing::{debug, info}; use crate::{ metrics::IndexerMetrics, - pipeline::{CommitterConfig, Indexed, WatermarkPart}, + pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart}, }; -use super::{Batched, Handler}; +use super::{BatchedRows, 
Handler}; /// Processed values that are waiting to be written to the database. This is an internal type used /// by the concurrent collector to hold data it is waiting to send to the committer. -struct Pending { +struct PendingCheckpoint { /// Values to be inserted into the database from this checkpoint values: Vec, /// The watermark associated with this checkpoint and the part of it that is left to commit watermark: WatermarkPart, } -impl Pending { +impl PendingCheckpoint { /// Whether there are values left to commit from this indexed checkpoint. fn is_empty(&self) -> bool { let empty = self.values.is_empty(); @@ -39,7 +39,7 @@ impl Pending { /// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on /// chunk size. - fn batch_into(&mut self, batch: &mut Batched) { + fn batch_into(&mut self, batch: &mut BatchedRows) { let max_chunk_rows = super::max_chunk_rows::(); if batch.values.len() + self.values.len() > max_chunk_rows { let mut for_batch = self.values.split_off(max_chunk_rows - batch.values.len()); @@ -54,8 +54,8 @@ impl Pending { } } -impl From> for Pending { - fn from(indexed: Indexed) -> Self { +impl From> for PendingCheckpoint { + fn from(indexed: IndexedCheckpoint) -> Self { Self { watermark: WatermarkPart { watermark: indexed.watermark, @@ -84,8 +84,8 @@ impl From> for Pending { pub(super) fn collector( config: CommitterConfig, checkpoint_lag: Option, - mut rx: mpsc::Receiver>, - tx: mpsc::Sender>, + mut rx: mpsc::Receiver>, + tx: mpsc::Sender>, metrics: Arc, cancel: CancellationToken, ) -> JoinHandle<()> { @@ -96,11 +96,11 @@ pub(super) fn collector( poll.set_missed_tick_behavior(MissedTickBehavior::Delay); // Data for checkpoints that have been received but not yet ready to be sent to committer due to lag constraint. 
- let mut received: BTreeMap> = BTreeMap::new(); + let mut received: BTreeMap> = BTreeMap::new(); let checkpoint_lag = checkpoint_lag.unwrap_or_default(); // Data for checkpoints that are ready to be sent but haven't been written yet. - let mut pending: BTreeMap> = BTreeMap::new(); + let mut pending: BTreeMap> = BTreeMap::new(); let mut pending_rows = 0; info!(pipeline = H::NAME, "Starting collector"); @@ -119,7 +119,7 @@ pub(super) fn collector( .with_label_values(&[H::NAME]) .start_timer(); - let mut batch = Batched::new(); + let mut batch = BatchedRows::new(); while !batch.is_full() { let Some(mut entry) = pending.first_entry() else { break; @@ -193,8 +193,8 @@ pub(super) fn collector( /// Move all checkpoints from `received` that are within the lag range into `pending`. /// Returns the number of rows moved. fn move_ready_checkpoints( - received: &mut BTreeMap>, - pending: &mut BTreeMap>, + received: &mut BTreeMap>, + pending: &mut BTreeMap>, checkpoint_lag: u64, ) -> usize { let tip = match (received.last_key_value(), pending.last_key_value()) { @@ -280,7 +280,10 @@ mod tests { // Add checkpoints 1-5 to received for i in 1..=5 { - received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry, Entry, Entry])); + received.insert( + i, + IndexedCheckpoint::new(0, i, 0, 0, vec![Entry, Entry, Entry]), + ); } // With lag of 2 and tip at 5, only checkpoints 1-3 should move @@ -300,11 +303,14 @@ mod tests { let mut pending = BTreeMap::new(); // Add checkpoint 10 to pending to establish tip - pending.insert(10, Pending::from(Indexed::new(0, 10, 0, 0, vec![Entry]))); + pending.insert( + 10, + PendingCheckpoint::from(IndexedCheckpoint::new(0, 10, 0, 0, vec![Entry])), + ); // Add checkpoints 1-5 to received for i in 1..=5 { - received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry])); + received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry])); } // With lag of 3 and tip at 10, checkpoints 1-7 can move @@ -322,7 +328,7 @@ mod tests { // Add checkpoints 8-10 to received 
for i in 8..=10 { - received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry])); + received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry])); } // With lag of 5 and tip at 10, no checkpoints can move @@ -355,9 +361,9 @@ mod tests { // Send test data let test_data = vec![ - Indexed::new(0, 1, 10, 1000, vec![Entry; part1_length]), - Indexed::new(0, 2, 20, 2000, vec![Entry; part2_length]), - Indexed::new(0, 3, 30, 3000, vec![Entry, Entry]), + IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]), + IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]), + IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]), ]; for data in test_data { @@ -393,7 +399,7 @@ mod tests { ); processor_tx - .send(Indexed::new(0, 1, 10, 1000, vec![Entry, Entry])) + .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry])) .await .unwrap(); @@ -434,7 +440,7 @@ mod tests { ); // Send more data than MAX_PENDING_ROWS plus collector channel buffer - let data = Indexed::new( + let data = IndexedCheckpoint::new( 0, 1, 10, @@ -452,12 +458,12 @@ mod tests { // Now fill up the processor channel with minimum data to trigger send blocking for _ in 0..processor_channel_size { - let more_data = Indexed::new(0, 2, 11, 1000, vec![Entry]); + let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]); processor_tx.send(more_data).await.unwrap(); } // Now sending even more data should block because of MAX_PENDING_ROWS limit. 
- let even_more_data = Indexed::new(0, 3, 12, 1000, vec![Entry]); + let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]); let send_result = processor_tx.try_send(even_more_data); assert!(matches!( diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs index 0b2f25f50bbd3..bc56e0a32e34a 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs @@ -16,7 +16,7 @@ use crate::{ task::TrySpawnStreamExt, }; -use super::{Batched, Handler}; +use super::{BatchedRows, Handler}; /// If the committer needs to retry a commit, it will wait this long initially. const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100); @@ -37,7 +37,7 @@ const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1); pub(super) fn committer( config: CommitterConfig, skip_watermark: bool, - rx: mpsc::Receiver>, + rx: mpsc::Receiver>, tx: mpsc::Sender>, db: Db, metrics: Arc, @@ -47,124 +47,127 @@ pub(super) fn committer( info!(pipeline = H::NAME, "Starting committer"); match ReceiverStream::new(rx) - .try_for_each_spawned(config.write_concurrency, |Batched { values, watermark }| { - let values = Arc::new(values); - let tx = tx.clone(); - let db = db.clone(); - let metrics = metrics.clone(); - let cancel = cancel.clone(); - - // Repeatedly try to get a connection to the DB and write the batch. Use an - // exponential backoff in case the failure is due to contention over the DB - // connection pool. 
- let backoff = ExponentialBackoff { - initial_interval: INITIAL_RETRY_INTERVAL, - current_interval: INITIAL_RETRY_INTERVAL, - max_interval: MAX_RETRY_INTERVAL, - max_elapsed_time: None, - ..Default::default() - }; - - use backoff::Error as BE; - let commit = move || { - let values = values.clone(); + .try_for_each_spawned( + config.write_concurrency, + |BatchedRows { values, watermark }| { + let values = Arc::new(values); + let tx = tx.clone(); let db = db.clone(); let metrics = metrics.clone(); - async move { - if values.is_empty() { - return Ok(()); - } - - metrics - .total_committer_batches_attempted - .with_label_values(&[H::NAME]) - .inc(); - - let guard = metrics - .committer_commit_latency - .with_label_values(&[H::NAME]) - .start_timer(); - - let mut conn = db.connect().await.map_err(|e| { - warn!( - pipeline = H::NAME, - "Committed failed to get connection for DB" - ); - BE::transient(Break::Err(e.into())) - })?; - - let affected = H::commit(values.as_slice(), &mut conn).await; - let elapsed = guard.stop_and_record(); - - match affected { - Ok(affected) => { - debug!( - pipeline = H::NAME, - elapsed_ms = elapsed * 1000.0, - affected, - committed = values.len(), - "Wrote batch", - ); - - metrics - .total_committer_batches_succeeded - .with_label_values(&[H::NAME]) - .inc(); + let cancel = cancel.clone(); + + // Repeatedly try to get a connection to the DB and write the batch. Use an + // exponential backoff in case the failure is due to contention over the DB + // connection pool. 
+ let backoff = ExponentialBackoff { + initial_interval: INITIAL_RETRY_INTERVAL, + current_interval: INITIAL_RETRY_INTERVAL, + max_interval: MAX_RETRY_INTERVAL, + max_elapsed_time: None, + ..Default::default() + }; - metrics - .total_committer_rows_committed - .with_label_values(&[H::NAME]) - .inc_by(values.len() as u64); + use backoff::Error as BE; + let commit = move || { + let values = values.clone(); + let db = db.clone(); + let metrics = metrics.clone(); + async move { + if values.is_empty() { + return Ok(()); + } - metrics - .total_committer_rows_affected - .with_label_values(&[H::NAME]) - .inc_by(affected as u64); + metrics + .total_committer_batches_attempted + .with_label_values(&[H::NAME]) + .inc(); - metrics - .committer_tx_rows - .with_label_values(&[H::NAME]) - .observe(affected as f64); + let guard = metrics + .committer_commit_latency + .with_label_values(&[H::NAME]) + .start_timer(); - Ok(()) - } - - Err(e) => { + let mut conn = db.connect().await.map_err(|e| { warn!( pipeline = H::NAME, - elapsed_ms = elapsed * 1000.0, - committed = values.len(), - "Error writing batch: {e}", + "Committed failed to get connection for DB" ); - - Err(BE::transient(Break::Err(e))) + BE::transient(Break::Err(e.into())) + })?; + + let affected = H::commit(values.as_slice(), &mut conn).await; + let elapsed = guard.stop_and_record(); + + match affected { + Ok(affected) => { + debug!( + pipeline = H::NAME, + elapsed_ms = elapsed * 1000.0, + affected, + committed = values.len(), + "Wrote batch", + ); + + metrics + .total_committer_batches_succeeded + .with_label_values(&[H::NAME]) + .inc(); + + metrics + .total_committer_rows_committed + .with_label_values(&[H::NAME]) + .inc_by(values.len() as u64); + + metrics + .total_committer_rows_affected + .with_label_values(&[H::NAME]) + .inc_by(affected as u64); + + metrics + .committer_tx_rows + .with_label_values(&[H::NAME]) + .observe(affected as f64); + + Ok(()) + } + + Err(e) => { + warn!( + pipeline = H::NAME, + elapsed_ms = 
elapsed * 1000.0, + committed = values.len(), + "Error writing batch: {e}", + ); + + Err(BE::transient(Break::Err(e))) + } } } - } - }; + }; - async move { - tokio::select! { - _ = cancel.cancelled() => { - return Err(Break::Cancel); - } + async move { + tokio::select! { + _ = cancel.cancelled() => { + return Err(Break::Cancel); + } - // Double check that the commit actually went through, (this backoff should - // not produce any permanent errors, but if it does, we need to shutdown - // the pipeline). - commit = backoff::future::retry(backoff, commit) => { - let () = commit?; + // Double check that the commit actually went through, (this backoff should + // not produce any permanent errors, but if it does, we need to shutdown + // the pipeline). + commit = backoff::future::retry(backoff, commit) => { + let () = commit?; + } + }; + + if !skip_watermark && tx.send(watermark).await.is_err() { + info!(pipeline = H::NAME, "Watermark closed channel"); + return Err(Break::Cancel); } - }; - if !skip_watermark && tx.send(watermark).await.is_err() { - info!(pipeline = H::NAME, "Watermark closed channel"); - return Err(Break::Cancel); + Ok(()) } - - Ok(()) - } - }) + }, + ) .await { Ok(()) => { diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs index 35a2aa2f6a128..a5e43cca2adb4 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs @@ -103,7 +103,10 @@ pub struct PrunerConfig { /// Values ready to be written to the database. This is an internal type used to communicate /// between the collector and the committer parts of the pipeline. -struct Batched { +/// +/// Values inside each batch may or may not be from the same checkpoint. Values in the same +/// checkpoint can also be split across multiple batches. 
+struct BatchedRows { /// The rows to write values: Vec, /// Proportions of all the watermarks that are represented in this chunk @@ -120,7 +123,7 @@ impl PrunerConfig { } } -impl Batched { +impl BatchedRows { fn new() -> Self { Self { values: vec![], diff --git a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs index 14abb8095b966..e5c90a2d714e5 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs @@ -40,7 +40,7 @@ pub struct CommitterConfig { /// Processed values associated with a single checkpoint. This is an internal type used to /// communicate between the processor and the collector parts of the pipeline. -struct Indexed { +struct IndexedCheckpoint { /// Values to be inserted into the database from this checkpoint values: Vec, /// The watermark associated with this checkpoint @@ -79,7 +79,7 @@ impl CommitterConfig { } } -impl Indexed

{ +impl IndexedCheckpoint

{ fn new( epoch: u64, cp_sequence_number: u64, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs index 2c35ec04fa1d2..10a20969d9721 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs @@ -12,7 +12,7 @@ use tracing::{debug, error, info}; use crate::{metrics::IndexerMetrics, pipeline::Break, task::TrySpawnStreamExt}; -use super::Indexed; +use super::IndexedCheckpoint; /// Implementors of this trait are responsible for transforming checkpoint into rows for their /// table. The `FANOUT` associated value controls how many concurrent workers will be used to @@ -43,7 +43,7 @@ pub trait Processor { pub(super) fn processor( processor: P, rx: mpsc::Receiver>, - tx: mpsc::Sender>, + tx: mpsc::Sender>, metrics: Arc, cancel: CancellationToken, ) -> JoinHandle<()> { @@ -119,7 +119,7 @@ pub(super) fn processor( .with_label_values(&[P::NAME]) .inc_by(values.len() as u64); - tx.send(Indexed::new( + tx.send(IndexedCheckpoint::new( epoch, cp_sequence_number, tx_hi, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs index c6550a4417ccc..0cdc3d8d9855f 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs @@ -15,7 +15,7 @@ use tracing::{debug, info, warn}; use crate::{ metrics::IndexerMetrics, - pipeline::{logging::WatermarkLogger, Indexed, WARN_PENDING_WATERMARKS}, + pipeline::{logging::WatermarkLogger, IndexedCheckpoint, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, }; @@ -42,7 +42,7 @@ use super::{Handler, SequentialConfig}; pub(super) fn committer( config: SequentialConfig, watermark: Option>, - mut rx: mpsc::Receiver>, + mut rx: mpsc::Receiver>, tx: mpsc::UnboundedSender<(&'static str, u64)>, db: Db, 
metrics: Arc, @@ -84,7 +84,7 @@ pub(super) fn committer( // Data for checkpoint that haven't been written yet. Note that `pending_rows` includes // rows in `batch`. - let mut pending: BTreeMap> = BTreeMap::new(); + let mut pending: BTreeMap> = BTreeMap::new(); let mut pending_rows = 0; info!(pipeline = H::NAME, ?watermark, "Starting committer");