Skip to content

Commit

Permalink
indexer pruner: add pruner and prune unpartitioned tables (MystenLabs…
Browse files Browse the repository at this point in the history
…#18495)

## Description 

a separate pruner that use `epochs` table and `cp_tx` table to track
progress of pruning and handle disaster recovery;
and `checkpoints` table as available range data source.

## Test plan 

local run and verify via local client
- progress tracking via epoch, cp, and tx

![epoch_progress](https://github.com/MystenLabs/sui/assets/106119108/f628c7b5-add7-4198-94e3-f2542dfd7890)

![tx_cp_progress](https://github.com/MystenLabs/sui/assets/106119108/4395ff7d-b79b-4a6d-aec6-243ac5c86c22)



- verify on checkpoints & tx_ tables
  - make sure that the checkpoints table is indeed latest cp_tx cp + 1

![check_cp](https://github.com/MystenLabs/sui/assets/106119108/13de7129-16b6-44e9-a922-c487100cb152)
  - make sure that all cp < min_tx in cp_tx have been pruned

![check_tx](https://github.com/MystenLabs/sui/assets/106119108/0708e667-135a-44be-be77-b1eb667ba640)

---



## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
gegaowp authored Jul 19, 2024
1 parent 090bec4 commit e7d196e
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP TABLE IF EXISTS checkpoints;
DROP TABLE IF EXISTS pruner_cp_watermark;
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ CREATE TABLE checkpoints

CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number);
CREATE INDEX checkpoints_digest ON checkpoints (checkpoint_digest(32));

CREATE TABLE pruner_cp_watermark (
checkpoint_sequence_number BIGINT PRIMARY KEY,
min_tx_sequence_number BIGINT NOT NULL,
max_tx_sequence_number BIGINT NOT NULL
)
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS checkpoints;
DROP TABLE IF EXISTS pruner_cp_watermark;
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ CREATE TABLE checkpoints

CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number);
CREATE INDEX checkpoints_digest ON checkpoints USING HASH (checkpoint_digest);

CREATE TABLE pruner_cp_watermark (
checkpoint_sequence_number BIGINT PRIMARY KEY,
min_tx_sequence_number BIGINT NOT NULL,
max_tx_sequence_number BIGINT NOT NULL
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ CREATE TABLE tx_input_objects (
object_id BYTEA NOT NULL,
PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number)
);
CREATE INDEX tx_input_objects_tx_sequence_number_index ON tx_input_objects (tx_sequence_number);

CREATE TABLE tx_changed_objects (
cp_sequence_number BIGINT NOT NULL,
Expand All @@ -31,6 +32,7 @@ CREATE TABLE tx_changed_objects (
object_id BYTEA NOT NULL,
PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number)
);
CREATE INDEX tx_changed_objects_tx_sequence_number_index ON tx_changed_objects (tx_sequence_number);

CREATE TABLE tx_calls (
cp_sequence_number BIGINT NOT NULL,
Expand All @@ -52,3 +54,4 @@ CREATE TABLE tx_digests (
cp_sequence_number BIGINT NOT NULL,
tx_sequence_number BIGINT NOT NULL
);
CREATE INDEX tx_digests_tx_sequence_number ON tx_digests (tx_sequence_number);
1 change: 1 addition & 0 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
pub mod checkpoint_handler;
pub mod committer;
pub mod objects_snapshot_processor;
pub mod pruner;
pub mod tx_processor;

#[derive(Debug)]
Expand Down
60 changes: 60 additions & 0 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};

pub struct Pruner<S> {
pub store: S,
pub epochs_to_keep: u64,
pub metrics: IndexerMetrics,
}

impl<S> Pruner<S>
where
S: IndexerStore + Clone + Sync + Send + 'static,
{
pub fn new(store: S, epochs_to_keep: u64, metrics: IndexerMetrics) -> Self {
Self {
store,
epochs_to_keep,
metrics,
}
}

pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
loop {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}

let (mut min_epoch, mut max_epoch) = self.store.get_available_epoch_range().await?;
while min_epoch + self.epochs_to_keep > max_epoch {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
tokio::time::sleep(Duration::from_secs(5)).await;
(min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
}

for epoch in min_epoch..=max_epoch - self.epochs_to_keep {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
info!("Pruning epoch {}", epoch);
self.store.prune_epoch(epoch).await.unwrap_or_else(|e| {
error!("Failed to prune epoch {}: {}", epoch, e);
});
self.metrics.last_pruned_epoch.set(epoch as i64);
info!("Pruned epoch {}", epoch);
}
}
}
}
13 changes: 13 additions & 0 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::handlers::checkpoint_handler::new_handlers;
use crate::handlers::objects_snapshot_processor::{
start_objects_snapshot_processor, SnapshotLagConfig,
};
use crate::handlers::pruner::Pruner;
use crate::indexer_reader::IndexerReader;
use crate::metrics::IndexerMetrics;
use crate::store::IndexerStore;
Expand Down Expand Up @@ -108,6 +109,18 @@ impl Indexer {
)
.await?;

let epochs_to_keep = std::env::var("EPOCHS_TO_KEEP")
.map(|s| s.parse::<u64>().ok())
.unwrap_or_else(|_e| None);
if let Some(epochs_to_keep) = epochs_to_keep {
info!(
"Starting indexer pruner with epochs to keep: {}",
epochs_to_keep
);
let pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone());
spawn_monitored_task!(pruner.start(CancellationToken::new()));
}

let cancel_clone = cancel.clone();
let (exit_sender, exit_receiver) = oneshot::channel();
// Spawn a task that links the cancellation token to the exit sender
Expand Down
29 changes: 28 additions & 1 deletion crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,13 @@ pub struct IndexerMetrics {
// indexer state metrics
pub db_conn_pool_size: IntGauge,
pub idle_db_conn: IntGauge,

pub address_processor_failure: IntCounter,
pub checkpoint_metrics_processor_failure: IntCounter,
// pruner metrics
pub last_pruned_epoch: IntGauge,
pub last_pruned_checkpoint: IntGauge,
pub last_pruned_transaction: IntGauge,
pub epoch_pruning_latency: Histogram,
}

impl IndexerMetrics {
Expand Down Expand Up @@ -742,6 +746,29 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
last_pruned_epoch: register_int_gauge_with_registry!(
"last_pruned_epoch",
"Last pruned epoch number",
registry,
)
.unwrap(),
last_pruned_checkpoint: register_int_gauge_with_registry!(
"last_pruned_checkpoint",
"Last pruned checkpoint sequence number",
registry,
)
.unwrap(),
last_pruned_transaction: register_int_gauge_with_registry!(
"last_pruned_transaction",
"Last pruned transaction sequence number",
registry,
).unwrap(),
epoch_pruning_latency: register_histogram_with_registry!(
"epoch_pruning_latency",
"Time spent in pruning one epoch",
DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
registry
).unwrap(),
}
}
}
20 changes: 19 additions & 1 deletion crates/sui-indexer/src/models/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::digests::CheckpointDigest;
use sui_types::gas::GasCostSummary;

