indexer reader: derive dynamic field info (MystenLabs#19099)
## Description 

As the title says: with this PR and the corresponding PR in GraphQL, we can remove the `df_*`
columns from the `objects` table, except for `df_kind`.
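
In other words, the read path can now gate on the one remaining `df_kind` column and derive everything else (name, type, object id, version, digest) from the object contents we already store. A rough, self-contained sketch of that shape — the types and field names below are simplified stand-ins, not the real `sui-indexer` structs:

```rust
// Simplified stand-ins for the real StoredObject / DynamicFieldInfo types.
struct StoredObject {
    df_kind: Option<i16>,       // the one df_* column that is kept
    serialized_object: Vec<u8>, // full object contents, already stored anyway
}

#[derive(Debug)]
struct DynamicFieldInfo {
    name: String,
    object_type: String,
}

// Derive dynamic-field metadata at read time instead of selecting the
// dropped df_name / df_object_type / df_object_id columns.
fn try_create_dynamic_field_info(stored: &StoredObject) -> Option<DynamicFieldInfo> {
    // df_kind is still persisted, so plain objects are skipped cheaply,
    // without deserializing their contents.
    stored.df_kind?;

    // The real implementation deserializes the Move object, resolves its
    // type layout through the package resolver, and calls
    // DynamicFieldInfo::parse_move_object; this stub just fakes the result.
    Some(DynamicFieldInfo {
        name: format!("<derived from {} bytes>", stored.serialized_object.len()),
        object_type: "0x2::dynamic_field::Field<K, V>".to_string(),
    })
}

fn main() {
    let rows = [
        StoredObject { df_kind: Some(0), serialized_object: vec![0u8; 64] },
        StoredObject { df_kind: None, serialized_object: vec![0u8; 32] },
    ];
    // Only the dynamic-field row yields an info record.
    let infos: Vec<_> = rows.iter().filter_map(try_create_dynamic_field_info).collect();
    println!("{infos:?}");
}
```

The actual read path (`try_create_dynamic_field_info` in the diff below) additionally resolves the field's type layout through the package resolver and, for dynamic object fields, fetches the pointed-to object to obtain its version and digest.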

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
gegaowp authored Sep 9, 2024
1 parent 53e5806 commit 227ded1
Showing 3 changed files with 124 additions and 230 deletions.
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
```diff
@@ -745,18 +745,18 @@ fn try_create_dynamic_field_info(
         ))
     })?;
     let move_struct = move_object.to_move_struct(&move_struct_layout)?;
-    let (name_value, type_, object_id) =
+    let (move_value, type_, object_id) =
         DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;
     let name_type = move_object.type_().try_extract_field_name(&type_)?;
-    let bcs_name = bcs::to_bytes(&name_value.clone().undecorate()).map_err(|e| {
+    let bcs_name = bcs::to_bytes(&move_value.clone().undecorate()).map_err(|e| {
         IndexerError::SerdeError(format!(
             "Failed to serialize dynamic field name {:?}: {e}",
-            name_value
+            move_value
         ))
     })?;
     let name = DynamicFieldName {
         type_: name_type,
-        value: SuiMoveValue::from(name_value).to_json_value(),
+        value: SuiMoveValue::from(move_value).to_json_value(),
     };
     Ok(Some(match type_ {
         DynamicFieldType::DynamicObject => {
```
219 changes: 118 additions & 101 deletions crates/sui-indexer/src/indexer_reader.rs
```diff
@@ -1,23 +1,22 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use anyhow::Result;
 use diesel::{
     dsl::sql, sql_types::Bool, ExpressionMethods, OptionalExtension, QueryDsl,
     TextExpressionMethods,
 };
-use itertools::{any, Itertools};
-use tap::Pipe;
-use tap::TapFallible;
+use itertools::Itertools;
+use tap::{Pipe, TapFallible};
 use tracing::{debug, error, warn};
 
 use fastcrypto::encoding::Encoding;
 use fastcrypto::encoding::Hex;
-use move_core_types::annotated_value::MoveStructLayout;
-use move_core_types::language_storage::StructTag;
+use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
+use move_core_types::language_storage::{StructTag, TypeTag};
 use sui_json_rpc_types::DisplayFieldsResponse;
-use sui_json_rpc_types::{Balance, Coin as SuiCoin, SuiCoinMetadata};
+use sui_json_rpc_types::{Balance, Coin as SuiCoin, SuiCoinMetadata, SuiMoveValue};
 use sui_json_rpc_types::{
     CheckpointId, EpochInfo, EventFilter, SuiEvent, SuiObjectDataFilter,
     SuiTransactionBlockResponse, TransactionFilter,
@@ -28,10 +27,10 @@ use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
 use sui_types::effects::TransactionEvents;
 use sui_types::{balance::Supply, coin::TreasuryCap, dynamic_field::DynamicFieldName};
 use sui_types::{
-    base_types::{ObjectID, ObjectRef, SequenceNumber, SuiAddress, VersionNumber},
+    base_types::{ObjectID, SuiAddress, VersionNumber},
     committee::EpochId,
-    digests::{ObjectDigest, TransactionDigest},
-    dynamic_field::DynamicFieldInfo,
+    digests::TransactionDigest,
+    dynamic_field::{DynamicFieldInfo, DynamicFieldType},
     object::{Object, ObjectRead},
     sui_system_state::{sui_system_state_summary::SuiSystemStateSummary, SuiSystemStateTrait},
 };
@@ -47,7 +46,7 @@ use crate::{
         display::StoredDisplay,
         epoch::StoredEpochInfo,
         events::StoredEvent,
-        objects::{CoinBalance, ObjectRefColumn, StoredObject},
+        objects::{CoinBalance, StoredObject},
         transactions::{tx_events_to_sui_tx_events, StoredTransaction},
         tx_indices::TxSequenceNumber,
     },
@@ -475,10 +474,10 @@ impl IndexerReader {
             .await
             .into_iter()
             .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Failed to join all tx block futures: {}", e))?
+            .tap_err(|e| error!("Failed to join all tx block futures: {}", e))?
             .into_iter()
             .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Failed to collect tx block futures: {}", e))?;
+            .tap_err(|e| error!("Failed to collect tx block futures: {}", e))?;
         Ok(tx_blocks)
     }
 
@@ -837,7 +836,7 @@ impl IndexerReader {
             limit,
         );
 
-        tracing::debug!("query transaction blocks: {}", query);
+        debug!("query transaction blocks: {}", query);
         let tx_sequence_numbers = diesel::sql_query(query.clone())
             .load::<TxSequenceNumber>(&mut connection)
             .await?
@@ -1074,7 +1073,7 @@ impl IndexerReader {
                 main_where_clause, cursor_clause, order_clause, limit,
             )
         };
-        tracing::debug!("query events: {}", query);
+        debug!("query events: {}", query);
         let stored_events = diesel::sql_query(query)
             .load::<StoredEvent>(&mut connection)
             .await?;
@@ -1090,10 +1089,10 @@ impl IndexerReader {
             .await
             .into_iter()
             .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Failed to join sui event futures: {}", e))?
+            .tap_err(|e| error!("Failed to join sui event futures: {}", e))?
             .into_iter()
             .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Failed to collect sui event futures: {}", e))?;
+            .tap_err(|e| error!("Failed to collect sui event futures: {}", e))?;
         Ok(sui_events)
     }
 
@@ -1103,57 +1102,31 @@ impl IndexerReader {
         cursor: Option<ObjectID>,
         limit: usize,
     ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
-        let objects = self
+        let stored_objects = self
             .get_dynamic_fields_raw(parent_object_id, cursor, limit)
             .await?;
-
-        if any(objects.iter(), |o| o.df_object_id.is_none()) {
-            return Err(IndexerError::PersistentStorageDataCorruptionError(format!(
-                "Dynamic field has empty df_object_id column for parent object {}",
-                parent_object_id
-            )));
-        }
-
-        // for Dynamic field objects, df_object_id != object_id, we need another look up
-        // to get the version and digests.
-        // TODO: simply store df_object_version and df_object_digest as well?
-        let dfo_ids = objects
-            .iter()
-            .filter_map(|o| {
-                // Unwrap safe: checked nullity above
-                if o.df_object_id.as_ref().unwrap() == &o.object_id {
-                    None
-                } else {
-                    Some(o.df_object_id.clone().unwrap())
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let object_refs = self.get_object_refs(dfo_ids).await?;
         let mut df_futures = vec![];
-        for object in objects {
-            let package_resolver_clone = self.package_resolver.clone();
-            df_futures.push(tokio::task::spawn(
-                object.try_into_expectant_dynamic_field_info(package_resolver_clone),
-            ));
+        let indexer_reader_arc = Arc::new(self.clone());
+        for stored_object in stored_objects {
+            let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
+            df_futures.push(tokio::task::spawn(async move {
+                indexer_reader_arc_clone
+                    .try_create_dynamic_field_info(stored_object)
+                    .await
+            }));
         }
-        let mut dynamic_fields = futures::future::join_all(df_futures)
+        let df_infos = futures::future::join_all(df_futures)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Error joining DF futures: {:?}", e))?
+            .tap_err(|e| error!("Error joining DF futures: {:?}", e))?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
-            .tap_err(|e| tracing::error!("Error calling DF try_into function: {:?}", e))?;
-
-        for df in dynamic_fields.iter_mut() {
-            if let Some(obj_ref) = object_refs.get(&df.object_id) {
-                df.version = obj_ref.1;
-                df.digest = obj_ref.2;
-            }
-        }
-
-        Ok(dynamic_fields)
+            .tap_err(|e| error!("Error calling try_create_dynamic_field_info: {:?}", e))?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        Ok(df_infos)
     }
 
     pub async fn get_dynamic_fields_raw(
@@ -1183,6 +1156,90 @@ impl IndexerReader {
             .map_err(Into::into)
     }
 
+    async fn try_create_dynamic_field_info(
+        &self,
+        stored_object: StoredObject,
+    ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
+        if stored_object.df_kind.is_none() {
+            return Ok(None);
+        }
+
+        let object: Object = stored_object.try_into()?;
+        let move_object = match object.data.try_as_move().cloned() {
+            Some(move_object) => move_object,
+            None => {
+                return Err(IndexerError::ResolveMoveStructError(
+                    "Object is not a MoveObject".to_string(),
+                ));
+            }
+        };
+        let struct_tag: StructTag = move_object.type_().clone().into();
+        let move_type_layout = self
+            .package_resolver
+            .type_layout(TypeTag::Struct(Box::new(struct_tag.clone())))
+            .await
+            .map_err(|e| {
+                IndexerError::ResolveMoveStructError(format!(
+                    "Failed to get type layout for type {}: {}",
+                    struct_tag, e
+                ))
+            })?;
+        let MoveTypeLayout::Struct(move_struct_layout) = move_type_layout else {
+            return Err(IndexerError::ResolveMoveStructError(
+                "MoveTypeLayout is not Struct".to_string(),
+            ));
+        };
+
+        let move_struct = move_object.to_move_struct(&move_struct_layout)?;
+        let (move_value, type_, object_id) =
+            DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;
+        let name_type = move_object.type_().try_extract_field_name(&type_)?;
+        let bcs_name = bcs::to_bytes(&move_value.clone().undecorate()).map_err(|e| {
+            IndexerError::SerdeError(format!(
+                "Failed to serialize dynamic field name {:?}: {e}",
+                move_value
+            ))
+        })?;
+        let name = DynamicFieldName {
+            type_: name_type,
+            value: SuiMoveValue::from(move_value).to_json_value(),
+        };
+
+        Ok(Some(match type_ {
+            DynamicFieldType::DynamicObject => {
+                let object = self.get_object(&object_id, None).await?.ok_or(
+                    IndexerError::UncategorizedError(anyhow::anyhow!(
+                        "Failed to find object_id {:?} when trying to create dynamic field info",
+                        object_id
+                    )),
+                )?;
+
+                let version = object.version();
+                let digest = object.digest();
+                let object_type = object.data.type_().unwrap().clone();
+                DynamicFieldInfo {
+                    name,
+                    bcs_name,
+                    type_,
+                    object_type: object_type.to_canonical_string(/* with_prefix */ true),
+                    object_id,
+                    version,
+                    digest,
+                }
+            }
+            DynamicFieldType::DynamicField => DynamicFieldInfo {
+                name,
+                bcs_name,
+                type_,
+                object_type: move_object.into_type().into_type_params()[1]
+                    .to_canonical_string(/* with_prefix */ true),
+                object_id: object.id(),
+                version: object.version(),
+                digest: object.digest(),
+            },
+        }))
+    }
+
     pub async fn bcs_name_from_dynamic_field_name(
         &self,
         name: &DynamicFieldName,
@@ -1202,45 +1259,6 @@ impl IndexerReader {
         Ok(name_bcs_value)
     }
 
-    async fn get_object_refs(
-        &self,
-        object_ids: Vec<Vec<u8>>,
-    ) -> IndexerResult<HashMap<ObjectID, ObjectRef>> {
-        use diesel_async::RunQueryDsl;
-
-        let mut connection = self.pool.get().await?;
-
-        objects::table
-            .filter(objects::object_id.eq_any(object_ids))
-            .select((
-                objects::object_id,
-                objects::object_version,
-                objects::object_digest,
-            ))
-            .load::<ObjectRefColumn>(&mut connection)
-            .await?
-            .into_iter()
-            .map(|object_ref: ObjectRefColumn| {
-                let object_id =
-                    ObjectID::from_bytes(object_ref.object_id.clone()).map_err(|_e| {
-                        IndexerError::PersistentStorageDataCorruptionError(format!(
-                            "Can't convert {:?} to ObjectID",
-                            object_ref.object_id
-                        ))
-                    })?;
-                let seq = SequenceNumber::from_u64(object_ref.object_version as u64);
-                let object_digest = ObjectDigest::try_from(object_ref.object_digest.as_slice())
-                    .map_err(|e| {
-                        IndexerError::PersistentStorageDataCorruptionError(format!(
-                            "object {:?} has incompatible object digest. Error: {e}",
-                            object_ref.object_digest
-                        ))
-                    })?;
-                Ok((object_id, (object_id, seq, object_digest)))
-            })
-            .collect::<IndexerResult<HashMap<_, _>>>()
-    }
-
     async fn get_display_object_by_type(
         &self,
         object_type: &move_core_types::language_storage::StructTag,
@@ -1331,8 +1349,7 @@ impl IndexerReader {
             coin_type_filter,
         );
 
-        tracing::debug!("get coin balances query: {query}");
-
+        debug!("get coin balances query: {query}");
         diesel::sql_query(query)
             .load::<CoinBalance>(&mut connection)
             .await?
```
(The diff for the third changed file, which accounts for the remaining 2 additions and 125 deletions, did not load.)