Skip to content

Commit

Permalink
indexer: exclude deleted/wrapped objects from snapshot (#19455)
Browse files Browse the repository at this point in the history
## Description 

https://mysten-labs.slack.com/archives/C0578KFD9D2/p1726765436840499
This will further reduce the storage footprint of the objects snapshot table by not persisting deleted/wrapped objects.

## Test plan 

Covered by the objects snapshot ingestion test.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Oct 9, 2024
1 parent dcab1a5 commit d09e3ed
Showing 1 changed file with 100 additions and 20 deletions.
120 changes: 100 additions & 20 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ impl PgIndexerStore {
})
}

async fn persist_objects_snapshot_chunk(
async fn persist_object_snapshot_mutation_chunk(
&self,
objects_snapshot: Vec<StoredObjectSnapshot>,
objects_snapshot_mutations: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;
let guard = self
Expand All @@ -433,11 +433,11 @@ impl PgIndexerStore {
.start_timer();
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
for objects_snapshot_chunk in
objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
for mutation_chunk in
objects_snapshot_mutations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(objects_snapshot::table)
.values(objects_snapshot_chunk)
.values(mutation_chunk)
.on_conflict(objects_snapshot::object_id)
.do_update()
.set((
Expand Down Expand Up @@ -482,6 +482,57 @@ impl PgIndexerStore {
})
}

/// Removes deleted/wrapped objects from the `objects_snapshot` table so the
/// snapshot only contains live objects.
///
/// All deletions run inside a single transaction (retried via
/// `transaction_with_retry` with `PG_DB_COMMIT_SLEEP_DURATION` between
/// attempts), issuing one bulk `DELETE ... WHERE object_id IN (...)` per
/// chunk of `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` objects.
///
/// # Arguments
/// * `objects_snapshot_deletions` - snapshot rows whose `object_id`s should be
///   removed from the table; only the `object_id` field is used here.
///
/// # Errors
/// Returns `IndexerError` if the transaction ultimately fails after retries.
async fn persist_object_snapshot_deletion_chunk(
    &self,
    objects_snapshot_deletions: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
    use diesel_async::RunQueryDsl;
    // NOTE(review): this reuses the mutation path's latency metric
    // (`checkpoint_db_commit_latency_objects_snapshot_chunks`) — confirm a
    // dedicated deletion metric isn't intended.
    let guard = self
        .metrics
        .checkpoint_db_commit_latency_objects_snapshot_chunks
        .start_timer();

    transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
        async {
            // Chunk the IDs so each DELETE statement's IN-list stays bounded.
            for deletion_chunk in
                objects_snapshot_deletions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
            {
                diesel::delete(
                    objects_snapshot::table.filter(
                        objects_snapshot::object_id.eq_any(
                            deletion_chunk
                                .iter()
                                .map(|o| o.object_id.clone())
                                .collect::<Vec<_>>(),
                        ),
                    ),
                )
                .execute(conn)
                .await
                .map_err(IndexerError::from)
                .context("Failed to write object deletion to PostgresDB")?;
            }
            Ok::<(), IndexerError>(())
        }
        .scope_boxed()
    })
    .await
    .tap_ok(|_| {
        // Record commit latency and log how many object rows were deleted
        // (count is objects, not chunks).
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Deleted {} chunked object snapshots",
            objects_snapshot_deletions.len(),
        );
    })
    .tap_err(|e| {
        tracing::error!(
            "Failed to persist object snapshot deletions with error: {}",
            e
        );
    })
}

async fn persist_objects_history_chunk(
&self,
stored_objects_history: Vec<StoredHistoryObject>,
Expand Down Expand Up @@ -1669,34 +1720,63 @@ impl IndexerStore for PgIndexerStore {
.checkpoint_db_commit_latency_objects_snapshot
.start_timer();
let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
let objects_snapshot = indexed_mutations
let object_snapshot_mutations: Vec<StoredObjectSnapshot> = indexed_mutations
.into_iter()
.map(StoredObjectSnapshot::from)
.chain(
indexed_deletions
.into_iter()
.map(StoredObjectSnapshot::from),
)
.collect();
let object_snapshot_deletions: Vec<StoredObjectSnapshot> = indexed_deletions
.into_iter()
.map(StoredObjectSnapshot::from)
.collect();
let mutation_len = object_snapshot_mutations.len();
let deletion_len = object_snapshot_deletions.len();
let object_snapshot_mutation_chunks = chunk!(
object_snapshot_mutations,
self.config.parallel_objects_chunk_size
);
let object_snapshot_deletion_chunks = chunk!(
object_snapshot_deletions,
self.config.parallel_objects_chunk_size
);
let mutation_futures = object_snapshot_mutation_chunks
.into_iter()
.map(|c| self.persist_object_snapshot_mutation_chunk(c))
.map(Either::Left)
.collect::<Vec<_>>();
let len = objects_snapshot.len();
let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
let futures = chunks
let deletion_futures = object_snapshot_deletion_chunks
.into_iter()
.map(|c| self.persist_objects_snapshot_chunk(c))
.map(|c| self.persist_object_snapshot_deletion_chunk(c))
.map(Either::Right)
.collect::<Vec<_>>();

futures::future::join_all(futures)
let all_futures = mutation_futures
.into_iter()
.chain(deletion_futures)
.collect::<Vec<_>>();
futures::future::join_all(all_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to persist all objects snapshot chunks: {:?}",
"Failed to persist object snapshot mutation or deletion chunks: {:?}",
e
))
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(
elapsed,
"Persisted {} objects snapshot mutations and {} deletions",
mutation_len,
deletion_len
);
})
.tap_err(|e| {
tracing::error!(
"Failed to persist object snapshot mutation or deletion chunks: {:?}",
e
)
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} objects snapshot", len);
Ok(())
}

Expand Down

0 comments on commit d09e3ed

Please sign in to comment.