Skip to content

Commit

Permalink
indexer: exact epoch tx count (#12395)
Browse files Browse the repository at this point in the history
## Description
Before this PR, the tx count was updated incrementally within an epoch. This is
problematic during backfill when data races happen; as a result, the epoch tx
count ends up inaccurate.

Also removed the per-checkpoint epoch update, because it caused
constant DB commit errors like
```
2023-06-07T23:13:27.759942Z  INFO sui_indexer::store::pg_indexer_store: Persisting epoch 19
2023-06-07T23:13:36.865246Z  WARN sui_indexer::handlers::checkpoint_handler: Indexer epoch commit failed with error: PostgresWriteError("could not serialize access due to read/write dependencies among transactions"), retrying after 100 milli-secs...
2023-06-07T23:13:36.965945Z  INFO sui_indexer::store::pg_indexer_store: Persisting epoch 19
2023-06-07T23:13:46.059553Z  WARN sui_indexer::handlers::checkpoint_handler: Indexer epoch commit failed with error: PostgresWriteError("could not serialize access due to read/write dependencies among transactions"), retrying after 100 milli-secs...
2023-06-07T23:13:46.160704Z  INFO sui_indexer::store::pg_indexer_store: Persisting epoch 19
```

## Test Plan
Build image and test with mainnet, verify in the DB.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [x] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
gegaowp authored Jun 9, 2023
1 parent 69e4e48 commit b29b1fe
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 51 deletions.
13 changes: 8 additions & 5 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ where
let mut object_changes_commit_res = self
.state
.persist_object_changes(
&checkpoint,
&object_changes,
self.metrics.object_mutation_db_commit_latency.clone(),
self.metrics.object_deletion_db_commit_latency.clone(),
Expand All @@ -429,7 +428,6 @@ where
object_changes_commit_res = self
.state
.persist_object_changes(
&checkpoint,
&object_changes,
self.metrics.object_mutation_db_commit_latency.clone(),
self.metrics.object_deletion_db_commit_latency.clone(),
Expand Down Expand Up @@ -476,7 +474,6 @@ where
}
epoch_db_guard.stop_and_record();
self.metrics.total_epoch_committed.inc();
info!("Epoch {} committed.", indexed_epoch.new_epoch.epoch);
}
} else {
// sleep for 1 sec to avoid occupying the mutex, as this happens once per epoch / day
Expand Down Expand Up @@ -718,14 +715,20 @@ where

let event = event.as_ref();

let last_epoch = system_state.epoch as i64 - 1;
let network_tx_count_prev_epoch = self
.state
.get_network_total_transactions_previous_epoch(last_epoch)
.await?;
Some(TemporaryEpochStore {
last_epoch: Some(DBEpochInfo {
epoch: system_state.epoch as i64 - 1,
epoch: last_epoch,
first_checkpoint_id: 0,
last_checkpoint_id: Some(checkpoint.sequence_number as i64),
epoch_start_timestamp: 0,
epoch_end_timestamp: Some(checkpoint.timestamp_ms as i64),
epoch_total_transactions: 0,
epoch_total_transactions: checkpoint.network_total_transactions as i64
- network_tx_count_prev_epoch,
next_epoch_version: Some(
end_of_epoch_data.next_epoch_protocol_version.as_u64() as i64,
),
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ pub trait IndexerStore {
) -> Result<usize, IndexerError>;
async fn persist_object_changes(
&self,
checkpoint: &Checkpoint,
tx_object_changes: &[TransactionObjectChanges],
object_mutation_latency: Histogram,
object_deletion_latency: Histogram,
Expand All @@ -235,6 +234,10 @@ pub trait IndexerStore {
) -> Result<(), IndexerError>;

async fn persist_epoch(&self, data: &TemporaryEpochStore) -> Result<(), IndexerError>;
async fn get_network_total_transactions_previous_epoch(
&self,
epoch: i64,
) -> Result<i64, IndexerError>;

async fn get_epochs(
&self,
Expand Down
91 changes: 46 additions & 45 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use async_trait::async_trait;
use cached::proc_macro::once;
use diesel::dsl::{count, max};
use diesel::pg::PgConnection;
use diesel::query_builder::AsQuery;
use diesel::sql_types::{BigInt, VarChar};
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgArrayExpressionMethods};
Expand Down Expand Up @@ -1331,46 +1330,31 @@ impl IndexerStore for PgIndexerStore {

async fn persist_object_changes(
&self,
checkpoint: &Checkpoint,
tx_object_changes: &[TransactionObjectChanges],
object_mutation_latency: Histogram,
object_deletion_latency: Histogram,
) -> Result<(), IndexerError> {
transactional_blocking!(&self.blocking_cp, |conn| {
// update epoch transaction count
let sql = "UPDATE epochs e1
SET epoch_total_transactions = e2.epoch_total_transactions + $1
FROM epochs e2
WHERE e1.epoch = e2.epoch
AND e1.epoch = $2;";
diesel::sql_query(sql)
.bind::<BigInt, _>(checkpoint.transactions.len() as i64)
.bind::<BigInt, _>(checkpoint.epoch)
.as_query()
.execute(conn)?;

{
let mutated_objects: Vec<Object> = tx_object_changes
.iter()
.flat_map(|changes| changes.changed_objects.iter().cloned())
.collect();
let deleted_changes = tx_object_changes
.iter()
.flat_map(|changes| changes.deleted_objects.iter().cloned())
.collect::<Vec<_>>();
let deleted_objects: Vec<Object> = deleted_changes
.iter()
.map(|deleted_object| deleted_object.clone().into())
.collect();
persist_transaction_object_changes(
conn,
mutated_objects,
deleted_objects,
Some(object_mutation_latency),
Some(object_deletion_latency),
)?;
Ok::<(), IndexerError>(())
}
let mutated_objects: Vec<Object> = tx_object_changes
.iter()
.flat_map(|changes| changes.changed_objects.iter().cloned())
.collect();
let deleted_changes = tx_object_changes
.iter()
.flat_map(|changes| changes.deleted_objects.iter().cloned())
.collect::<Vec<_>>();
let deleted_objects: Vec<Object> = deleted_changes
.iter()
.map(|deleted_object| deleted_object.clone().into())
.collect();
persist_transaction_object_changes(
conn,
mutated_objects,
deleted_objects,
Some(object_mutation_latency),
Some(object_deletion_latency),
)?;
Ok::<(), IndexerError>(())
})?;
Ok(())
}
Expand Down Expand Up @@ -1497,6 +1481,20 @@ WHERE e1.epoch = e2.epoch
Ok(())
}

/// Returns the cumulative network-wide transaction count as of the end of the
/// epoch *before* `epoch` (i.e. epoch `epoch - 1`), or 0 when no checkpoint for
/// that epoch exists (e.g. when `epoch - 1` predates the indexed data).
///
/// The value is taken as the maximum `network_total_transactions` over all
/// checkpoints recorded for epoch `epoch - 1` — since that column is a running
/// total, the max corresponds to the last checkpoint of that epoch.
/// Callers use this to derive an exact per-epoch tx count by subtracting it
/// from the current epoch's final `network_total_transactions`.
///
/// # Errors
/// Returns `IndexerError` when the read-only DB query fails.
async fn get_network_total_transactions_previous_epoch(
    &self,
    epoch: i64,
) -> Result<i64, IndexerError> {
    // NOTE(review): `read_only_blocking!` presumably runs the closure on a
    // blocking connection from the pool — confirm against the macro definition.
    read_only_blocking!(&self.blocking_cp, |conn| {
        checkpoints::table
            .filter(checkpoints::epoch.eq(epoch - 1))
            .select(max(checkpoints::network_total_transactions))
            // MAX over an empty set yields NULL, hence Option<i64>.
            .first::<Option<i64>>(conn)
            // Default to 0 when epoch - 1 has no checkpoints.
            .map(|o| o.unwrap_or(0))
    })
    .context("Failed to count network transactions in previous epoch")
}

async fn persist_epoch(&self, data: &TemporaryEpochStore) -> Result<(), IndexerError> {
// MUSTFIX(gegaowp): temporarily disable the epoch advance logic.
// let last_epoch_cp_id = if data.last_epoch.is_none() {
Expand All @@ -1510,18 +1508,18 @@ WHERE e1.epoch = e2.epoch
.advance_epoch(&data.new_epoch, last_epoch_cp_id)
.await?;
}
let epoch = data.new_epoch.epoch;
info!("Persisting epoch {}", epoch);

transactional_blocking!(&self.blocking_cp, |conn| {
if let Some(last_epoch) = &data.last_epoch {
info!("Persisting at the end of epoch {}", last_epoch.epoch);
diesel::insert_into(epochs::table)
.values(last_epoch)
.on_conflict(epochs::epoch)
.do_update()
.set((
epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
epochs::epoch_total_transactions
.eq(excluded(epochs::epoch_total_transactions)),
epochs::protocol_version.eq(excluded(epochs::protocol_version)),
epochs::next_epoch_version.eq(excluded(epochs::next_epoch_version)),
epochs::next_epoch_committee.eq(excluded(epochs::next_epoch_committee)),
Expand All @@ -1543,12 +1541,8 @@ WHERE e1.epoch = e2.epoch
.eq(excluded(epochs::leftover_storage_fund_inflow)),
))
.execute(conn)?;
info!("Persisted epoch {}", last_epoch.epoch);
}
diesel::insert_into(epochs::table)
.values(&data.new_epoch)
.on_conflict_do_nothing()
.execute(conn)?;

diesel::insert_into(system_states::table)
.values(&data.system_state)
.on_conflict_do_nothing()
Expand All @@ -1559,7 +1553,14 @@ WHERE e1.epoch = e2.epoch
.on_conflict_do_nothing()
.execute(conn)
})?;
info!("Persisted epoch {}", epoch);
info!("Persisting initial state of epoch {}", data.new_epoch.epoch);
transactional_blocking!(&self.blocking_cp, |conn| {
diesel::insert_into(epochs::table)
.values(&data.new_epoch)
.on_conflict_do_nothing()
.execute(conn)
})?;
info!("Persisted initial state of epoch {}", data.new_epoch.epoch);
Ok(())
}

Expand Down

2 comments on commit b29b1fe

@vercel
Copy link

@vercel vercel bot commented on b29b1fe Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

sui-kiosk – ./dapps/kiosk

sui-kiosk.vercel.app
sui-kiosk-git-main-mysten-labs.vercel.app
sui-kiosk-mysten-labs.vercel.app

@vercel
Copy link

@vercel vercel bot commented on b29b1fe Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

offline-signer-helper – ./dapps/offline-signer-helper

offline-signer-helper-mysten-labs.vercel.app
offline-signer-helper.vercel.app
offline-signer-helper-git-main-mysten-labs.vercel.app

Please sign in to comment.