From 2c6db08fe62fc1798d076413ac0cebc9e1052111 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Wed, 11 Sep 2024 09:14:40 -0700 Subject: [PATCH] [Indexer][CherryPick] Add full_objects_history table (#19227) (#19317) ## Description Add full_objects_history schema. ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: ## Description Describe the changes or additions included in this PR. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../down.sql | 1 + .../up.sql | 10 ++ crates/sui-indexer/src/handlers/committer.rs | 5 + crates/sui-indexer/src/metrics.rs | 69 ++++++----- crates/sui-indexer/src/models/objects.rs | 31 ++++- crates/sui-indexer/src/schema/mod.rs | 1 + crates/sui-indexer/src/schema/pg.rs | 8 ++ crates/sui-indexer/src/store/indexer_store.rs | 5 + .../sui-indexer/src/store/pg_indexer_store.rs | 110 +++++++++++++++++- 9 files changed, 208 insertions(+), 32 deletions(-) create mode 100644 crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/up.sql diff --git a/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/down.sql b/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/down.sql new file mode 100644 index 0000000000000..619fc41782e68 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS full_objects_history; diff --git a/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/up.sql b/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/up.sql new file mode 100644 index 0000000000000..1504a21e51658 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-05-164455_full_objects_history/up.sql @@ -0,0 +1,10 @@ +-- This table will store every history version of each object, and never get pruned. +-- Since it can grow indefinitely, we keep minimum amount of information in this table for the purpose +-- of point lookups. +CREATE TABLE full_objects_history +( + object_id bytea NOT NULL, + object_version bigint NOT NULL, + serialized_object bytea, + PRIMARY KEY (object_id, object_version) +); diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index f4e5504893f5c..c51a59cc910ec 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -143,8 +143,13 @@ async fn commit_checkpoints( state.persist_event_indices(event_indices_batch), state.persist_displays(display_updates_batch), state.persist_packages(packages_batch), + // TODO: There are a few ways we could make the following more memory efficient. + // 1. persist_objects and persist_object_history both call another function to make the final + // committed object list. We could call it early and share the result. + // 2. We could avoid clone by using Arc. state.persist_objects(object_changes_batch.clone()), state.persist_object_history(object_history_changes_batch.clone()), + state.persist_full_objects_history(object_history_changes_batch.clone()), ]; if let Some(epoch_data) = epoch.clone() { persist_tasks.push(state.persist_epoch(epoch_data)); diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 7f4f229ddcda9..35c07fbfc2abd 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -48,7 +48,7 @@ const DATA_INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, ]; /// NOTE: for objects_snapshot update and advance_epoch, which are expected to be within [0.1, 100] seconds, -/// and can go up to high hundres of seconds when things go wrong. +/// and can go up to high hundreds of seconds when things go wrong. const DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, @@ -111,9 +111,11 @@ pub struct IndexerMetrics { pub checkpoint_db_commit_latency_objects: Histogram, pub checkpoint_db_commit_latency_objects_snapshot: Histogram, pub checkpoint_db_commit_latency_objects_history: Histogram, + pub checkpoint_db_commit_latency_full_objects_history: Histogram, pub checkpoint_db_commit_latency_objects_chunks: Histogram, pub checkpoint_db_commit_latency_objects_snapshot_chunks: Histogram, pub checkpoint_db_commit_latency_objects_history_chunks: Histogram, + pub checkpoint_db_commit_latency_full_objects_history_chunks: Histogram, pub checkpoint_db_commit_latency_events: Histogram, pub checkpoint_db_commit_latency_events_chunks: Histogram, pub checkpoint_db_commit_latency_event_indices: Histogram, @@ -199,7 +201,7 @@ impl IndexerMetrics { ) .unwrap(), total_transaction_chunk_committed: register_int_counter_with_registry!( - "total_transaction_chunk_commited", + "total_transaction_chunk_committed", "Total number of transaction chunk committed", registry, ) @@ -291,7 +293,7 @@ impl IndexerMetrics { ).unwrap(), fullnode_checkpoint_data_download_latency: register_histogram_with_registry!( "fullnode_checkpoint_data_download_latency", - "Time spent in downloading checkpoint and transation for a new checkpoint from the Full Node", + "Time spent in downloading checkpoint and transaction for a new checkpoint from the Full Node", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) @@ -384,7 +386,7 @@ impl IndexerMetrics { .unwrap(), checkpoint_db_commit_latency: register_histogram_with_registry!( "checkpoint_db_commit_latency", - "Time spent commiting a checkpoint to the db", + "Time spent committing a checkpoint to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) @@ -392,21 +394,21 @@ impl IndexerMetrics { checkpoint_db_commit_latency_step_1: register_histogram_with_registry!( "checkpoint_db_commit_latency_step_1", - "Time spent commiting a checkpoint to the db, step 1", + "Time spent committing a checkpoint to the db, step 1", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_transactions: register_histogram_with_registry!( "checkpoint_db_commit_latency_transactions", - "Time spent commiting transactions", + "Time spent committing transactions", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_transactions_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_transactions_chunks", - "Time spent commiting transactions chunks", + "Time spent committing transactions chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) @@ -420,103 +422,116 @@ impl IndexerMetrics { .unwrap(), checkpoint_db_commit_latency_objects: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects", - "Time spent commiting objects", + "Time spent committing objects", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_objects_snapshot: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects_snapshot", - "Time spent commiting objects snapshots", + "Time spent committing objects snapshots", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_objects_history: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects_history", - "Time spent commiting objects history", + "Time spent committing objects history", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ).unwrap(), + checkpoint_db_commit_latency_full_objects_history: register_histogram_with_registry!( + "checkpoint_db_commit_latency_full_objects_history", + "Time spent committing full objects history", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ).unwrap(), checkpoint_db_commit_latency_objects_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects_chunks", - "Time spent commiting objects chunks", + "Time spent committing objects chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_objects_snapshot_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects_snapshot_chunks", - "Time spent commiting objects snapshot chunks", + "Time spent committing objects snapshot chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_objects_history_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_objects_history_chunks", - "Time spent commiting objects history chunks", + "Time spent committing objects history chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ).unwrap(), + checkpoint_db_commit_latency_full_objects_history_chunks: register_histogram_with_registry!( + "checkpoint_db_commit_latency_full_objects_history_chunks", + "Time spent committing full objects history chunks", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), checkpoint_db_commit_latency_events: register_histogram_with_registry!( "checkpoint_db_commit_latency_events", - "Time spent commiting events", + "Time spent committing events", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_events_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_events_chunks", - "Time spent commiting events chunks", + "Time spent committing events chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_event_indices: register_histogram_with_registry!( "checkpoint_db_commit_latency_event_indices", - "Time spent commiting event indices", + "Time spent committing event indices", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_event_indices_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_event_indices_chunks", - "Time spent commiting event indices chunks", + "Time spent committing event indices chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_packages: register_histogram_with_registry!( "checkpoint_db_commit_latency_packages", - "Time spent commiting packages", + "Time spent committing packages", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_tx_indices: register_histogram_with_registry!( "checkpoint_db_commit_latency_tx_indices", - "Time spent commiting tx indices", + "Time spent committing tx indices", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_tx_indices_chunks: register_histogram_with_registry!( "checkpoint_db_commit_latency_tx_indices_chunks", - "Time spent commiting tx_indices chunks", + "Time spent committing tx_indices chunks", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_checkpoints: register_histogram_with_registry!( "checkpoint_db_commit_latency_checkpoints", - "Time spent commiting checkpoints", + "Time spent committing checkpoints", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), checkpoint_db_commit_latency_epoch: register_histogram_with_registry!( "checkpoint_db_commit_latency_epochs", - "Time spent commiting epochs", + "Time spent committing epochs", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) @@ -529,35 +544,35 @@ impl IndexerMetrics { ).unwrap(), thousand_transaction_avg_db_commit_latency: register_histogram_with_registry!( "transaction_db_commit_latency", - "Average time spent commiting 1000 transactions to the db", + "Average time spent committing 1000 transactions to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), object_db_commit_latency: register_histogram_with_registry!( "object_db_commit_latency", - "Time spent commiting a object to the db", + "Time spent committing a object to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), object_mutation_db_commit_latency: register_histogram_with_registry!( "object_mutation_db_commit_latency", - "Time spent commiting a object mutation to the db", + "Time spent committing a object mutation to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), object_deletion_db_commit_latency: register_histogram_with_registry!( "object_deletion_db_commit_latency", - "Time spent commiting a object deletion to the db", + "Time spent committing a object deletion to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) .unwrap(), epoch_db_commit_latency: register_histogram_with_registry!( "epoch_db_commit_latency", - "Time spent commiting a epoch to the db", + "Time spent committing a epoch to the db", DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), registry, ) diff --git a/crates/sui-indexer/src/models/objects.rs b/crates/sui-indexer/src/models/objects.rs index 03f9cc3c81299..2e71546ceec7d 100644 --- a/crates/sui-indexer/src/models/objects.rs +++ b/crates/sui-indexer/src/models/objects.rs @@ -18,7 +18,7 @@ use sui_types::object::Object; use sui_types::object::ObjectRead; use crate::errors::IndexerError; -use crate::schema::{objects, objects_history, objects_snapshot}; +use crate::schema::{full_objects_history, objects, objects_history, objects_snapshot}; use crate::types::{owner_to_owner_info, IndexedDeletedObject, IndexedObject, ObjectStatus}; #[derive(Queryable)] @@ -544,6 +544,35 @@ impl TryFrom for Balance { } } +#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)] +#[diesel(table_name = full_objects_history, primary_key(object_id, object_version))] +pub struct StoredFullHistoryObject { + pub object_id: Vec, + pub object_version: i64, + pub serialized_object: Option>, +} + +impl From for StoredFullHistoryObject { + fn from(o: IndexedObject) -> Self { + let object = o.object; + Self { + object_id: object.id().to_vec(), + object_version: object.version().value() as i64, + serialized_object: Some(bcs::to_bytes(&object).unwrap()), + } + } +} + +impl From for StoredFullHistoryObject { + fn from(o: IndexedDeletedObject) -> Self { + Self { + object_id: o.object_id.to_vec(), + object_version: o.object_version as i64, + serialized_object: None, + } + } +} + #[cfg(test)] mod tests { use move_core_types::{account_address::AccountAddress, language_storage::StructTag}; diff --git a/crates/sui-indexer/src/schema/mod.rs b/crates/sui-indexer/src/schema/mod.rs index 0ede77098b9ae..b65995cbcc233 100644 --- a/crates/sui-indexer/src/schema/mod.rs +++ b/crates/sui-indexer/src/schema/mod.rs @@ -18,6 +18,7 @@ pub use pg::event_struct_name; pub use pg::event_struct_package; pub use pg::events; pub use pg::feature_flags; +pub use pg::full_objects_history; pub use pg::objects; pub use pg::objects_history; pub use pg::objects_snapshot; diff --git a/crates/sui-indexer/src/schema/pg.rs b/crates/sui-indexer/src/schema/pg.rs index e8f975e581848..540fb9c14f876 100644 --- a/crates/sui-indexer/src/schema/pg.rs +++ b/crates/sui-indexer/src/schema/pg.rs @@ -168,6 +168,14 @@ diesel::table! { } } +diesel::table! { + full_objects_history (object_id, object_version) { + object_id -> Bytea, + object_version -> Int8, + serialized_object -> Nullable, + } +} + diesel::table! { objects (object_id) { object_id -> Bytea, diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 743c2471f58ea..1e994c03d4d19 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -48,6 +48,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { object_changes: Vec, ) -> Result<(), IndexerError>; + async fn persist_full_objects_history( + &self, + object_changes: Vec, + ) -> Result<(), IndexerError>; + async fn persist_objects_snapshot( &self, object_changes: Vec, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 4b4a21cb44715..bbcdf5f5788f0 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -34,6 +34,7 @@ use crate::models::epoch::StoredEpochInfo; use crate::models::epoch::{StoredFeatureFlag, StoredProtocolConfig}; use crate::models::events::StoredEvent; use crate::models::obj_indices::StoredObjectVersion; +use crate::models::objects::StoredFullHistoryObject; use crate::models::objects::{ StoredDeletedHistoryObject, StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot, @@ -43,10 +44,10 @@ use crate::models::transactions::StoredTransaction; use crate::schema::{ chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, event_senders, event_struct_instantiation, event_struct_module, event_struct_name, - event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot, - objects_version, packages, protocol_configs, pruner_cp_watermark, transactions, tx_calls_fun, - tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, - tx_recipients, tx_senders, + event_struct_package, events, feature_flags, full_objects_history, objects, objects_history, + objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark, + transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, + tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; use crate::types::EventIndex; use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; @@ -587,6 +588,39 @@ impl PgIndexerStore { }) } + fn persist_full_objects_history_chunk( + &self, + objects: Vec, + ) -> Result<(), IndexerError> { + let guard = self + .metrics + .checkpoint_db_commit_latency_full_objects_history_chunks + .start_timer(); + + transactional_blocking_with_retry!( + &self.blocking_cp, + |conn| { + for objects_chunk in objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!(full_objects_history::table, objects_chunk, conn); + } + + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!( + elapsed, + "Persisted {} chunked full objects history", + objects.len(), + ); + }) + .tap_err(|e| { + tracing::error!("Failed to persist full object history with error: {}", e); + }) + } + fn persist_checkpoints(&self, checkpoints: Vec) -> Result<(), IndexerError> { let Some(first_checkpoint) = checkpoints.first() else { return Ok(()); @@ -1799,6 +1833,74 @@ impl IndexerStore for PgIndexerStore { Ok(()) } + // TODO: There are quite some shared boiler-plate code in all functions. + // We should clean them up eventually. + async fn persist_full_objects_history( + &self, + object_changes: Vec, + ) -> Result<(), IndexerError> { + let skip_history = std::env::var("SKIP_OBJECT_HISTORY") + .map(|val| val.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if skip_history { + info!("skipping object history"); + return Ok(()); + } + + if object_changes.is_empty() { + return Ok(()); + } + let objects: Vec = object_changes + .into_iter() + .flat_map(|c| { + let TransactionObjectChangesToCommit { + changed_objects, + deleted_objects, + } = c; + changed_objects + .into_iter() + .map(|o| o.into()) + .chain(deleted_objects.into_iter().map(|o| o.into())) + }) + .collect(); + let guard = self + .metrics + .checkpoint_db_commit_latency_full_objects_history + .start_timer(); + + let len = objects.len(); + let chunks = chunk!(objects, self.config.parallel_objects_chunk_size); + let futures = chunks + .into_iter() + .map(|c| { + self.spawn_blocking_task(move |this| this.persist_full_objects_history_chunk(c)) + }) + .collect::>(); + + futures::future::join_all(futures) + .await + .into_iter() + .collect::, _>>() + .map_err(|e| { + tracing::error!( + "Failed to join persist_full_objects_history_chunk futures: {}", + e + ); + IndexerError::from(e) + })? + .into_iter() + .collect::, _>>() + .map_err(|e| { + IndexerError::PostgresWriteError(format!( + "Failed to persist all full objects history chunks: {:?}", + e + )) + })?; + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} full objects history", len); + Ok(()) + } + async fn persist_checkpoints( &self, checkpoints: Vec,