Skip to content

Commit

Permalink
indexer: object fast path, the writing part (MystenLabs#10246)
Browse files Browse the repository at this point in the history
## Description 

Fast path object, the writing path

some following action items:
- implement get_latest_owned_object to be on top of `objects` table
- change `get_owned_objects` to include fresh object data if
`checkpoint` arg is not provided
- now we have both tx and object fast-path, we can change some testing
to not count on "waiting for X"
- fast-path partition of object history cannot be updated in-place, it
needs to be pruned periodically

## Test Plan 

CI
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

gegaowp authored Apr 1, 2023
1 parent 6fdee68 commit 1b5c8a3
Showing 10 changed files with 307 additions and 197 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer/benches/indexer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore {
.collect(),
events: vec![],
objects_changes: vec![TransactionObjectChanges {
mutated_objects: (1..1000).map(|_| create_object(sequence_number)).collect(),
changed_objects: (1..1000).map(|_| create_object(sequence_number)).collect(),
deleted_objects: vec![],
}],
addresses: vec![],
Original file line number Diff line number Diff line change
@@ -62,6 +62,9 @@ CREATE TABLE objects_history
CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version);
CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address);
CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address);
-- fast-path partition for the most recent objects before checkpoint, range is half-open.
-- partition name need to match regex of '.*(_partition_)\d+'.
CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0);
CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);

CREATE OR REPLACE FUNCTION objects_modified_func() RETURNS TRIGGER AS
@@ -82,6 +85,8 @@ BEGIN
NEW.initial_shared_version,
NEW.previous_transaction, NEW.object_type, NEW.object_status, NEW.has_public_transfer,
NEW.storage_rebate, NEW.bcs);
-- MUSTFIX(gegaowp): we cannot update checkpoint in-place, b/c checkpoint is a partition key,
-- we need to prune old data in this partition periodically, like pruning old epochs upon new epoch.
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
-- object deleted from the main table, archive the history for that object
27 changes: 22 additions & 5 deletions crates/sui-indexer/src/apis/write_api.rs
Original file line number Diff line number Diff line change
@@ -6,18 +6,22 @@ use fastcrypto::encoding::Base64;
use jsonrpsee::core::RpcResult;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::RpcModule;

use sui_json_rpc::api::{WriteApiClient, WriteApiServer};
use sui_json_rpc::SuiRpcModule;
use sui_json_rpc_types::{
BigInt, DevInspectResults, DryRunTransactionBlockResponse, SuiTransactionBlockResponse,
SuiTransactionBlockResponseOptions,
BigInt, DevInspectResults, DryRunTransactionBlockResponse, SuiTransactionBlockEffectsAPI,
SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
};
use sui_open_rpc::Module;
use sui_types::base_types::{EpochId, SuiAddress};
use sui_types::messages::ExecuteTransactionRequestType;

use crate::handlers::checkpoint_handler::{
fetch_changed_objects, get_deleted_db_objects, get_object_changes, to_changed_db_objects,
};
use crate::models::transactions::Transaction;
use crate::store::IndexerStore;
use crate::store::{IndexerStore, TransactionObjectChanges};
use crate::types::{
FastPathTransactionBlockResponse, SuiTransactionBlockResponseWithOptions,
TemporaryTransactionBlockResponseStore,
@@ -54,12 +58,25 @@ where
.fullnode
.execute_transaction_block(tx_bytes, signatures, Some(fast_path_options), request_type)
.await?;

let fast_path_resp: FastPathTransactionBlockResponse =
sui_transaction_response.clone().try_into()?;
let effects = &fast_path_resp.effects;
let epoch = <u64>::from(effects.executed_epoch());

let object_changes = get_object_changes(effects);
let changed_objects = fetch_changed_objects(self.fullnode.clone(), object_changes).await?;
let changed_db_objects =
to_changed_db_objects(changed_objects, epoch, /* checkpoint */ None);
let deleted_db_objects = get_deleted_db_objects(effects, epoch, /* checkpoint */ None);
let tx_object_changes = TransactionObjectChanges {
changed_objects: changed_db_objects,
deleted_objects: deleted_db_objects,
};

let transaction_store: TemporaryTransactionBlockResponseStore = fast_path_resp.into();
let transaction: Transaction = transaction_store.try_into()?;
self.state.persist_fast_path(transaction)?;
self.state
.persist_fast_path(transaction, tx_object_changes)?;

Ok(SuiTransactionBlockResponseWithOptions {
response: sui_transaction_response,
4 changes: 2 additions & 2 deletions crates/sui-indexer/src/errors.rs
Original file line number Diff line number Diff line change
@@ -47,8 +47,8 @@ pub enum IndexerError {
#[error(transparent)]
PostgresError(#[from] diesel::result::Error),

#[error("Indexer failed to initialize fullnode RPC client with error: `{0}`")]
RpcClientInitError(String),
#[error("Indexer failed to initialize fullnode Http client with error: `{0}`")]
HttpClientInitError(String),

#[error("Indexer failed to serialize/deserialize with error: `{0}`")]
SerdeError(String),
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

0 comments on commit 1b5c8a3

Please sign in to comment.