indexer: backfill snapshot table via 2 modes (MystenLabs#17111)
## Description 

During the backfill, all tables except objects_snapshot are committed together and can finish from genesis to latest in 4-5 days after the recent optimizations; the objects_snapshot table, however, takes longer.

This PR backfills objects_snapshot in 2 modes (see the sketch after this list):
- during the backfill, simply populate it with new object changes so that it stays up to date
- when the backfill is close to finishing, switch to the original, intentionally lagging update mode
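
For reference, below is a minimal, self-contained sketch of the one-way mode flip this PR adds to the committer. The constant value mirrors `OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG` from the diff; the helper function name and the checkpoint numbers in `main` are illustrative assumptions, not part of the actual change.

```rust
/// Lag threshold, taken from the committer change in this PR.
const OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG: u64 = 900;

/// Decide whether the committer should keep backfilling objects_snapshot inline.
/// Once the latest committed checkpoint is within the lag window of the full
/// node's latest checkpoint, the flag flips to false and stays false, handing
/// the table over to the intentionally lagging snapshot processor.
fn next_backfill_mode(currently_backfilling: bool, latest_committed_cp: u64, latest_fn_cp: u64) -> bool {
    if !currently_backfilling {
        // One-way flip: never re-enter backfill mode, even if the indexer
        // falls behind again, so the table is never written by both paths.
        return false;
    }
    latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG <= latest_fn_cp
}

fn main() {
    // Far behind the full node: keep backfilling inline with the committer.
    assert!(next_backfill_mode(true, 1_000, 10_000));
    // Caught up to within the lag window: hand off to the snapshot processor.
    assert!(!next_backfill_mode(true, 9_500, 10_000));
    // Already flipped: stays off even with a large lag.
    assert!(!next_backfill_mode(false, 1_000, 10_000));
}
```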

## Test plan 

Local run and benchmark to monitor the new performance and verify correctness.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
gegaowp authored Apr 11, 2024
1 parent 9991f82 commit 61eda1d
Showing 10 changed files with 302 additions and 23 deletions.
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -291,7 +291,7 @@ impl ExecutorCluster {

let latest_cp = self
.indexer_store
.get_latest_tx_checkpoint_sequence_number()
.get_latest_checkpoint_sequence_number()
.await
.unwrap()
.unwrap();
@@ -323,7 +323,7 @@ impl ExecutorCluster {

pub async fn force_objects_snapshot_catchup(&self, start_cp: u64, end_cp: u64) {
self.indexer_store
.persist_object_snapshot(start_cp, end_cp)
.update_objects_snapshot(start_cp, end_cp)
.await
.unwrap();

6 changes: 4 additions & 2 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
@@ -12,8 +12,7 @@ use mysten_metrics::{get_metrics, spawn_monitored_task};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use sui_package_resolver::{PackageStore, Resolver};
use sui_rest_api::CheckpointData;
use sui_rest_api::CheckpointTransaction;
use sui_rest_api::{CheckpointData, CheckpointTransaction, Client};
use sui_types::base_types::ObjectRef;
use sui_types::dynamic_field::DynamicFieldInfo;
use sui_types::dynamic_field::DynamicFieldName;
@@ -60,6 +59,7 @@ const CHECKPOINT_QUEUE_SIZE: usize = 100;

pub async fn new_handlers<S>(
state: S,
client: Client,
metrics: IndexerMetrics,
) -> Result<CheckpointHandler<S>, IndexerError>
where
@@ -79,10 +79,12 @@ where
);

let state_clone = state.clone();
let client_clone = client.clone();
let metrics_clone = metrics.clone();
let (tx, package_tx) = watch::channel(None);
spawn_monitored_task!(start_tx_checkpoint_commit_task(
state_clone,
client_clone,
metrics_clone,
indexed_checkpoint_receiver,
tx,
34 changes: 32 additions & 2 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -3,12 +3,12 @@

use std::collections::BTreeMap;

use tap::tap::TapFallible;
use tokio::sync::watch;
use tracing::instrument;

use tap::tap::TapFallible;
use tracing::{error, info};

use sui_rest_api::Client;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::metrics::IndexerMetrics;
@@ -18,9 +18,11 @@ use crate::types::IndexerResult;
use super::{CheckpointDataToCommit, EpochToCommit};

const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
const OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG: u64 = 900;

pub async fn start_tx_checkpoint_commit_task<S>(
state: S,
client: Client,
metrics: IndexerMetrics,
tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
commit_notifier: watch::Sender<Option<CheckpointSequenceNumber>>,
@@ -38,11 +40,28 @@ pub async fn start_tx_checkpoint_commit_task<S>(

let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
.ready_chunks(checkpoint_commit_batch_size);
let mut object_snapshot_backfill_mode = true;

while let Some(indexed_checkpoint_batch) = stream.next().await {
if indexed_checkpoint_batch.is_empty() {
continue;
}

let mut latest_fn_cp_res = client.get_latest_checkpoint().await;
while latest_fn_cp_res.is_err() {
error!("Failed to get latest checkpoint from the network");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
latest_fn_cp_res = client.get_latest_checkpoint().await;
}
// unwrap is safe here because we checked that latest_fn_cp_res is Ok above
let latest_fn_cp = latest_fn_cp_res.unwrap().sequence_number;
// unwrap is safe b/c we checked for empty batch above
let latest_committed_cp = indexed_checkpoint_batch
.last()
.unwrap()
.checkpoint
.sequence_number;

// split the batch into smaller batches per epoch to handle partitioning
let mut indexed_checkpoint_batch_per_epoch = vec![];
for indexed_checkpoint in indexed_checkpoint_batch {
@@ -55,6 +74,7 @@ pub async fn start_tx_checkpoint_commit_task<S>(
epoch,
&metrics,
&commit_notifier,
object_snapshot_backfill_mode,
)
.await;
indexed_checkpoint_batch_per_epoch = vec![];
@@ -67,9 +87,15 @@ pub async fn start_tx_checkpoint_commit_task<S>(
None,
&metrics,
&commit_notifier,
object_snapshot_backfill_mode,
)
.await;
}
// this is a one-way flip in case indexer falls behind again, so that the objects snapshot
// table will not be populated by both committer and async snapshot processor at the same time.
if latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG > latest_fn_cp {
object_snapshot_backfill_mode = false;
}
}
}

@@ -84,6 +110,7 @@ async fn commit_checkpoints<S>(
epoch: Option<EpochToCommit>,
metrics: &IndexerMetrics,
commit_notifier: &watch::Sender<Option<CheckpointSequenceNumber>>,
object_snapshot_backfill_mode: bool,
) where
S: IndexerStore + Clone + Sync + Send + 'static,
{
@@ -140,6 +167,9 @@ async fn commit_checkpoints<S>(
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
];
if object_snapshot_backfill_mode {
persist_tasks.push(state.backfill_objects_snapshot(object_changes_batch));
}
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
}
34 changes: 31 additions & 3 deletions crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
@@ -3,13 +3,16 @@

use tracing::info;

use sui_rest_api::Client;

use crate::types::IndexerResult;
use crate::{metrics::IndexerMetrics, store::IndexerStore};

const OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG: usize = 900;
const OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG: usize = 300;

pub struct ObjectsSnapshotProcessor<S> {
pub client: Client,
pub store: S,
metrics: IndexerMetrics,
pub config: SnapshotLagConfig,
@@ -63,11 +66,13 @@ where
S: IndexerStore + Clone + Sync + Send + 'static,
{
pub fn new_with_config(
client: Client,
store: S,
metrics: IndexerMetrics,
config: SnapshotLagConfig,
) -> ObjectsSnapshotProcessor<S> {
Self {
client,
store,
metrics,
config,
@@ -101,22 +106,45 @@ where
self.config.snapshot_max_lag as u64 - self.config.snapshot_min_lag as u64;
let mut latest_cp = self
.store
.get_latest_tx_checkpoint_sequence_number()
.get_latest_checkpoint_sequence_number()
.await?
.unwrap_or_default();
// when backfill_mode is true, objects_snapshot will be updated with the latest
// in the checkpoint handler instead, because the async update in this processor
// is slower than other tables during the backfilling process.
let mut backfill_mode = true;
let mut latest_fn_cp = self.client.get_latest_checkpoint().await?.sequence_number;

loop {
if backfill_mode && latest_fn_cp > start_cp + self.config.snapshot_max_lag as u64 {
// backfill mode is true, and the lag is too high, so we need to wait for the lag to be reduced
// before we can update the snapshot.
tokio::time::sleep(std::time::Duration::from_secs(self.config.sleep_duration))
.await;
latest_fn_cp = self.client.get_latest_checkpoint().await?.sequence_number;
start_cp = self
.store
.get_latest_object_snapshot_checkpoint_sequence_number()
.await?
.unwrap_or_default();
continue;
} else if backfill_mode {
// flip the backfill mode to false, so that later objects_snapshot will be updated
// via this processor instead of the checkpoint handler.
backfill_mode = false;
}

while latest_cp <= start_cp + self.config.snapshot_max_lag as u64 {
tokio::time::sleep(std::time::Duration::from_secs(self.config.sleep_duration))
.await;
latest_cp = self
.store
.get_latest_tx_checkpoint_sequence_number()
.get_latest_checkpoint_sequence_number()
.await?
.unwrap_or_default();
}
self.store
.persist_object_snapshot(start_cp, start_cp + snapshot_window)
.update_objects_snapshot(start_cp, start_cp + snapshot_window)
.await?;
start_cp += snapshot_window;
self.metrics
5 changes: 3 additions & 2 deletions crates/sui-indexer/src/indexer.rs
@@ -46,7 +46,7 @@ impl Indexer {

// None will be returned when checkpoints table is empty.
let last_seq_from_db = store
.get_latest_tx_checkpoint_sequence_number()
.get_latest_checkpoint_sequence_number()
.await
.expect("Failed to get latest tx checkpoint sequence number from DB");
let download_queue_size = env::var("DOWNLOAD_QUEUE_SIZE")
@@ -73,13 +73,14 @@ impl Indexer {
spawn_monitored_task!(fetcher.run());

let objects_snapshot_processor = ObjectsSnapshotProcessor::new_with_config(
rest_client.clone(),
store.clone(),
metrics.clone(),
snapshot_config,
);
spawn_monitored_task!(objects_snapshot_processor.start());

let checkpoint_handler = new_handlers(store, metrics.clone()).await?;
let checkpoint_handler = new_handlers(store, rest_client.clone(), metrics.clone()).await?;
crate::framework::runner::run(
mysten_metrics::metered_channel::ReceiverStream::new(
downloaded_checkpoint_data_receiver,
16 changes: 16 additions & 0 deletions crates/sui-indexer/src/metrics.rs
@@ -120,8 +120,10 @@ pub struct IndexerMetrics {
pub checkpoint_db_commit_latency_transactions_chunks: Histogram,
pub checkpoint_db_commit_latency_transactions_chunks_transformation: Histogram,
pub checkpoint_db_commit_latency_objects: Histogram,
pub checkpoint_db_commit_latency_objects_snapshot: Histogram,
pub checkpoint_db_commit_latency_objects_history: Histogram,
pub checkpoint_db_commit_latency_objects_chunks: Histogram,
pub checkpoint_db_commit_latency_objects_snapshot_chunks: Histogram,
pub checkpoint_db_commit_latency_objects_history_chunks: Histogram,
pub checkpoint_db_commit_latency_events: Histogram,
pub checkpoint_db_commit_latency_events_chunks: Histogram,
@@ -403,6 +405,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_snapshot: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_snapshots",
"Time spent commiting objects snapshots",
DB_COMMIT_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_history: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_history",
"Time spent commiting objects history",
@@ -416,6 +425,13 @@
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_snapshot_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_snapshot_chunks",
"Time spent commiting objects snapshot chunks",
DB_COMMIT_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_history_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_history_chunks",
"Time spent commiting objects history chunks",
65 changes: 64 additions & 1 deletion crates/sui-indexer/src/models/objects.rs
@@ -18,7 +18,7 @@ use sui_types::object::Object;
use sui_types::object::ObjectRead;

use crate::errors::IndexerError;
use crate::schema::{objects, objects_history};
use crate::schema::{objects, objects_history, objects_snapshot};
use crate::types::{IndexedDeletedObject, IndexedObject, ObjectStatus};

#[derive(Queryable)]
@@ -63,6 +63,48 @@ pub struct StoredObject {
pub df_object_id: Option<Vec<u8>>,
}

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_snapshot, primary_key(object_id))]
pub struct StoredObjectSnapshot {
pub object_id: Vec<u8>,
pub object_version: i64,
pub object_status: i16,
pub object_digest: Option<Vec<u8>>,
pub checkpoint_sequence_number: i64,
pub owner_type: Option<i16>,
pub owner_id: Option<Vec<u8>>,
pub object_type: Option<String>,
pub serialized_object: Option<Vec<u8>>,
pub coin_type: Option<String>,
pub coin_balance: Option<i64>,
pub df_kind: Option<i16>,
pub df_name: Option<Vec<u8>>,
pub df_object_type: Option<String>,
pub df_object_id: Option<Vec<u8>>,
}

impl From<StoredObject> for StoredObjectSnapshot {
fn from(o: StoredObject) -> Self {
Self {
object_id: o.object_id,
object_version: o.object_version,
object_status: ObjectStatus::Active as i16,
object_digest: Some(o.object_digest),
checkpoint_sequence_number: o.checkpoint_sequence_number,
owner_type: Some(o.owner_type),
owner_id: o.owner_id,
object_type: o.object_type,
serialized_object: Some(o.serialized_object),
coin_type: o.coin_type,
coin_balance: o.coin_balance,
df_kind: o.df_kind,
df_name: o.df_name,
df_object_type: o.df_object_type,
df_object_id: o.df_object_id,
}
}
}

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_history, primary_key(object_id, object_version, checkpoint_sequence_number))]
pub struct StoredHistoryObject {
@@ -123,6 +165,27 @@ impl From<IndexedDeletedObject> for StoredDeletedObject {
}
}

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_snapshot, primary_key(object_id))]

pub struct StoredDeletedObjectSnapshot {
pub object_id: Vec<u8>,
pub object_version: i64,
pub object_status: i16,
pub checkpoint_sequence_number: i64,
}

impl From<StoredDeletedObject> for StoredDeletedObjectSnapshot {
fn from(o: StoredDeletedObject) -> Self {
Self {
object_id: o.object_id.to_vec(),
object_version: o.object_version,
object_status: ObjectStatus::WrappedOrDeleted as i16,
checkpoint_sequence_number: o.checkpoint_sequence_number,
}
}
}

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_history, primary_key(object_id, object_version, checkpoint_sequence_number))]
pub struct StoredDeletedHistoryObject {