Skip to content

Commit

Permalink
indexer: quick object snapshot and fix object history (#15339)
Browse files Browse the repository at this point in the history
## Description 

main changes are:
- instead of updating object snapshot at epoch boundary, now we update
object snapshot following a sliding window of checkpoints, so that each
update will not take too long
- fix object history: previously it skipped intermediate object versions,
which is correct for `objects` (which only tracks the latest versions), but
not for `objects_history`, which is supposed to track all
versions
- move advance-epoch before checkpoint persisting, so that the checkpoint
can always serve as the watermark; if the pod crashes and resumes, the
advance-epoch process will be resumed properly too

## Test Plan 

local run and verify that
- objects_history table has all intermediate versions & checkpoints
- object snapshot can be updated properly from the objects_history table

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
gegaowp authored Dec 18, 2023
1 parent 399df17 commit b54bd23
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ CREATE TABLE objects_history (
CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);
-- TODO(gegaowp): add corresponding indices for consistent reads of objects_history table

-- snapshot table by folding objects_history table until certain epoch,
-- effectively the snapshot of objects at the same epoch,
-- snapshot table by folding objects_history table until certain checkpoint,
-- effectively the snapshot of objects at the same checkpoint,
-- except that it also includes deleted or wrapped objects with the corresponding object_status.
CREATE TABLE objects_snapshot (
object_id bytea PRIMARY KEY,
Expand All @@ -80,3 +80,7 @@ CREATE TABLE objects_snapshot (
df_object_type text,
df_object_id bytea
);
-- Supports finding the snapshot watermark (the max checkpoint folded into the snapshot).
CREATE INDEX objects_snapshot_checkpoint_sequence_number ON objects_snapshot (checkpoint_sequence_number);
-- Partial index for owner-based lookups; assumes owner_type values 1..2 are the
-- owned-object kinds with a meaningful owner_id -- TODO confirm against the OwnerType enum.
CREATE INDEX objects_snapshot_owner ON objects_snapshot (owner_type, owner_id) WHERE owner_type BETWEEN 1 AND 2 AND owner_id IS NOT NULL;
-- Partial index for per-owner coin queries; presumably owner_type = 1 is address owner -- verify.
CREATE INDEX objects_snapshot_coin ON objects_snapshot (owner_id, coin_type) WHERE coin_type IS NOT NULL AND owner_type = 1;
-- Supports queries filtering snapshot objects by type.
CREATE INDEX objects_snapshot_type ON objects_snapshot (object_type);
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ CREATE TABLE display
object_type text PRIMARY KEY,
id BYTEA NOT NULL,
version SMALLINT NOT NULL,
bcs BYTEA NOT NULL
bcs BYTEA NOT NULL
);
72 changes: 72 additions & 0 deletions crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ where
// Index Objects
let object_changes: TransactionObjectChangesToCommit =
Self::index_objects(data.clone(), &metrics, &module_resolver);
let object_history_changes: TransactionObjectChangesToCommit =
Self::index_objects_history(data.clone(), &module_resolver);

let (checkpoint, db_transactions, db_events, db_indices, db_displays) = {
let CheckpointData {
Expand Down Expand Up @@ -345,6 +347,7 @@ where
tx_indices: db_indices,
display_updates: db_displays,
object_changes,
object_history_changes,
packages,
epoch,
})
Expand Down Expand Up @@ -589,6 +592,75 @@ where
}
}

