From 5577878bd585c20ec9ed03004e12362a4dbf1145 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 16 Dec 2024 15:23:54 -0800 Subject: [PATCH] [indexer-alt] Add more checkpoint lag metrics --- .../sui-indexer-alt-framework/src/metrics.rs | 86 +++++++++++++++++++ .../src/pipeline/concurrent/collector.rs | 12 ++- .../pipeline/concurrent/commit_watermark.rs | 18 ++-- .../src/pipeline/concurrent/committer.rs | 19 +++- .../src/pipeline/mod.rs | 4 + 5 files changed, 131 insertions(+), 8 deletions(-) diff --git a/crates/sui-indexer-alt-framework/src/metrics.rs b/crates/sui-indexer-alt-framework/src/metrics.rs index dee1313b82ca7..3afe99d3e5925 100644 --- a/crates/sui-indexer-alt-framework/src/metrics.rs +++ b/crates/sui-indexer-alt-framework/src/metrics.rs @@ -71,6 +71,7 @@ pub(crate) struct IndexerMetrics { pub total_ingested_transient_retries: IntCounterVec, pub total_ingested_not_found_retries: IntCounter, + // Checkpoint lag metrics for the ingestion pipeline. pub latest_ingested_checkpoint: IntGauge, pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge, pub ingested_checkpoint_timestamp_lag: Histogram, @@ -102,6 +103,26 @@ pub(crate) struct IndexerMetrics { pub total_pruner_chunks_deleted: IntCounterVec, pub total_pruner_rows_deleted: IntCounterVec, + // Checkpoint lag metrics for the collector. + pub latest_collected_checkpoint: IntGaugeVec, + pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec, + pub collected_checkpoint_timestamp_lag: HistogramVec, + + // Checkpoint lag metrics for the committer. + // We can only report partially committed checkpoints, since the concurrent committer isn't aware of + // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from + // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint. + pub latest_partially_committed_checkpoint: IntGaugeVec, + pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec, + pub partially_committed_checkpoint_timestamp_lag: HistogramVec, + + // Checkpoint lag metrics for the watermarker. + // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db. + // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly + // for consistency. + pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec, + pub watermarked_checkpoint_timestamp_lag: HistogramVec, + pub collector_gather_latency: HistogramVec, pub collector_batch_size: HistogramVec, pub committer_commit_latency: HistogramVec, @@ -409,6 +430,71 @@ impl IndexerMetrics { registry, ) .unwrap(), + latest_collected_checkpoint: register_int_gauge_vec_with_registry!( + "indexer_latest_collected_checkpoint", + "Latest checkpoint sequence number collected by this collector", + &["pipeline"], + registry, + ) + .unwrap(), + latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!( + "indexer_latest_collected_checkpoint_timestamp_lag_ms", + "Difference between the system timestamp when the latest checkpoint was collected and the \ + timestamp in the checkpoint, in milliseconds", + &["pipeline"], + registry, + ) + .unwrap(), + collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!( + "indexer_collected_checkpoint_timestamp_lag", + "Difference between the system timestamp when a checkpoint was collected and the \ + timestamp in each checkpoint, in seconds", + &["pipeline"], + LAG_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!( + "indexer_latest_partially_committed_checkpoint", + "Latest checkpoint sequence number partially committed by this collector", + &["pipeline"], + registry, + ) + .unwrap(), + latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!( + "indexer_latest_partially_committed_checkpoint_timestamp_lag_ms", + "Difference between the system timestamp when the latest checkpoint was partially committed and the \ + timestamp in the checkpoint, in milliseconds", + &["pipeline"], + registry, + ) + .unwrap(), + partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!( + "indexer_partially_committed_checkpoint_timestamp_lag", + "Difference between the system timestamp when a checkpoint was partially committed and the \ + timestamp in each checkpoint, in seconds", + &["pipeline"], + LAG_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!( + "indexer_latest_watermarked_checkpoint_timestamp_lag_ms", + "Difference between the system timestamp when the latest checkpoint was watermarked and the \ + timestamp in the checkpoint, in milliseconds", + &["pipeline"], + registry, + ) + .unwrap(), + watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!( + "indexer_watermarked_checkpoint_timestamp_lag", + "Difference between the system timestamp when a checkpoint was watermarked and the \ + timestamp in each checkpoint, in seconds", + &["pipeline"], + LAG_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), collector_gather_latency: register_histogram_vec_with_registry!( "indexer_collector_gather_latency", "Time taken to gather rows into a batch by this collector", diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs index f3a935efd8dfd..5b3cb84c911d8 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use crate::{ - metrics::IndexerMetrics, + metrics::{CheckpointLagMetricReporter, IndexerMetrics}, pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart}, }; @@ -99,6 +99,12 @@ pub(super) fn collector( let mut received: BTreeMap> = BTreeMap::new(); let checkpoint_lag = checkpoint_lag.unwrap_or_default(); + let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::( + &metrics.collected_checkpoint_timestamp_lag, + &metrics.latest_collected_checkpoint_timestamp_lag_ms, + &metrics.latest_collected_checkpoint, + ); + // Data for checkpoints that are ready to be sent but haven't been written yet. let mut pending: BTreeMap> = BTreeMap::new(); let mut pending_rows = 0; @@ -128,6 +134,10 @@ pub(super) fn collector( let indexed = entry.get_mut(); indexed.batch_into(&mut batch); if indexed.is_empty() { + checkpoint_lag_reporter.report_lag( + indexed.watermark.checkpoint(), + indexed.watermark.timestamp_ms(), + ); entry.remove(); } } diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs index 96ade643909d7..a8cbf1fe0ee98 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs @@ -17,7 +17,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - metrics::IndexerMetrics, + metrics::{CheckpointLagMetricReporter, IndexerMetrics}, pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, }; @@ -79,6 +79,12 @@ pub(super) fn commit_watermark( // demonstrate that the pipeline is making progress. let mut logger = WatermarkLogger::new("concurrent_committer", &watermark); + let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::( + &metrics.watermarked_checkpoint_timestamp_lag, + &metrics.latest_watermarked_checkpoint_timestamp_lag_ms, + &metrics.watermark_checkpoint_in_db, + ); + info!(pipeline = H::NAME, ?watermark, "Starting commit watermark"); loop { @@ -202,16 +208,16 @@ pub(super) fn commit_watermark( logger.log::(&watermark, elapsed); + checkpoint_lag_reporter.report_lag( + watermark.checkpoint_hi_inclusive as u64, + watermark.timestamp_ms_hi_inclusive as u64, + ); + metrics .watermark_epoch_in_db .with_label_values(&[H::NAME]) .set(watermark.epoch_hi_inclusive); - metrics - .watermark_checkpoint_in_db - .with_label_values(&[H::NAME]) - .set(watermark.checkpoint_hi_inclusive); - metrics .watermark_transaction_in_db .with_label_values(&[H::NAME]) diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs index 8a08ad225d403..5b06a82e89013 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - metrics::IndexerMetrics, + metrics::{CheckpointLagMetricReporter, IndexerMetrics}, pipeline::{Break, CommitterConfig, WatermarkPart}, task::TrySpawnStreamExt, }; @@ -45,6 +45,11 @@ pub(super) fn committer( ) -> JoinHandle<()> { tokio::spawn(async move { info!(pipeline = H::NAME, "Starting committer"); + let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::( + &metrics.partially_committed_checkpoint_timestamp_lag, + &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms, + &metrics.latest_partially_committed_checkpoint, + ); match ReceiverStream::new(rx) .try_for_each_spawned( @@ -55,6 +60,7 @@ pub(super) fn committer( let db = db.clone(); let metrics = metrics.clone(); let cancel = cancel.clone(); + let checkpoint_lag_reporter = checkpoint_lag_reporter.clone(); // Repeatedly try to get a connection to the DB and write the batch. Use an // exponential backoff in case the failure is due to contention over the DB @@ -67,11 +73,16 @@ pub(super) fn committer( ..Default::default() }; + let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max(); + let highest_checkpoint_timestamp = + watermark.iter().map(|w| w.timestamp_ms()).max(); + use backoff::Error as BE; let commit = move || { let values = values.clone(); let db = db.clone(); let metrics = metrics.clone(); + let checkpoint_lag_reporter = checkpoint_lag_reporter.clone(); async move { if values.is_empty() { return Ok(()); @@ -112,6 +123,12 @@ pub(super) fn committer( "Wrote batch", ); + checkpoint_lag_reporter.report_lag( + // unwrap is safe because we would have returned if values is empty. + highest_checkpoint.unwrap(), + highest_checkpoint_timestamp.unwrap(), + ); + metrics .total_committer_batches_succeeded .with_label_values(&[H::NAME]) diff --git a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs index e5c90a2d714e5..08604023d0f66 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs @@ -115,6 +115,10 @@ impl WatermarkPart { self.watermark.checkpoint_hi_inclusive as u64 } + fn timestamp_ms(&self) -> u64 { + self.watermark.timestamp_ms_hi_inclusive as u64 + } + /// Check if all the rows from this watermark are represented in this part. fn is_complete(&self) -> bool { self.batch_rows == self.total_rows