From 97fd38206c8077378ce255d77791a389ecaf013f Mon Sep 17 00:00:00 2001 From: wlmyng <127570466+wlmyng@users.noreply.github.com> Date: Sat, 20 Jan 2024 13:26:48 -0800 Subject: [PATCH] [gql] objects_snapshot test for availableRange-aware object history lookup (#15677) ## Description Expose a way to provide config values to ObjectsSnapshotProcessor to enable e2e tests around objects_snapshot. 1. Adds a SnapshotLagConfig to ObjectsSnapshotProcessor struct 2. ReaderWriterConfig for start_test_indexer_v2, reorganizes some logic on instantiating a reader or writer indexer_v2 3. RunGraphqlCommand now waits for objects_snapshot catch up in addition to the existing checkpoint catch up. snapshot.move to test querying against objects_snapshot and objects_history tables ## Test Plan snapshot.move --- If your changes are not user-facing and do not break anything, you can skip the following section. Otherwise, please briefly describe what has changed under the Release Notes section. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- crates/sui-cluster-test/src/cluster.rs | 6 +- .../tests/objects/snapshot.exp | 96 ++++++++++++ .../tests/objects/snapshot.move | 140 ++++++++++++++++++ crates/sui-graphql-rpc/src/server/builder.rs | 1 + .../sui-graphql-rpc/src/test_infra/cluster.rs | 51 ++++++- crates/sui-graphql-rpc/tests/e2e_tests.rs | 3 + .../tests/examples_validation_tests.rs | 2 + crates/sui-indexer/src/indexer_v2.rs | 22 ++- .../objects_snapshot_processor.rs | 73 +++++++-- crates/sui-indexer/src/test_utils.rs | 80 ++++++---- crates/sui-indexer/tests/ingestion_tests.rs | 4 +- .../sui-transactional-test-runner/src/args.rs | 4 + .../src/test_adapter.rs | 24 ++- 13 files changed, 452 insertions(+), 54 deletions(-) create mode 100644 crates/sui-graphql-e2e-tests/tests/objects/snapshot.exp create mode 100644 crates/sui-graphql-e2e-tests/tests/objects/snapshot.move diff --git a/crates/sui-cluster-test/src/cluster.rs b/crates/sui-cluster-test/src/cluster.rs index 27670cc1dee0f..be6889537aed0 100644 --- a/crates/sui-cluster-test/src/cluster.rs +++ b/crates/sui-cluster-test/src/cluster.rs @@ -9,7 +9,7 @@ use sui_config::Config; use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG}; use sui_graphql_rpc::config::ConnectionConfig; use sui_graphql_rpc::test_infra::cluster::start_graphql_server; -use sui_indexer::test_utils::{start_test_indexer, start_test_indexer_v2}; +use sui_indexer::test_utils::{start_test_indexer, start_test_indexer_v2, ReaderWriterConfig}; use sui_indexer::IndexerConfig; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore}; use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv}; @@ -228,8 +228,8 @@ impl Cluster for LocalNewCluster { start_test_indexer_v2( Some(pg_address.clone()), fullnode_url.clone(), - None, options.use_indexer_experimental_methods, + ReaderWriterConfig::writer_mode(None), ) .await; @@ -237,8 +237,8 @@ impl Cluster for LocalNewCluster { start_test_indexer_v2( Some(pg_address), fullnode_url.clone(), - Some(indexer_address.to_string()), options.use_indexer_experimental_methods, + ReaderWriterConfig::reader_mode(indexer_address.to_string()), ) .await; } else { diff --git a/crates/sui-graphql-e2e-tests/tests/objects/snapshot.exp b/crates/sui-graphql-e2e-tests/tests/objects/snapshot.exp new file mode 100644 index 0000000000000..a02eac298ce4d --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/objects/snapshot.exp @@ -0,0 +1,96 @@ +processed 17 tasks + +init: +A: object(0,0) + +task 1 'publish'. lines 11-37: +created: object(1,0) +mutated: object(0,1) +gas summary: computation_cost: 1000000, storage_cost: 6171200, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 2 'run'. lines 39-39: +created: object(2,0) +mutated: object(0,1) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 3 'create-checkpoint'. lines 41-41: +Checkpoint created: 1 + +task 4 'run-graphql'. lines 43-56: +Response: { + "data": { + "object": { + "status": "LIVE", + "version": 3, + "asMoveObject": { + "contents": { + "json": { + "id": "0xcf27391f83c42a323595be23151f6ede95a083f451a388ff2e3e25fbbf31a7c3", + "value": "0" + } + } + } + } + } +} + +task 5 'run-graphql'. lines 59-73: +Response: { + "data": { + "object": { + "status": "HISTORICAL", + "version": 3, + "asMoveObject": { + "contents": { + "json": { + "id": "0xcf27391f83c42a323595be23151f6ede95a083f451a388ff2e3e25fbbf31a7c3", + "value": "0" + } + } + } + } + } +} + +task 6 'run'. lines 75-75: +created: object(6,0) +mutated: object(0,0) +wrapped: object(2,0) +gas summary: computation_cost: 1000000, storage_cost: 2553600, storage_rebate: 1301652, non_refundable_storage_fee: 13148 + +task 7 'create-checkpoint'. lines 77-77: +Checkpoint created: 2 + +task 9 'create-checkpoint'. lines 81-81: +Checkpoint created: 3 + +task 11 'create-checkpoint'. lines 85-85: +Checkpoint created: 4 + +task 13 'create-checkpoint'. lines 89-89: +Checkpoint created: 5 + +task 14 'run-graphql'. lines 91-105: +Response: { + "data": { + "object": null + } +} + +task 15 'run-graphql'. lines 108-123: +Response: { + "data": { + "object": { + "status": "WRAPPED_OR_DELETED", + "version": 4, + "asMoveObject": null + } + } +} + +task 16 'run-graphql'. lines 125-140: +Response: { + "data": { + "object": null + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/objects/snapshot.move b/crates/sui-graphql-e2e-tests/tests/objects/snapshot.move new file mode 100644 index 0000000000000..7d58de978b53e --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/objects/snapshot.move @@ -0,0 +1,140 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Objects can continue to be found on the live objects table until they are WrappedOrDeleted. From +// there, the object can be fetched on the objects_history table, until it gets snapshotted into +// objects_snapshot table. This test checks that we correctly fetch data from both the +// objects_snapshot and objects_history tables. + +//# init --addresses Test=0x0 --accounts A --simulator --object-snapshot-min-checkpoint-lag 0 --object-snapshot-max-checkpoint-lag 2 + +//# publish +module Test::M1 { + use sui::object::{Self, UID}; + use sui::tx_context::{Self, TxContext}; + use sui::transfer; + + struct Object has key, store { + id: UID, + value: u64, + } + + struct Wrapper has key { + id: UID, + o: Object + } + + public entry fun create(value: u64, recipient: address, ctx: &mut TxContext) { + transfer::public_transfer( + Object { id: object::new(ctx), value }, + recipient + ) + } + + public entry fun wrap(o: Object, ctx: &mut TxContext) { + transfer::transfer(Wrapper { id: object::new(ctx), o }, tx_context::sender(ctx)) + } +} + +//# run Test::M1::create --args 0 @A + +//# create-checkpoint 1 + +//# run-graphql +{ + object( + address: "@{obj_2_0}" + ) { + status + version + asMoveObject { + contents { + json + } + } + } +} + + +//# run-graphql +{ + object( + address: "@{obj_2_0}" + version: 3 + ) { + status + version + asMoveObject { + contents { + json + } + } + } +} + +//# run Test::M1::wrap --sender A --args object(2,0) + +//# create-checkpoint + +//# advance-clock --duration-ns 1 + +//# create-checkpoint + +//# advance-clock --duration-ns 1 + +//# create-checkpoint + +//# advance-clock --duration-ns 1 + +//# create-checkpoint + +//# run-graphql +# should not exist on live objects +{ + object( + address: "@{obj_2_0}" + ) { + status + version + asMoveObject { + contents { + json + } + } + } +} + + +//# run-graphql +# fetched from objects_snapshot +{ + object( + address: "@{obj_2_0}" + version: 4 + ) { + status + version + asMoveObject { + contents { + json + } + } + } +} + +//# run-graphql +# should not exist in either objects_snapshot or objects_history +{ + object( + address: "@{obj_2_0}" + version: 3 + ) { + status + version + asMoveObject { + contents { + json + } + } + } +} diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index f145e94f63a5d..c86e1d39f0c5d 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -326,6 +326,7 @@ pub mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await, ) diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs index 9004e8a834e6a..e131d2b669f30 100644 --- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs @@ -11,9 +11,11 @@ use std::sync::Arc; use std::time::Duration; use sui_graphql_rpc_client::simple_client::SimpleClient; use sui_indexer::errors::IndexerError; +pub use sui_indexer::processors_v2::objects_snapshot_processor::SnapshotLagConfig; use sui_indexer::store::indexer_store_v2::IndexerStoreV2; use sui_indexer::store::PgIndexerStoreV2; use sui_indexer::test_utils::start_test_indexer_v2; +use sui_indexer::test_utils::ReaderWriterConfig; use sui_rest_api::node_state_getter::NodeStateGetter; use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT}; use test_cluster::TestCluster; @@ -34,6 +36,7 @@ pub struct ExecutorCluster { pub indexer_join_handle: JoinHandle>, pub graphql_server_join_handle: JoinHandle<()>, pub graphql_client: SimpleClient, + pub snapshot_config: SnapshotLagConfig, } pub struct Cluster { @@ -44,6 +47,7 @@ pub struct Cluster { pub graphql_client: SimpleClient, } +/// Starts a validator, fullnode, indexer, and graphql service for testing. pub async fn start_cluster( graphql_connection_config: ConnectionConfig, internal_data_source_rpc_port: Option, @@ -53,8 +57,13 @@ pub async fn start_cluster( let val_fn = start_validator_with_fullnode(internal_data_source_rpc_port).await; // Starts indexer - let (pg_store, pg_handle) = - start_test_indexer_v2(Some(db_url), val_fn.rpc_url().to_string(), None, true).await; + let (pg_store, pg_handle) = start_test_indexer_v2( + Some(db_url), + val_fn.rpc_url().to_string(), + true, + ReaderWriterConfig::writer_mode(None), + ) + .await; // Starts graphql server let fn_rpc_url = val_fn.rpc_url().to_string(); @@ -79,10 +88,13 @@ pub async fn start_cluster( } } +/// Takes in a simulated instantiation of a Sui blockchain and builds a cluster around it. This +/// cluster is typically used in e2e tests to emulate and test behaviors. pub async fn serve_executor( graphql_connection_config: ConnectionConfig, internal_data_source_rpc_port: u16, executor: Arc, + snapshot_config: Option, ) -> ExecutorCluster { let db_url = graphql_connection_config.db_url.clone(); @@ -94,12 +106,11 @@ pub async fn serve_executor( sui_rest_api::start_service(executor_server_url, executor, Some("/rest".to_owned())).await; }); - // Starts indexer let (pg_store, pg_handle) = start_test_indexer_v2( Some(db_url), format!("http://{}", executor_server_url), - None, true, + ReaderWriterConfig::writer_mode(snapshot_config.clone()), ) .await; @@ -121,6 +132,7 @@ pub async fn serve_executor( indexer_join_handle: pg_handle, graphql_server_join_handle: graphql_server_handle, graphql_client: client, + snapshot_config: snapshot_config.unwrap_or_default(), } } @@ -197,4 +209,35 @@ impl ExecutorCluster { .await .expect("Timeout waiting for indexer to catchup to checkpoint"); } + + /// The ObjectsSnapshotProcessor is a long-running task that periodically takes a snapshot of + /// the objects table. This leads to flakiness in tests, so we wait until the objects_snapshot + /// has reached the expected state. + pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) { + let mut latest_snapshot_cp = 0; + + let latest_cp = self + .indexer_store + .get_latest_tx_checkpoint_sequence_number() + .await + .unwrap() + .unwrap(); + + tokio::time::timeout(base_timeout, async { + while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_max_lag as u64 { + tokio::time::sleep(std::time::Duration::from_secs( + self.snapshot_config.sleep_duration, + )) + .await; + latest_snapshot_cp = self + .indexer_store + .get_latest_object_snapshot_checkpoint_sequence_number() + .await + .unwrap() + .unwrap_or_default(); + } + }) + .await + .expect("Timeout waiting for indexer to update objects snapshot"); + } } diff --git a/crates/sui-graphql-rpc/tests/e2e_tests.rs b/crates/sui-graphql-rpc/tests/e2e_tests.rs index d9b32513e6262..513872fd00e65 100644 --- a/crates/sui-graphql-rpc/tests/e2e_tests.rs +++ b/crates/sui-graphql-rpc/tests/e2e_tests.rs @@ -84,6 +84,7 @@ mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await; @@ -115,6 +116,7 @@ mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await; @@ -156,6 +158,7 @@ mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await; diff --git a/crates/sui-graphql-rpc/tests/examples_validation_tests.rs b/crates/sui-graphql-rpc/tests/examples_validation_tests.rs index 32825355329cd..8ef26ec9ff819 100644 --- a/crates/sui-graphql-rpc/tests/examples_validation_tests.rs +++ b/crates/sui-graphql-rpc/tests/examples_validation_tests.rs @@ -113,6 +113,7 @@ mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await; @@ -177,6 +178,7 @@ mod tests { connection_config, DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), + None, ) .await; diff --git a/crates/sui-indexer/src/indexer_v2.rs b/crates/sui-indexer/src/indexer_v2.rs index 602d97dd246c5..9ea75af63f003 100644 --- a/crates/sui-indexer/src/indexer_v2.rs +++ b/crates/sui-indexer/src/indexer_v2.rs @@ -21,7 +21,9 @@ use tracing::info; use crate::framework::fetcher::CheckpointFetcher; use crate::handlers::checkpoint_handler_v2::new_handlers; -use crate::processors_v2::objects_snapshot_processor::ObjectsSnapshotProcessor; +use crate::processors_v2::objects_snapshot_processor::{ + ObjectsSnapshotProcessor, SnapshotLagConfig, +}; use crate::processors_v2::processor_orchestrator_v2::ProcessorOrchestratorV2; use crate::store::{IndexerStoreV2, PgIndexerAnalyticalStore}; @@ -34,6 +36,16 @@ impl IndexerV2 { config: &IndexerConfig, store: S, metrics: IndexerMetrics, + ) -> Result<(), IndexerError> { + let snapshot_config = SnapshotLagConfig::default(); + IndexerV2::start_writer_with_config(config, store, metrics, snapshot_config).await + } + + pub async fn start_writer_with_config( + config: &IndexerConfig, + store: S, + metrics: IndexerMetrics, + snapshot_config: SnapshotLagConfig, ) -> Result<(), IndexerError> { info!( "Sui indexerV2 Writer (version {:?}) started...", @@ -63,8 +75,12 @@ impl IndexerV2 { ); spawn_monitored_task!(fetcher.run()); - let objects_snapshot_processor = - ObjectsSnapshotProcessor::new(store.clone(), metrics.clone()); + let objects_snapshot_processor = ObjectsSnapshotProcessor::new_with_config( + store.clone(), + metrics.clone(), + snapshot_config, + ); + spawn_monitored_task!(objects_snapshot_processor.start()); let checkpoint_handler = new_handlers(store, metrics, config).await?; diff --git a/crates/sui-indexer/src/processors_v2/objects_snapshot_processor.rs b/crates/sui-indexer/src/processors_v2/objects_snapshot_processor.rs index 35171b3ca40e2..d301942adf5f6 100644 --- a/crates/sui-indexer/src/processors_v2/objects_snapshot_processor.rs +++ b/crates/sui-indexer/src/processors_v2/objects_snapshot_processor.rs @@ -12,8 +12,49 @@ const OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG: usize = 300; pub struct ObjectsSnapshotProcessor { pub store: S, metrics: IndexerMetrics, + pub config: SnapshotLagConfig, +} + +#[derive(Clone)] +pub struct SnapshotLagConfig { pub snapshot_min_lag: usize, pub snapshot_max_lag: usize, + pub sleep_duration: u64, +} + +impl SnapshotLagConfig { + pub fn new( + min_lag: Option, + max_lag: Option, + sleep_duration: Option, + ) -> Self { + let default_config = Self::default(); + Self { + snapshot_min_lag: min_lag.unwrap_or(default_config.snapshot_min_lag), + snapshot_max_lag: max_lag.unwrap_or(default_config.snapshot_max_lag), + sleep_duration: sleep_duration.unwrap_or(default_config.sleep_duration), + } + } +} + +impl Default for SnapshotLagConfig { + fn default() -> Self { + let snapshot_min_lag = std::env::var("OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG); + + let snapshot_max_lag = std::env::var("OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG); + + SnapshotLagConfig { + snapshot_min_lag, + snapshot_max_lag, + sleep_duration: 5, + } + } } impl ObjectsSnapshotProcessor @@ -21,23 +62,22 @@ where S: IndexerStoreV2 + Clone + Sync + Send + 'static, { pub fn new(store: S, metrics: IndexerMetrics) -> ObjectsSnapshotProcessor { - let snapshot_min_lag = std::env::var("OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG") - .map(|s| { - s.parse::() - .unwrap_or(OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG) - }) - .unwrap_or(0); - let snapshot_max_lag = std::env::var("OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG") - .map(|s| { - s.parse::() - .unwrap_or(OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG) - }) - .unwrap_or(0); Self { store, metrics, - snapshot_min_lag, - snapshot_max_lag, + config: SnapshotLagConfig::default(), + } + } + + pub fn new_with_config( + store: S, + metrics: IndexerMetrics, + config: SnapshotLagConfig, + ) -> ObjectsSnapshotProcessor { + Self { + store, + metrics, + config, } } @@ -64,7 +104,8 @@ where latest_snapshot_cp + 1 }; // with MAX and MIN, the CSR range will vary from MIN cps to MAX cps - let snapshot_window = self.snapshot_max_lag as u64 - self.snapshot_min_lag as u64; + let snapshot_window = + self.config.snapshot_max_lag as u64 - self.config.snapshot_min_lag as u64; let mut latest_cp = self .store .get_latest_tx_checkpoint_sequence_number() @@ -72,7 +113,7 @@ where .unwrap_or_default(); loop { - while latest_cp <= start_cp + self.snapshot_max_lag as u64 { + while latest_cp <= start_cp + self.config.snapshot_max_lag as u64 { tokio::time::sleep(std::time::Duration::from_secs(5)).await; latest_cp = self .store diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 4246907441dc9..47f426a54d3b6 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -13,16 +13,36 @@ use tracing::info; use crate::errors::IndexerError; use crate::indexer_v2::IndexerV2; +use crate::processors_v2::objects_snapshot_processor::SnapshotLagConfig; use crate::store::{PgIndexerStore, PgIndexerStoreV2}; use crate::utils::reset_database; use crate::{new_pg_connection_pool, Indexer, IndexerConfig}; use crate::{new_pg_connection_pool_impl, IndexerMetrics}; +pub enum ReaderWriterConfig { + Reader { reader_mode_rpc_url: String }, + Writer { snapshot_config: SnapshotLagConfig }, +} + +impl ReaderWriterConfig { + pub fn reader_mode(reader_mode_rpc_url: String) -> Self { + Self::Reader { + reader_mode_rpc_url, + } + } + + pub fn writer_mode(snapshot_config: Option) -> Self { + Self::Writer { + snapshot_config: snapshot_config.unwrap_or_default(), + } + } +} + pub async fn start_test_indexer_v2( db_url: Option, rpc_url: String, - reader_mode_rpc_url: Option, use_indexer_experimental_methods: bool, + reader_writer_config: ReaderWriterConfig, ) -> (PgIndexerStoreV2, JoinHandle>) { // Reduce the connection pool size to 10 for testing // to prevent maxing out @@ -54,36 +74,46 @@ pub async fn start_test_indexer_v2( ..Default::default() }; - if let Some(reader_mode_rpc_url) = &reader_mode_rpc_url { - let reader_mode_rpc_url = reader_mode_rpc_url - .parse::() - .expect("Unable to parse fullnode address"); - config.fullnode_sync_worker = false; - config.rpc_server_worker = true; - config.rpc_server_url = reader_mode_rpc_url.ip().to_string(); - config.rpc_server_port = reader_mode_rpc_url.port(); - } - - let parsed_url = config.get_db_url().unwrap(); - let blocking_pool = new_pg_connection_pool_impl(&parsed_url, Some(5)).unwrap(); - if config.reset_db && reader_mode_rpc_url.is_none() { - reset_database(&mut blocking_pool.get().unwrap(), true, config.use_v2).unwrap(); - } - let registry = prometheus::Registry::default(); init_metrics(®istry); let indexer_metrics = IndexerMetrics::new(®istry); - let store = PgIndexerStoreV2::new(blocking_pool, indexer_metrics.clone()); - let store_clone = store.clone(); - let handle = if reader_mode_rpc_url.is_some() { - tokio::spawn(async move { IndexerV2::start_reader(&config, ®istry, db_url).await }) - } else { - tokio::spawn( - async move { IndexerV2::start_writer(&config, store_clone, indexer_metrics).await }, - ) + let parsed_url = config.get_db_url().unwrap(); + let blocking_pool = new_pg_connection_pool_impl(&parsed_url, Some(5)).unwrap(); + let store = PgIndexerStoreV2::new(blocking_pool.clone(), indexer_metrics.clone()); + + let handle = match reader_writer_config { + ReaderWriterConfig::Reader { + reader_mode_rpc_url, + } => { + let reader_mode_rpc_url = reader_mode_rpc_url + .parse::() + .expect("Unable to parse fullnode address"); + config.fullnode_sync_worker = false; + config.rpc_server_worker = true; + config.rpc_server_url = reader_mode_rpc_url.ip().to_string(); + config.rpc_server_port = reader_mode_rpc_url.port(); + + tokio::spawn(async move { IndexerV2::start_reader(&config, ®istry, db_url).await }) + } + ReaderWriterConfig::Writer { snapshot_config } => { + if config.reset_db { + reset_database(&mut blocking_pool.get().unwrap(), true, config.use_v2).unwrap(); + } + let store_clone = store.clone(); + + tokio::spawn(async move { + IndexerV2::start_writer_with_config( + &config, + store_clone, + indexer_metrics, + snapshot_config, + ) + .await + }) + } }; (store, handle) diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs index f276a7e73ee99..c62eae68b021e 100644 --- a/crates/sui-indexer/tests/ingestion_tests.rs +++ b/crates/sui-indexer/tests/ingestion_tests.rs @@ -15,7 +15,7 @@ mod ingestion_tests { use sui_indexer::models_v2::transactions::StoredTransaction; use sui_indexer::schema_v2::transactions; use sui_indexer::store::{indexer_store_v2::IndexerStoreV2, PgIndexerStoreV2}; - use sui_indexer::test_utils::start_test_indexer_v2; + use sui_indexer::test_utils::{start_test_indexer_v2, ReaderWriterConfig}; use sui_types::base_types::SuiAddress; use sui_types::effects::TransactionEffectsAPI; use tokio::task::JoinHandle; @@ -53,8 +53,8 @@ mod ingestion_tests { let (pg_store, pg_handle) = start_test_indexer_v2( Some(DEFAULT_DB_URL.to_owned()), format!("http://{}", server_url), - None, true, + ReaderWriterConfig::writer_mode(None), ) .await; (server_handle, pg_store, pg_handle) diff --git a/crates/sui-transactional-test-runner/src/args.rs b/crates/sui-transactional-test-runner/src/args.rs index f2071cd673ffa..460d5c869816f 100644 --- a/crates/sui-transactional-test-runner/src/args.rs +++ b/crates/sui-transactional-test-runner/src/args.rs @@ -60,6 +60,10 @@ pub struct SuiInitArgs { pub reference_gas_price: Option, #[clap(long = "default-gas-price")] pub default_gas_price: Option, + #[clap(long = "object-snapshot-min-checkpoint-lag")] + pub object_snapshot_min_checkpoint_lag: Option, + #[clap(long = "object-snapshot-max-checkpoint-lag")] + pub object_snapshot_max_checkpoint_lag: Option, } #[derive(Debug, clap::Parser)] diff --git a/crates/sui-transactional-test-runner/src/test_adapter.rs b/crates/sui-transactional-test-runner/src/test_adapter.rs index 73a7417e6253c..4c16170c85cc3 100644 --- a/crates/sui-transactional-test-runner/src/test_adapter.rs +++ b/crates/sui-transactional-test-runner/src/test_adapter.rs @@ -48,9 +48,9 @@ use sui_core::authority::test_authority_builder::TestAuthorityBuilder; use sui_core::authority::AuthorityState; use sui_framework::DEFAULT_FRAMEWORK_PATH; use sui_graphql_rpc::config::ConnectionConfig; -use sui_graphql_rpc::test_infra::cluster::serve_executor; use sui_graphql_rpc::test_infra::cluster::ExecutorCluster; use sui_graphql_rpc::test_infra::cluster::DEFAULT_INTERNAL_DATA_SOURCE_PORT; +use sui_graphql_rpc::test_infra::cluster::{serve_executor, SnapshotLagConfig}; use sui_json_rpc_api::QUERY_MAX_RESULT_LIMIT; use sui_json_rpc_types::{DevInspectResults, SuiExecutionStatus, SuiTransactionBlockEffectsAPI}; use sui_protocol_config::{Chain, ProtocolConfig}; @@ -205,6 +205,8 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { custom_validator_account, reference_gas_price, default_gas_price, + object_snapshot_min_checkpoint_lag, + object_snapshot_max_checkpoint_lag, ) = match task_opt.map(|t| t.command) { Some(( InitCommand { named_addresses }, @@ -217,6 +219,8 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { custom_validator_account, reference_gas_price, default_gas_price, + object_snapshot_min_checkpoint_lag, + object_snapshot_max_checkpoint_lag, }, )) => { let map = verify_and_create_named_address_mapping(named_addresses).unwrap(); @@ -244,6 +248,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { if reference_gas_price.is_some() && !simulator { panic!("Can only set reference gas price in simulator mode"); } + ( map, accounts, @@ -252,6 +257,8 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { custom_validator_account, reference_gas_price, default_gas_price, + object_snapshot_min_checkpoint_lag, + object_snapshot_max_checkpoint_lag, ) } None => { @@ -264,6 +271,8 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { false, None, None, + None, + None, ) } }; @@ -286,6 +295,8 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { &protocol_config, custom_validator_account, reference_gas_price, + object_snapshot_min_checkpoint_lag, + object_snapshot_max_checkpoint_lag, ) .await } else { @@ -542,6 +553,10 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter<'a> { .wait_for_checkpoint_catchup(highest_checkpoint, Duration::from_secs(30)) .await; + cluster + .wait_for_objects_snapshot_catchup(Duration::from_secs(30)) + .await; + let interpolated = self.interpolate_query(&contents, &cursors)?; let resp = cluster .graphql_client @@ -1837,6 +1852,8 @@ async fn init_sim_executor( protocol_config: &ProtocolConfig, custom_validator_account: bool, reference_gas_price: Option, + object_snapshot_min_checkpoint_lag: Option, + object_snapshot_max_checkpoint_lag: Option, ) -> ( Box, AccountSetup, @@ -1906,6 +1923,11 @@ async fn init_sim_executor( ConnectionConfig::ci_integration_test_cfg(), DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(read_replica), + Some(SnapshotLagConfig::new( + object_snapshot_min_checkpoint_lag, + object_snapshot_max_checkpoint_lag, + Some(1), + )), ) .await;