Skip to content

Commit

Permalink
indexer-alt: pruner task
Browse files Browse the repository at this point in the history
## Description

Add the task that actually deletes data, based on the reader low
watermark.

## Test plan

Run the indexer and note the following:

- Metrics related to deleted rows by the pruner (from
  `localhost:9184/metrics`)
- The contents of the `watermarks` table.

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                  \
  --last-checkpoint 10000                                                        \
  --consistent-range 100 --consistent-pruning-interval 10                        \
  --pipeline sum_obj_types --pipeline wal_obj_types
```

Also tested running the indexer for an extended period of time (1M
checkpoints over roughly half an hour in local testing), and noted how
the pruner behaves. When configured as it would be in production
(roughly one hour of consistent range, and a 5 minute pruning interval
and a 2 minute pruning delay):

- Many rows accumulated during backfill -- by the end of the 1M
  checkpoints, the pruner had only pruned up to between checkpoint 500K
  and checkpoint 700K depending on the pipeline. This should not be an
  issue under normal operation where the indexer will run for long
  enough for pruning to stabilise at the tip of the network (and it
  would be recommended practice to start from formal snapshot and
  therefore only need to run pruning from that point forward).
- Because the reader watermark task and the pruner task use the same
  interval, it can take up to two ticks of that interval for the pruner
  to act on a change to its upperbound -- again, it should be okay, as
  the pruner's interval should be at least an order of magnitude smaller
  than its retention period.
  • Loading branch information
amnn committed Nov 13, 2024
1 parent d0c7294 commit 6fe410d
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 6 deletions.
9 changes: 9 additions & 0 deletions crates/sui-indexer-alt/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub enum Command {
)]
consistent_pruning_interval: Duration,

/// How long to wait before honouring reader low watermarks.
#[arg(
long,
default_value = "120",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pruner_delay: Duration,

/// Number of checkpoints to delay indexing summary tables for.
#[clap(long)]
consistent_range: Option<u64>,
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

Expand Down Expand Up @@ -56,4 +57,11 @@ impl Handler for WalCoinBalances {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = wal_coin_balances::table
.filter(wal_coin_balances::cp_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

Expand Down Expand Up @@ -59,4 +60,11 @@ impl Handler for WalObjTypes {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = wal_obj_types::table
.filter(wal_obj_types::cp_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() -> Result<()> {
Command::Indexer {
indexer,
consistent_pruning_interval,
pruner_delay,
consistent_range: lag,
} => {
let retry_interval = indexer.ingestion_config.retry_interval;
Expand All @@ -61,6 +62,7 @@ async fn main() -> Result<()> {
// write-ahead log needs to be pruned.
let pruner_config = lag.map(|l| PrunerConfig {
interval: consistent_pruning_interval,
delay: pruner_delay,
// Retain at least twice as much data as the lag, to guarantee overlap between the
// summary table and the write-ahead log.
retention: l * 2,
Expand Down
69 changes: 68 additions & 1 deletion crates/sui-indexer-alt/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,40 @@ pub struct IndexerMetrics {

pub handler_checkpoint_latency: HistogramVec,

// Statistics related to individual ingestion pipelines' committers.
// Statistics related to individual ingestion pipelines.
pub total_collector_rows_received: IntCounterVec,
pub total_collector_batches_created: IntCounterVec,
pub total_committer_batches_attempted: IntCounterVec,
pub total_committer_batches_succeeded: IntCounterVec,
pub total_committer_rows_committed: IntCounterVec,
pub total_committer_rows_affected: IntCounterVec,
pub total_watermarks_out_of_order: IntCounterVec,
pub total_pruner_chunks_attempted: IntCounterVec,
pub total_pruner_chunks_deleted: IntCounterVec,
pub total_pruner_rows_deleted: IntCounterVec,

pub collector_gather_latency: HistogramVec,
pub collector_batch_size: HistogramVec,
pub committer_commit_latency: HistogramVec,
pub watermark_gather_latency: HistogramVec,
pub watermark_commit_latency: HistogramVec,
pub watermark_pruner_read_latency: HistogramVec,
pub watermark_pruner_write_latency: HistogramVec,
pub pruner_delete_latency: HistogramVec,

pub watermark_epoch: IntGaugeVec,
pub watermark_checkpoint: IntGaugeVec,
pub watermark_transaction: IntGaugeVec,
pub watermark_timestamp_ms: IntGaugeVec,
pub watermark_reader_lo: IntGaugeVec,
pub watermark_pruner_hi: IntGaugeVec,

pub watermark_epoch_in_db: IntGaugeVec,
pub watermark_checkpoint_in_db: IntGaugeVec,
pub watermark_transaction_in_db: IntGaugeVec,
pub watermark_timestamp_in_db_ms: IntGaugeVec,
pub watermark_reader_lo_in_db: IntGaugeVec,
pub watermark_pruner_hi_in_db: IntGaugeVec,
}

/// Collects information about the database connection pool.
Expand Down Expand Up @@ -319,6 +327,27 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
"indexer_pruner_chunks_attempted",
"Number of chunks this pruner attempted to delete",
&["pipeline"],
registry,
)
.unwrap(),
total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
"indexer_pruner_chunks_deleted",
"Number of chunks this pruner successfully deleted",
&["pipeline"],
registry,
)
.unwrap(),
total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
"indexer_pruner_rows_deleted",
"Number of rows this pruner successfully deleted",
&["pipeline"],
registry,
)
.unwrap(),
collector_gather_latency: register_histogram_vec_with_registry!(
"indexer_collector_gather_latency",
"Time taken to gather rows into a batch by this collector",
Expand Down Expand Up @@ -359,6 +388,30 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_read_latency: register_histogram_vec_with_registry!(
"indexer_watermark_pruner_read_latency",
"Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
watermark_pruner_write_latency: register_histogram_vec_with_registry!(
"indexer_watermark_pruner_write_latency",
"Time taken to write the pruner's new upperbound to the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
pruner_delete_latency: register_histogram_vec_with_registry!(
"indexer_pruner_delete_latency",
"Time taken to delete a chunk of data from the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
watermark_epoch: register_int_gauge_vec_with_registry!(
"indexer_watermark_epoch",
"Current epoch high watermark for this committer",
Expand Down Expand Up @@ -394,6 +447,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_hi: register_int_gauge_vec_with_registry!(
"indexer_watermark_pruner_hi",
"Current pruner high watermark for this pruner",
&["pipeline"],
registry,
)
.unwrap(),
watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
"indexer_watermark_epoch_in_db",
"Last epoch high watermark this committer wrote to the DB",
Expand Down Expand Up @@ -429,6 +489,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
"indexer_watermark_pruner_hi_in_db",
"Last pruner high watermark this pruner wrote to the DB",
&["pipeline"],
registry,
)
.unwrap(),
}
}

Expand Down
92 changes: 90 additions & 2 deletions crates/sui-indexer-alt/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::borrow::Cow;
use std::{borrow::Cow, time::Duration};

use chrono::{naive::NaiveDateTime, DateTime, Utc};
use diesel::prelude::*;
use diesel::{dsl::sql, prelude::*, sql_types};
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;

Expand Down Expand Up @@ -42,6 +42,24 @@ pub struct ReaderWatermark<'p> {
pub reader_lo: i64,
}

#[derive(Queryable, Debug, Clone, FieldCount)]
#[diesel(table_name = watermarks)]
pub struct PrunerWatermark<'p> {
/// The pipeline in question
pub pipeline: Cow<'p, str>,

/// How long to wait from when this query ran on the database until this information can be
/// used to prune the database. This number could be negative, meaning no waiting is necessary.
pub wait_for: i64,

/// The pruner can delete up to this checkpoint, (exclusive).
pub reader_lo: i64,

/// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
/// point.
pub pruner_hi: i64,
}

impl StoredWatermark {
pub async fn get(
conn: &mut Connection<'_>,
Expand Down Expand Up @@ -129,6 +147,76 @@ impl<'p> ReaderWatermark<'p> {
}
}

impl PrunerWatermark<'static> {
/// Get the bounds for the region that the pruner still has to prune for the given `pipeline`,
/// along with a duration to wait before acting on this information, based on the time at which
/// the pruner last updated the bounds, and the configured `delay`.
///
/// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
/// `reader_lo` (exclusive) after `wait_for` milliseconds have passed since this response was
/// returned.
pub async fn get(
conn: &mut Connection<'_>,
pipeline: &'static str,
delay: Duration,
) -> QueryResult<Option<Self>> {
// |---------- + delay ---------------------|
// |--- wait_for ---|
// |-----------------------|----------------|
// ^ ^
// pruner_timestamp NOW()
let wait_for = sql::<sql_types::BigInt>(&format!(
"CAST({} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis(),
));

watermarks::table
.select((
watermarks::pipeline,
wait_for,
watermarks::reader_lo,
watermarks::pruner_hi,
))
.filter(watermarks::pipeline.eq(pipeline))
.first(conn)
.await
.optional()
}
}

impl<'p> PrunerWatermark<'p> {
/// How long to wait before the pruner can act on this information, or `None`, if there is no
/// need to wait.
pub fn wait_for(&self) -> Option<Duration> {
(self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64))
}

/// Whether the pruner has any work left to do on the range in this watermark.
pub fn is_empty(&self) -> bool {
self.pruner_hi >= self.reader_lo
}

/// The next chunk that the pruner should work on, to advance the watermark.
pub fn next_chunk(&mut self, size: u64) -> (u64, u64) {
let from = self.pruner_hi as u64;
let to = (from + size).min(self.reader_lo as u64);
(from, to)
}

/// Update the pruner high watermark (only) for an existing watermark row, as long as this
/// raises the watermark.
///
/// Returns a boolean indicating whether the watermark was actually updated or not.
pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(self.pruner_hi))
.filter(watermarks::pipeline.eq(&self.pipeline))
.execute(conn)
.await?
> 0)
}
}

impl<'p> From<CommitterWatermark<'p>> for StoredWatermark {
fn from(watermark: CommitterWatermark<'p>) -> Self {
StoredWatermark {
Expand Down
26 changes: 23 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use crate::{

use super::{processor::processor, PipelineConfig, Processor, WatermarkPart, PIPELINE_BUFFER};

use self::{collector::collector, commit_watermark::commit_watermark, committer::committer};
use self::{
collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
};

mod collector;
mod commit_watermark;
mod committer;
mod pruner;
mod reader_watermark;

/// The maximum number of watermarks that can show up in a single batch. This limit exists to deal
Expand Down Expand Up @@ -63,13 +66,23 @@ pub trait Handler: Processor {
/// affected.
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>)
-> anyhow::Result<usize>;

/// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
Ok(0)
}
}

#[derive(Debug, Clone)]
pub struct PrunerConfig {
/// How often the pruner should check whether there is any data to prune.
pub interval: Duration,

/// How long to wait after the reader low watermark was set, until it is safe to prune up until
/// this new watermark.
pub delay: Duration,

/// How much data to keep, this is measured in checkpoints.
pub retention: u64,

Expand Down Expand Up @@ -181,12 +194,19 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
cancel,
);

let reader_watermark = reader_watermark::<H>(pruner_config, db, metrics, pruner_cancel.clone());
let reader_watermark = reader_watermark::<H>(
pruner_config.clone(),
db.clone(),
metrics.clone(),
pruner_cancel.clone(),
);

let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());

tokio::spawn(async move {
let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

pruner_cancel.cancel();
let _ = futures::join!(reader_watermark);
let _ = futures::join!(reader_watermark, pruner);
})
}
Loading

0 comments on commit 6fe410d

Please sign in to comment.