use crate::errors::IndexerError;
use crate::schema::checkpoints;
use crate::schema::{checkpoints, pruner_cp_watermark};
use crate::types::IndexedCheckpoint;

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
Expand Down Expand Up @@ -205,3 +205,21 @@ impl TryFrom<StoredCheckpoint> for RpcCheckpoint {
})
}
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = pruner_cp_watermark)]
pub struct StoredCpTx {
pub checkpoint_sequence_number: i64,
pub min_tx_sequence_number: i64,
pub max_tx_sequence_number: i64,
}

impl From<&IndexedCheckpoint> for StoredCpTx {
fn from(c: &IndexedCheckpoint) -> Self {
Self {
checkpoint_sequence_number: c.sequence_number as i64,
min_tx_sequence_number: c.min_tx_sequence_number as i64,
max_tx_sequence_number: c.max_tx_sequence_number as i64,
}
}
}
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod inner {
pub use crate::schema::pg::objects_history;
pub use crate::schema::pg::objects_snapshot;
pub use crate::schema::pg::packages;
pub use crate::schema::pg::pruner_cp_watermark;
pub use crate::schema::pg::transactions;
pub use crate::schema::pg::tx_calls;
pub use crate::schema::pg::tx_changed_objects;
Expand All @@ -40,6 +41,7 @@ mod inner {
pub use crate::schema::mysql::objects_history;
pub use crate::schema::mysql::objects_snapshot;
pub use crate::schema::mysql::packages;
pub use crate::schema::mysql::pruner_cp_watermark;
pub use crate::schema::mysql::transactions;
pub use crate::schema::mysql::tx_calls;
pub use crate::schema::mysql::tx_changed_objects;
Expand All @@ -57,6 +59,7 @@ pub use inner::objects;
pub use inner::objects_history;
pub use inner::objects_snapshot;
pub use inner::packages;
pub use inner::pruner_cp_watermark;
pub use inner::transactions;
pub use inner::tx_calls;
pub use inner::tx_changed_objects;
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ diesel::table! {
}
}

diesel::table! {
pruner_cp_watermark (checkpoint_sequence_number) {
checkpoint_sequence_number -> Bigint,
min_tx_sequence_number -> Bigint,
max_tx_sequence_number -> Bigint,
}
}

diesel::table! {
transactions (tx_sequence_number, checkpoint_sequence_number) {
tx_sequence_number -> Bigint,
Expand Down Expand Up @@ -226,6 +234,7 @@ macro_rules! for_all_tables {
objects_history,
objects_snapshot,
packages,
pruner_cp_watermark,
transactions,
tx_calls,
tx_changed_objects,
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-indexer/src/schema/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ diesel::table! {
}
}

diesel::table! {
pruner_cp_watermark (checkpoint_sequence_number) {
checkpoint_sequence_number -> Int8,
min_tx_sequence_number -> Int8,
max_tx_sequence_number -> Int8,
}
}

diesel::table! {
display (object_type) {
object_type -> Text,
Expand Down Expand Up @@ -277,6 +285,8 @@ macro_rules! for_all_tables {
($action:path) => {
$action!(
checkpoints,
pruner_cp_watermark,
display,
epochs,
events,
events_partition_0,
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub enum ObjectChangeToCommit {
pub trait IndexerStore: Any + Clone + Sync + Send + 'static {
async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError>;

async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError>;

async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError>;

async fn get_latest_object_snapshot_checkpoint_sequence_number(
&self,
) -> Result<Option<u64>, IndexerError>;
Expand Down Expand Up @@ -70,6 +74,8 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static {

async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError>;

async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError>;

async fn get_network_total_transactions_by_end_of_epoch(
&self,
epoch: u64,
Expand Down
Loading

0 comments on commit e7d196e

Please sign in to comment.