indexer: exact epoch tx count #12395

Merged 2 commits on Jun 9, 2023
Changes from all commits
13 changes: 8 additions & 5 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
@@ -411,7 +411,6 @@ where
         let mut object_changes_commit_res = self
             .state
             .persist_object_changes(
-                &checkpoint,
                 &object_changes,
                 self.metrics.object_mutation_db_commit_latency.clone(),
                 self.metrics.object_deletion_db_commit_latency.clone(),
@@ -429,7 +428,6 @@ where
             object_changes_commit_res = self
                 .state
                 .persist_object_changes(
-                    &checkpoint,
                     &object_changes,
                     self.metrics.object_mutation_db_commit_latency.clone(),
                     self.metrics.object_deletion_db_commit_latency.clone(),
@@ -476,7 +474,6 @@ where
                 }
                 epoch_db_guard.stop_and_record();
                 self.metrics.total_epoch_committed.inc();
-                info!("Epoch {} committed.", indexed_epoch.new_epoch.epoch);
             }
         } else {
             // sleep for 1 sec to avoid occupying the mutex, as this happens once per epoch / day
@@ -718,14 +715,20 @@ where

             let event = event.as_ref();

+            let last_epoch = system_state.epoch as i64 - 1;
+            let network_tx_count_prev_epoch = self
+                .state
+                .get_network_total_transactions_previous_epoch(last_epoch)
+                .await?;
             Some(TemporaryEpochStore {
                 last_epoch: Some(DBEpochInfo {
-                    epoch: system_state.epoch as i64 - 1,
+                    epoch: last_epoch,
                     first_checkpoint_id: 0,
                     last_checkpoint_id: Some(checkpoint.sequence_number as i64),
                     epoch_start_timestamp: 0,
                     epoch_end_timestamp: Some(checkpoint.timestamp_ms as i64),
-                    epoch_total_transactions: 0,
+                    epoch_total_transactions: checkpoint.network_total_transactions as i64
+                        - network_tx_count_prev_epoch,
                     next_epoch_version: Some(
                         end_of_epoch_data.next_epoch_protocol_version.as_u64() as i64,
                     ),
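The key change above: checkpoint.network_total_transactions is a network-wide running total, so the finished epoch's exact transaction count is the difference between that total at the epoch's last checkpoint and the same total at the previous epoch's last checkpoint. A minimal sketch of the arithmetic (the function name and numbers are illustrative, not from the PR):

    /// Exact per-epoch count from two running-total watermarks.
    fn epoch_total_transactions(end_of_epoch_total: i64, end_of_prev_epoch_total: i64) -> i64 {
        end_of_epoch_total - end_of_prev_epoch_total
    }

    fn main() {
        // e.g. the last checkpoint of epoch 41 reports 1_000 network-total transactions
        // and the last checkpoint of epoch 42 reports 1_450: epoch 42 ran exactly 450.
        assert_eq!(epoch_total_transactions(1_450, 1_000), 450);
    }
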
5 changes: 4 additions & 1 deletion crates/sui-indexer/src/store/indexer_store.rs
@@ -214,7 +214,6 @@ pub trait IndexerStore {
     ) -> Result<usize, IndexerError>;
     async fn persist_object_changes(
         &self,
-        checkpoint: &Checkpoint,
         tx_object_changes: &[TransactionObjectChanges],
         object_mutation_latency: Histogram,
         object_deletion_latency: Histogram,
@@ -235,6 +234,10 @@
     ) -> Result<(), IndexerError>;

     async fn persist_epoch(&self, data: &TemporaryEpochStore) -> Result<(), IndexerError>;
+    async fn get_network_total_transactions_previous_epoch(
+        &self,
+        epoch: i64,
+    ) -> Result<i64, IndexerError>;

     async fn get_epochs(
         &self,
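Note the argument convention of the new trait method: callers pass the epoch that just ended (last_epoch in the handler above), and the store returns the network-wide transaction total recorded at the end of epoch - 1, falling back to 0 when that epoch has no checkpoints (e.g. epoch 0). A rough in-memory sketch of that contract, with async plumbing and error handling stripped; the MockStore type is hypothetical, only the method name and semantics come from the trait:

    use std::collections::HashMap;

    /// Hypothetical stand-in for the store: epoch -> network_total_transactions
    /// reported by that epoch's last checkpoint.
    struct MockStore {
        epoch_end_totals: HashMap<i64, i64>,
    }

    impl MockStore {
        /// Given `epoch`, return the running network total at the end of `epoch - 1`,
        /// or 0 if that epoch is unknown.
        fn get_network_total_transactions_previous_epoch(&self, epoch: i64) -> i64 {
            self.epoch_end_totals.get(&(epoch - 1)).copied().unwrap_or(0)
        }
    }

    fn main() {
        let store = MockStore {
            epoch_end_totals: HashMap::from([(41, 1_000), (42, 1_450)]),
        };
        assert_eq!(store.get_network_total_transactions_previous_epoch(42), 1_000);
        assert_eq!(store.get_network_total_transactions_previous_epoch(0), 0);
    }
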
91 changes: 46 additions & 45 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -10,7 +10,6 @@ use async_trait::async_trait;
 use cached::proc_macro::once;
 use diesel::dsl::{count, max};
 use diesel::pg::PgConnection;
-use diesel::query_builder::AsQuery;
 use diesel::sql_types::{BigInt, VarChar};
 use diesel::upsert::excluded;
 use diesel::{ExpressionMethods, PgArrayExpressionMethods};
@@ -1331,46 +1330,31 @@ impl IndexerStore for PgIndexerStore {

     async fn persist_object_changes(
         &self,
-        checkpoint: &Checkpoint,
         tx_object_changes: &[TransactionObjectChanges],
         object_mutation_latency: Histogram,
         object_deletion_latency: Histogram,
     ) -> Result<(), IndexerError> {
         transactional_blocking!(&self.blocking_cp, |conn| {
-            // update epoch transaction count
-            let sql = "UPDATE epochs e1
-SET epoch_total_transactions = e2.epoch_total_transactions + $1
-FROM epochs e2
-WHERE e1.epoch = e2.epoch
-AND e1.epoch = $2;";
-            diesel::sql_query(sql)
-                .bind::<BigInt, _>(checkpoint.transactions.len() as i64)
-                .bind::<BigInt, _>(checkpoint.epoch)
-                .as_query()
-                .execute(conn)?;
-
-            {
-                let mutated_objects: Vec<Object> = tx_object_changes
-                    .iter()
-                    .flat_map(|changes| changes.changed_objects.iter().cloned())
-                    .collect();
-                let deleted_changes = tx_object_changes
-                    .iter()
-                    .flat_map(|changes| changes.deleted_objects.iter().cloned())
-                    .collect::<Vec<_>>();
-                let deleted_objects: Vec<Object> = deleted_changes
-                    .iter()
-                    .map(|deleted_object| deleted_object.clone().into())
-                    .collect();
-                persist_transaction_object_changes(
-                    conn,
-                    mutated_objects,
-                    deleted_objects,
-                    Some(object_mutation_latency),
-                    Some(object_deletion_latency),
-                )?;
-                Ok::<(), IndexerError>(())
-            }
+            let mutated_objects: Vec<Object> = tx_object_changes
+                .iter()
+                .flat_map(|changes| changes.changed_objects.iter().cloned())
+                .collect();
+            let deleted_changes = tx_object_changes
+                .iter()
+                .flat_map(|changes| changes.deleted_objects.iter().cloned())
+                .collect::<Vec<_>>();
+            let deleted_objects: Vec<Object> = deleted_changes
+                .iter()
+                .map(|deleted_object| deleted_object.clone().into())
+                .collect();
+            persist_transaction_object_changes(
+                conn,
+                mutated_objects,
+                deleted_objects,
+                Some(object_mutation_latency),
+                Some(object_deletion_latency),
+            )?;
+            Ok::<(), IndexerError>(())
         })?;
         Ok(())
     }
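The removed block above bumped epochs.epoch_total_transactions by checkpoint.transactions.len() on every call, and this persist path is retried on failure (see the handler hunks earlier in the PR). One plausible reading of the "exact" in the PR title, offered here as an assumption rather than something stated in the diff: an additive counter is not idempotent under such retries, whereas the new watermark subtraction is. A toy illustration:

    /// Old style (illustrative): the epoch row is incremented once per commit,
    /// so replaying the same commit counts a checkpoint's transactions twice.
    fn bump_epoch_total(epoch_total: &mut i64, checkpoint_tx_count: i64) {
        *epoch_total += checkpoint_tx_count;
    }

    fn main() {
        let mut epoch_total = 0;
        bump_epoch_total(&mut epoch_total, 450);
        bump_epoch_total(&mut epoch_total, 450); // retried commit of the same checkpoint
        assert_eq!(epoch_total, 900); // 450 real transactions reported as 900

        // New style: recomputing end_total - prev_epoch_end_total yields 450
        // no matter how many times it runs.
    }
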
@@ -1497,6 +1481,20 @@ WHERE e1.epoch = e2.epoch
         Ok(())
     }

+    async fn get_network_total_transactions_previous_epoch(
+        &self,
+        epoch: i64,
+    ) -> Result<i64, IndexerError> {
+        read_only_blocking!(&self.blocking_cp, |conn| {
+            checkpoints::table
+                .filter(checkpoints::epoch.eq(epoch - 1))
+                .select(max(checkpoints::network_total_transactions))
+                .first::<Option<i64>>(conn)
+                .map(|o| o.unwrap_or(0))
+        })
+        .context("Failed to count network transactions in previous epoch")
+    }
+
     async fn persist_epoch(&self, data: &TemporaryEpochStore) -> Result<(), IndexerError> {
         // MUSTFIX(gegaowp): temporarily disable the epoch advance logic.
         // let last_epoch_cp_id = if data.last_epoch.is_none() {
@@ -1510,18 +1508,18 @@ WHERE e1.epoch = e2.epoch
                 .advance_epoch(&data.new_epoch, last_epoch_cp_id)
                 .await?;
         }
-        let epoch = data.new_epoch.epoch;
-        info!("Persisting epoch {}", epoch);
-
         transactional_blocking!(&self.blocking_cp, |conn| {
             if let Some(last_epoch) = &data.last_epoch {
+                info!("Persisting at the end of epoch {}", last_epoch.epoch);
                 diesel::insert_into(epochs::table)
                     .values(last_epoch)
                     .on_conflict(epochs::epoch)
                     .do_update()
                     .set((
                         epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
                         epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
+                        epochs::epoch_total_transactions
+                            .eq(excluded(epochs::epoch_total_transactions)),
                         epochs::protocol_version.eq(excluded(epochs::protocol_version)),
                         epochs::next_epoch_version.eq(excluded(epochs::next_epoch_version)),
                         epochs::next_epoch_committee.eq(excluded(epochs::next_epoch_committee)),
@@ -1543,12 +1541,8 @@ WHERE e1.epoch = e2.epoch
                             .eq(excluded(epochs::leftover_storage_fund_inflow)),
                     ))
                     .execute(conn)?;
+                info!("Persisted epoch {}", last_epoch.epoch);
             }
-            diesel::insert_into(epochs::table)
-                .values(&data.new_epoch)
-                .on_conflict_do_nothing()
-                .execute(conn)?;
-
             diesel::insert_into(system_states::table)
                 .values(&data.system_state)
                 .on_conflict_do_nothing()
@@ -1559,7 +1553,14 @@ WHERE e1.epoch = e2.epoch
                 .on_conflict_do_nothing()
                 .execute(conn)
         })?;
-        info!("Persisted epoch {}", epoch);
+        info!("Persisting initial state of epoch {}", data.new_epoch.epoch);
+        transactional_blocking!(&self.blocking_cp, |conn| {
+            diesel::insert_into(epochs::table)
+                .values(&data.new_epoch)
+                .on_conflict_do_nothing()
+                .execute(conn)
+        })?;
+        info!("Persisted initial state of epoch {}", data.new_epoch.epoch);
         Ok(())
     }

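For readers less familiar with Diesel's upsert API used throughout persist_epoch: excluded(column) refers to the value the conflicting INSERT would have written (Postgres's EXCLUDED pseudo-table), so the epoch row created when the epoch started gets its end-of-epoch fields, including the new exact epoch_total_transactions, overwritten when the epoch closes. A self-contained sketch with a cut-down, hypothetical epochs schema (diesel 2.x style; not the indexer's real table definition):

    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use diesel::upsert::excluded;

    diesel::table! {
        epochs (epoch) {
            epoch -> BigInt,
            epoch_total_transactions -> BigInt,
        }
    }

    /// Insert the epoch row, or, if it already exists (created when the epoch began),
    /// overwrite the transaction count with the newly computed exact value.
    fn upsert_epoch_total(conn: &mut PgConnection, epoch_id: i64, total: i64) -> QueryResult<usize> {
        diesel::insert_into(epochs::table)
            .values((
                epochs::epoch.eq(epoch_id),
                epochs::epoch_total_transactions.eq(total),
            ))
            .on_conflict(epochs::epoch)
            .do_update()
            .set(epochs::epoch_total_transactions.eq(excluded(epochs::epoch_total_transactions)))
            .execute(conn)
    }
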