// Similar to index_objects, but objects_history keeps ALL versions of objects:
// every object version written in this checkpoint is emitted, not just the
// latest version per object id. Panics (rather than returning an error) on
// internal inconsistencies in CheckpointData, since those indicate a bug or
// corrupt input rather than a recoverable condition.
fn index_objects_history(
data: CheckpointData,
module_resolver: &impl GetModule,
) -> TransactionObjectChangesToCommit {
// Checkpoint sequence number stamped onto every emitted row.
let checkpoint_seq = data.checkpoint_summary.sequence_number;
// Gather deletions from every transaction's effects in this checkpoint.
// Each entry is a tuple whose fields are used as (object_id, version) below
// -- presumably (ObjectID, SequenceNumber); confirm via get_deleted_objects.
let deleted_objects = data
.transactions
.iter()
.flat_map(|tx| get_deleted_objects(&tx.effects))
.collect::<Vec<_>>();
let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
.into_iter()
.map(|o| IndexedDeletedObject {
object_id: o.0,
object_version: o.1.value(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

// latest_objects is only used for dynamic-field info resolution below.
let (latest_objects, _) = get_latest_objects(data.output_objects());
// Key the checkpoint's output objects by (id, version) so that every
// intermediate version produced in this checkpoint can be looked up,
// not just the final version of each object.
let history_object_map = data
.output_objects()
.into_iter()
.map(|o| ((o.id(), o.version()), o.clone()))
.collect::<HashMap<_, _>>();

let changed_objects: Vec<IndexedObject> = data
.transactions
.iter()
.flat_map(|tx| {
let CheckpointTransaction {
transaction: tx,
effects: fx,
..
} = tx;
fx.all_changed_objects()
.into_iter()
.map(|(oref, _owner, _kind)| {
// Every changed (id, version) from the effects must exist in the
// checkpoint's output objects; a miss means CheckpointData is
// inconsistent, so we panic with the offending tx digest.
let history_object = history_object_map.get(&(oref.0, oref.1)).unwrap_or_else(|| {
panic!(
"object {:?} version {:?} not found in CheckpointData (tx_digest: {})",
oref.0,
oref.1,
tx.digest()
)
});
// Sanity check: the digest recorded in the effects must match the
// object we resolved for this (id, version).
assert_eq!(oref.2, history_object.digest());
let df_info =
try_create_dynamic_field_info(history_object, &latest_objects, module_resolver)
.unwrap_or_else(|e| {
panic!(
"failed to create dynamic field info for history obj: {:?}:{:?}. Err: {e}",
history_object.id(),
history_object.version()
)
});

IndexedObject::from_object(checkpoint_seq, history_object.clone(), df_info)
})
.collect::<Vec<_>>()
})
.collect();
TransactionObjectChangesToCommit {
changed_objects,
deleted_objects: indexed_deleted_objects,
}
}

fn index_packages(
checkpoint_data: &[CheckpointData],
metrics: &IndexerMetrics,
Expand Down
30 changes: 17 additions & 13 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async fn commit_checkpoints<S>(
let mut tx_indices_batch = vec![];
let mut display_updates_batch = BTreeMap::new();
let mut object_changes_batch = vec![];
let mut object_history_changes_batch = vec![];
let mut packages_batch = vec![];

for indexed_checkpoint in indexed_checkpoint_batch {
Expand All @@ -113,6 +114,7 @@ async fn commit_checkpoints<S>(
tx_indices,
display_updates,
object_changes,
object_history_changes,
packages,
epoch: _,
} = indexed_checkpoint;
Expand All @@ -122,6 +124,7 @@ async fn commit_checkpoints<S>(
tx_indices_batch.push(tx_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
object_history_changes_batch.push(object_history_changes);
packages_batch.push(packages);
}

Expand All @@ -145,7 +148,8 @@ async fn commit_checkpoints<S>(
state.persist_displays(display_updates_batch),
state.persist_packages(packages_batch),
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_changes_batch),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_object_snapshot(),
];
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
Expand All @@ -163,18 +167,6 @@ async fn commit_checkpoints<S>(
.expect("Persisting data into DB should not fail.");
}

state
.persist_checkpoints(checkpoint_batch)
.await
.tap_err(|e| {
error!(
"Failed to persist checkpoint data with error: {}",
e.to_string()
);
})
.expect("Persisting data into DB should not fail.");
let elapsed = guard.stop_and_record();

// handle partitioning on epoch boundary
if let Some(epoch_data) = epoch {
state
Expand All @@ -187,6 +179,18 @@ async fn commit_checkpoints<S>(
metrics.total_epoch_committed.inc();
}

state
.persist_checkpoints(checkpoint_batch)
.await
.tap_err(|e| {
error!(
"Failed to persist checkpoint data with error: {}",
e.to_string()
);
})
.expect("Persisting data into DB should not fail.");
let elapsed = guard.stop_and_record();

commit_notifier
.send(Some(last_checkpoint_seq))
.expect("Commit watcher should not be closed");
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct CheckpointDataToCommit {
pub tx_indices: Vec<TxIndex>,
pub display_updates: BTreeMap<String, StoredDisplay>,
pub object_changes: TransactionObjectChangesToCommit,
pub object_history_changes: TransactionObjectChangesToCommit,
pub packages: Vec<IndexedPackage>,
pub epoch: Option<EpochToCommit>,
}
Expand Down
27 changes: 26 additions & 1 deletion crates/sui-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
},
schema_v2::{
address_metrics, checkpoints, display, epochs, events, move_call_metrics, objects,
packages, transactions,
objects_snapshot, packages, transactions,
},
types_v2::{IndexerResult, OwnerType},
PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection,
Expand Down Expand Up @@ -1637,6 +1637,31 @@ impl IndexerReader {
)))?;
Ok(TreasuryCap::try_from(treasury_cap_obj_object)?.total_supply)
}

/// Returns the checkpoint range usable for consistent reads as
/// `(latest_object_snapshot_checkpoint_sequence, latest_checkpoint_sequence)`:
/// the highest checkpoint folded into `objects_snapshot`, and the highest
/// checkpoint committed to `checkpoints`. Either value falls back to 0
/// (`i64::default()`) when the corresponding table is empty.
pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
// Max sequence number in the checkpoints table (None when empty).
let latest_checkpoint_sequence = self
.run_query(|conn| {
checkpoints::table
.select(checkpoints::sequence_number)
.order(checkpoints::sequence_number.desc())
.first::<i64>(conn)
.optional()
})?
.unwrap_or_default();
// Max checkpoint covered by the objects_snapshot table (None when empty).
let latest_object_snapshot_checkpoint_sequence = self
.run_query(|conn| {
objects_snapshot::table
.select(objects_snapshot::checkpoint_sequence_number)
.order(objects_snapshot::checkpoint_sequence_number.desc())
.first::<i64>(conn)
.optional()
})?
.unwrap_or_default();
Ok((
latest_object_snapshot_checkpoint_sequence,
latest_checkpoint_sequence,
))
}
}

#[derive(Clone, Default)]
Expand Down
14 changes: 14 additions & 0 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct IndexerMetrics {
pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram,
pub checkpoint_db_commit_latency_checkpoints: Histogram,
pub checkpoint_db_commit_latency_epoch: Histogram,
pub advance_epoch_latency: Histogram,
pub update_object_snapshot_latency: Histogram,
// average latency of committing 1000 transactions.
// 1000 is not necessarily the batch size, it's to roughly map average tx commit latency to [0.1, 1] seconds,
// which is well covered by DB_COMMIT_LATENCY_SEC_BUCKETS.
Expand Down Expand Up @@ -383,6 +385,18 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
advance_epoch_latency: register_histogram_with_registry!(
"advance_epoch_latency",
"Time spent in advancing epoch",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
update_object_snapshot_latency: register_histogram_with_registry!(
"update_object_snapshot_latency",
"Time spent in updating object snapshot",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
thousand_transaction_avg_db_commit_latency: register_histogram_with_registry!(
"transaction_db_commit_latency",
"Average time spent commiting 1000 transactions to the db",
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/store/indexer_store_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub trait IndexerStoreV2 {

async fn get_latest_tx_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError>;

async fn get_latest_object_snapshot_checkpoint_sequence_number(
&self,
) -> Result<Option<u64>, IndexerError>;

async fn get_object_read(
&self,
object_id: ObjectID,
Expand All @@ -43,6 +47,8 @@ pub trait IndexerStoreV2 {
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

async fn persist_object_snapshot(&self) -> Result<(), IndexerError>;

async fn persist_checkpoints(
&self,
checkpoints: Vec<IndexedCheckpoint>,
Expand Down
Loading

2 comments on commit b54bd23

@vercel
Copy link

@vercel vercel bot commented on b54bd23 Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on b54bd23 Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.