[mvr] use mvr-mode on sui-mvr-graphql-rpc for in-crate tests (#20318)
## Description 

The `ConnectionAsObjectStore` needed adjusting because the `objects` table is not
written to in `mvr-mode`. Querying `objects` and querying `objects_history` +
`objects_snapshot` should both yield the live object set.
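
In short, object reads now try `objects` first and fall back to `objects_history` + `objects_snapshot` when nothing is found there. A condensed sketch of the new `ConnectionAsObjectStore::get_object` read path (summarizing the `indexer_reader.rs` diff below; not the verbatim change):

```rust
// Condensed from crates/sui-indexer/src/indexer_reader.rs (see diff below).
// In mvr-mode the `objects` table is never populated, so a miss there falls
// back to `objects_history` (latest version), then to `objects_snapshot`.
fn get_object(
    &self,
    object_id: &ObjectID,
    version: Option<VersionNumber>,
) -> Result<Option<Object>, IndexerError> {
    let mut result = self.get_object_from_objects(object_id, version)?;
    if result.is_none() {
        // mvr-mode: resolve the live object from history/snapshot instead.
        result = self.get_object_from_history(object_id, version)?;
    }
    result.map(|o| o.try_into()).transpose().map_err(Into::into)
}
```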



## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
wlmyng authored Nov 22, 2024
1 parent fa52d46 commit c5a595c
Showing 4 changed files with 122 additions and 110 deletions.
64 changes: 58 additions & 6 deletions crates/sui-indexer/src/indexer_reader.rs
@@ -41,7 +41,11 @@ use sui_types::{coin::CoinMetadata, event::EventID};

use crate::database::ConnectionPool;
use crate::db::ConnectionPoolConfig;
use crate::models::objects::StoredHistoryObject;
use crate::models::objects::StoredObjectSnapshot;
use crate::models::transactions::{stored_events_to_events, StoredTransactionEvents};
use crate::schema::objects_history;
use crate::schema::objects_snapshot;
use crate::schema::pruner_cp_watermark;
use crate::schema::tx_digests;
use crate::{
@@ -1453,7 +1457,7 @@ impl ConnectionAsObjectStore {
Ok(Self { inner: connection })
}

fn get_object_from_db(
fn get_object_from_objects(
&self,
object_id: &ObjectID,
version: Option<VersionNumber>,
@@ -1477,17 +1481,65 @@
.map_err(Into::into)
}

fn get_object_from_history(
&self,
object_id: &ObjectID,
version: Option<VersionNumber>,
) -> Result<Option<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;

let mut guard = self.inner.lock().unwrap();
let connection: &mut diesel_async::async_connection_wrapper::AsyncConnectionWrapper<_> =
&mut guard;

let mut history_query = objects_history::table
.filter(objects_history::dsl::object_id.eq(object_id.to_vec()))
.into_boxed();

if let Some(version) = version {
history_query = history_query
.filter(objects_history::dsl::object_version.eq(version.value() as i64));
}

let history_latest = history_query
.order_by(objects_history::dsl::object_version.desc())
.first::<StoredHistoryObject>(connection)
.optional()?;

if let Some(history_record) = history_latest {
return Ok(Some(history_record.try_into()?));
}

let mut snapshot_query = objects_snapshot::table
.filter(objects_snapshot::dsl::object_id.eq(object_id.to_vec()))
.into_boxed();

if let Some(version) = version {
snapshot_query = snapshot_query
.filter(objects_snapshot::dsl::object_version.eq(version.value() as i64));
}

snapshot_query
.first::<StoredObjectSnapshot>(connection)
.optional()?
.map(|o| o.try_into())
.transpose()
.map_err(Into::into)
}

fn get_object(
&self,
object_id: &ObjectID,
version: Option<VersionNumber>,
) -> Result<Option<Object>, IndexerError> {
let Some(stored_package) = self.get_object_from_db(object_id, version)? else {
return Ok(None);
};
let mut result = self.get_object_from_objects(object_id, version)?;

let object = stored_package.try_into()?;
Ok(Some(object))
// This is for mvr-mode, which doesn't maintain an `objects` table.
if result.is_none() {
result = self.get_object_from_history(object_id, version)?;
}

result.map(|o| o.try_into()).transpose().map_err(Into::into)
}
}

58 changes: 58 additions & 0 deletions crates/sui-indexer/src/models/objects.rs
@@ -464,6 +464,64 @@ impl From<IndexedDeletedObject> for StoredFullHistoryObject {
}
}

impl TryFrom<StoredHistoryObject> for StoredObject {
type Error = IndexerError;

fn try_from(o: StoredHistoryObject) -> Result<Self, Self::Error> {
// Return early if any required fields are None
if o.object_digest.is_none() || o.owner_type.is_none() || o.serialized_object.is_none() {
return Err(IndexerError::PostgresReadError(
"Missing required fields in StoredHistoryObject".to_string(),
));
}

Ok(Self {
object_id: o.object_id,
object_version: o.object_version,
object_digest: o.object_digest.unwrap(),
owner_type: o.owner_type.unwrap(),
owner_id: o.owner_id,
object_type: o.object_type,
object_type_package: o.object_type_package,
object_type_module: o.object_type_module,
object_type_name: o.object_type_name,
serialized_object: o.serialized_object.unwrap(),
coin_type: o.coin_type,
coin_balance: o.coin_balance,
df_kind: o.df_kind,
})
}
}

impl TryFrom<StoredObjectSnapshot> for StoredObject {
type Error = IndexerError;

fn try_from(o: StoredObjectSnapshot) -> Result<Self, Self::Error> {
// Return early if any required fields are None
if o.object_digest.is_none() || o.owner_type.is_none() || o.serialized_object.is_none() {
return Err(IndexerError::PostgresReadError(
"Missing required fields in StoredObjectSnapshot".to_string(),
));
}

Ok(Self {
object_id: o.object_id,
object_version: o.object_version,
object_digest: o.object_digest.unwrap(),
owner_type: o.owner_type.unwrap(),
owner_id: o.owner_id,
object_type: o.object_type,
object_type_package: o.object_type_package,
object_type_module: o.object_type_module,
object_type_name: o.object_type_name,
serialized_object: o.serialized_object.unwrap(),
coin_type: o.coin_type,
coin_balance: o.coin_balance,
df_kind: o.df_kind,
})
}
}

#[cfg(test)]
mod tests {
use move_core_types::{account_address::AccountAddress, language_storage::StructTag};
8 changes: 5 additions & 3 deletions crates/sui-mvr-graphql-rpc/src/test_infra/cluster.rs
@@ -18,7 +18,7 @@ pub use sui_indexer::config::RetentionConfig;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
use sui_indexer::test_utils::start_indexer_writer_for_testing;
use sui_indexer::test_utils::start_indexer_writer_for_testing_with_mvr_mode;
use sui_pg_temp_db::{get_available_port, TempDb};
use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
use sui_types::storage::RestStateReader;
@@ -125,14 +125,15 @@ pub async fn start_network_cluster() -> NetworkCluster {
let val_fn = start_validator_with_fullnode(data_ingestion_path.path().to_path_buf()).await;

// Starts indexer
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode(
db_url,
None,
None,
Some(data_ingestion_path.path().to_path_buf()),
Some(cancellation_token.clone()),
None, /* start_checkpoint */
None, /* end_checkpoint */
true,
)
.await;

@@ -182,14 +183,15 @@ pub async fn serve_executor(

let snapshot_config = snapshot_config.unwrap_or_default();

let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode(
db_url,
Some(snapshot_config.clone()),
retention_config,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
None,
None,
true,
)
.await;

102 changes: 1 addition & 101 deletions crates/sui-mvr-graphql-rpc/tests/e2e_tests.rs
@@ -23,7 +23,6 @@ use sui_types::DEEPBOOK_ADDRESS;
use sui_types::SUI_FRAMEWORK_ADDRESS;
use sui_types::SUI_FRAMEWORK_PACKAGE_ID;
use tempfile::tempdir;
use tokio::time::sleep;

#[tokio::test]
async fn test_simple_client_validator_cluster() {
@@ -275,106 +274,6 @@ async fn test_graphql_client_variables() {
}
}

#[tokio::test]
async fn test_transaction_execution() {
let cluster = start_cluster(ServiceConfig::test_defaults()).await;

let addresses = cluster
.network
.validator_fullnode_handle
.wallet
.get_addresses();

let sender = addresses[0];
let recipient = addresses[1];
let tx = cluster
.network
.validator_fullnode_handle
.test_transaction_builder()
.await
.transfer_sui(Some(1_000), recipient)
.build();
let signed_tx = cluster
.network
.validator_fullnode_handle
.wallet
.sign_transaction(&tx);
let original_digest = signed_tx.digest();
let (tx_bytes, sigs) = signed_tx.to_tx_bytes_and_signatures();
let tx_bytes = tx_bytes.encoded();
let sigs = sigs.iter().map(|sig| sig.encoded()).collect::<Vec<_>>();

let mutation = r#"{ executeTransactionBlock(txBytes: $tx, signatures: $sigs) { effects { transactionBlock { digest } } errors}}"#;

let variables = vec![
GraphqlQueryVariable {
name: "tx".to_string(),
ty: "String!".to_string(),
value: json!(tx_bytes),
},
GraphqlQueryVariable {
name: "sigs".to_string(),
ty: "[String!]!".to_string(),
value: json!(sigs),
},
];
let res = cluster
.graphql_client
.execute_mutation_to_graphql(mutation.to_string(), variables)
.await
.unwrap();
let binding = res.response_body().data.clone().into_json().unwrap();
let res = binding.get("executeTransactionBlock").unwrap();

let digest = res
.get("effects")
.unwrap()
.get("transactionBlock")
.unwrap()
.get("digest")
.unwrap()
.as_str()
.unwrap();
assert!(res.get("errors").unwrap().is_null());
assert_eq!(digest, original_digest.to_string());

// Wait for the transaction to be committed and indexed
sleep(Duration::from_secs(10)).await;
// Query the transaction
let query = r#"
{
transactionBlock(digest: $dig){
sender {
address
}
}
}
"#;

let variables = vec![GraphqlQueryVariable {
name: "dig".to_string(),
ty: "String!".to_string(),
value: json!(digest),
}];
let res = cluster
.graphql_client
.execute_to_graphql(query.to_string(), true, variables, vec![])
.await
.unwrap();

let binding = res.response_body().data.clone().into_json().unwrap();
let sender_read = binding
.get("transactionBlock")
.unwrap()
.get("sender")
.unwrap()
.get("address")
.unwrap()
.as_str()
.unwrap();
assert_eq!(sender_read, sender.to_string());
}

#[tokio::test]
async fn test_zklogin_sig_verify() {
use shared_crypto::intent::Intent;
@@ -764,6 +663,7 @@ async fn test_epoch_live_object_set_digest() {
let query = "
{
epoch(id: 0){
epochId
liveObjectSetDigest
}
}
