indexer: fix multi object updates in one checkpoint (MystenLabs#9945)
## Description 

PostgresWriteError("ON CONFLICT DO UPDATE command cannot affect row a
second time"), we cannot update the same row for more than once in one
query, thus splitting the commits here.
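
For context, a minimal sketch of the failure, assuming a simplified `objects` table keyed by `object_id` (the real schema has many more columns):
```
-- Both rows target the same primary key, so Postgres rejects the whole
-- statement with "ON CONFLICT DO UPDATE command cannot affect row a second time".
INSERT INTO objects (object_id, version)
VALUES ('0xabc', 1), ('0xabc', 2)
ON CONFLICT (object_id) DO UPDATE SET version = EXCLUDED.version;
```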

## Test Plan 

Ran locally on top of `releases/0.29` against staging and verified that the indexer
handles multiple object updates in one checkpoint:
```
cargo run --bin sui-indexer -- --db-url "postgres://postgres:postgres@localhost/gegao" --rpc-client-url "http://lax-suifn-58113.staging.sui.io:9000" --client-metric-host "127.0.0.1" --client-metric-port 9184 --rpc-server-port 3030 --reset-db

2023-03-27T19:33:24.941550Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 100 committed with 1 transactions and 1 object changes.
2023-03-27T19:33:25.195859Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 101 committed with 1 transactions and 1 object changes.
2023-03-27T19:33:26.401435Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 102 committed with 3 transactions and 3 object changes.
2023-03-27T19:33:27.914732Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 103 committed with 4 transactions and 4 object changes.
2023-03-27T19:33:28.850721Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 104 committed with 2 transactions and 2 object changes.
2023-03-27T19:33:30.301859Z  INFO sui_indexer::handlers::checkpoint_handler: Checkpoint 105 committed with 4 transactions and 4 object changes.
...
```

Also added a CI test covering multiple object updates in one checkpoint.
gegaowp authored Mar 27, 2023
1 parent 1594a4a commit e25d334
Showing 3 changed files with 85 additions and 18 deletions.
27 changes: 27 additions & 0 deletions crates/sui-indexer/src/models/objects.rs
@@ -440,3 +440,30 @@ pub fn compose_object_bulk_insert_query(objects: &[Object]) -> String {
);
bulk_insert_query
}

pub fn group_and_sort_objects(objects: Vec<Object>) -> Vec<Vec<Object>> {
let mut objects_sorted = objects;
objects_sorted.sort_by(|a, b| a.object_id.cmp(&b.object_id));
// Group objects by object_id
let mut groups: Vec<Vec<Object>> = vec![];
let mut current_group: Vec<Object> = vec![];
let mut current_object_id = String::new();
for object in objects_sorted {
if object.object_id != current_object_id {
if !current_group.is_empty() {
// Sort the group by version in descending order, so pop() yields the lowest version first
current_group.sort_by(|a, b| b.version.cmp(&a.version));
groups.push(current_group);
}
current_group = vec![];
current_object_id = object.object_id.clone();
}
current_group.push(object);
}
// Sort the last group by version in descending order as well, for the same reason
if !current_group.is_empty() {
current_group.sort_by(|a, b| b.version.cmp(&a.version));
groups.push(current_group);
}
groups
}
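
To make the draining behavior concrete, here is a self-contained sketch (the `Object` struct below is a simplified stand-in for the indexer's model, and the values are made up): each round-robin pass pops at most one version per object, so no batch can update the same row twice.
```
// Simplified stand-in for the indexer's `Object`; illustration only.
#[derive(Debug, Clone)]
struct Object {
    object_id: String,
    version: i64,
}

fn main() {
    // Output shape of group_and_sort_objects: one group per object_id,
    // each sorted by version in descending order.
    let mut groups: Vec<Vec<Object>> = vec![
        vec![
            Object { object_id: "0xA".into(), version: 2 },
            Object { object_id: "0xA".into(), version: 1 },
        ],
        vec![Object { object_id: "0xB".into(), version: 1 }],
    ];
    loop {
        // pop() takes the last element, i.e. the lowest remaining version.
        let batch: Vec<Object> = groups.iter_mut().filter_map(|g| g.pop()).collect();
        if batch.is_empty() {
            break;
        }
        // Pass 1: [0xA v1, 0xB v1]; pass 2: [0xA v2].
        // Each object_id appears at most once per batch.
        println!("{batch:?}");
    }
}
```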
23 changes: 19 additions & 4 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -45,7 +45,9 @@ use crate::models::checkpoints::Checkpoint;
use crate::models::epoch::DBEpochInfo;
use crate::models::events::Event;
use crate::models::network_metrics::{DBMoveCallMetrics, DBNetworkMetrics};
use crate::models::objects::{compose_object_bulk_insert_update_query, Object};
use crate::models::objects::{
compose_object_bulk_insert_update_query, group_and_sort_objects, Object,
};
use crate::models::system_state::DBValidatorSummary;
use crate::models::transactions::Transaction;
use crate::schema::{
@@ -1075,9 +1077,22 @@ impl IndexerStore for PgIndexerStore {
.iter()
.flat_map(|changes| changes.mutated_objects.iter().cloned())
.collect();
// bulk insert/update via UNNEST trick
let insert_update_query = compose_object_bulk_insert_update_query(&all_mutated_objects);
diesel::sql_query(insert_update_query).execute(conn)?;
// NOTE: to avoid the `ON CONFLICT DO UPDATE command cannot affect row a second time`
// error, each query must update any given object at most once.
let mut mutated_object_groups = group_and_sort_objects(all_mutated_objects);
loop {
let mutated_object_group = mutated_object_groups
.iter_mut()
.filter_map(|group| group.pop())
.collect::<Vec<_>>();
if mutated_object_group.is_empty() {
break;
}
// bulk insert/update via UNNEST trick
let insert_update_query =
compose_object_bulk_insert_update_query(&mutated_object_group);
diesel::sql_query(insert_update_query).execute(conn)?;
}
// TODO(gegao): monitor the deletion batch size to see
// if bulk update via unnest is necessary.
let all_deleted_changes = objects_changes
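
For reference, the "UNNEST trick" in the loop above folds each batch into one multi-row upsert; a hedged sketch of the general query shape (columns and casts are illustrative, the real SQL is generated by `compose_object_bulk_insert_update_query`):
```
-- Sketch only: the actual query covers all object columns.
INSERT INTO objects (object_id, version, object_status)
SELECT * FROM UNNEST(
    ARRAY['0xabc', '0xdef'],
    ARRAY[1, 1]::bigint[],
    ARRAY['Mutated', 'Mutated']
)
ON CONFLICT (object_id) DO UPDATE SET
    version = EXCLUDED.version,
    object_status = EXCLUDED.object_status;
```
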
53 changes: 39 additions & 14 deletions crates/sui-indexer/tests/integration_tests.rs
@@ -21,8 +21,8 @@ pub mod pg_integration_test {
use sui_config::SUI_KEYSTORE_FILENAME;
use sui_indexer::errors::IndexerError;
use sui_indexer::models::objects::{
compose_object_bulk_insert_query, compose_object_bulk_insert_update_query, NamedBcsBytes,
Object, ObjectStatus,
compose_object_bulk_insert_query, compose_object_bulk_insert_update_query,
group_and_sort_objects, NamedBcsBytes, Object, ObjectStatus,
};
use sui_indexer::models::owners::OwnerType;
use sui_indexer::schema::objects;
@@ -1000,7 +1000,7 @@ pub mod pg_integration_test {
let pg_connection_pool = new_pg_connection_pool(&db_url).await.unwrap();
let mut pg_pool_conn = get_pg_pool_connection(&pg_connection_pool).unwrap();

let mut bulk_data = (1..=10000)
let bulk_data = (1..=10000)
.into_iter()
.map(|_| Object {
epoch: 0,
@@ -1030,24 +1030,49 @@ pub mod pg_integration_test {
assert!(result.is_ok());
assert_eq!(result.unwrap(), 10000);

bulk_data = bulk_data
let mutated_bulk_data = bulk_data
.clone()
.into_iter()
.map(|mut object| {
object.object_status = ObjectStatus::Mutated;
object.version = 1;
object
})
.collect::<Vec<_>>();
let mutated_bulk_data_same_checkpoint = bulk_data
.into_iter()
.map(|mut object| {
object.object_status = ObjectStatus::Mutated;
object.version = 2;
object
})
.chain(mutated_bulk_data)
.collect::<Vec<_>>();
let mut pg_pool_conn = get_pg_pool_connection(&pg_connection_pool).unwrap();
let insert_update_query = compose_object_bulk_insert_update_query(&bulk_data);
eprintln!("insert_update_query: {}", insert_update_query);
let result: Result<usize, IndexerError> = pg_pool_conn
.build_transaction()
.serializable()
.read_write()
.run(|conn| diesel::sql_query(insert_update_query).execute(conn))
.map_err(IndexerError::PostgresError);
assert!(result.is_ok());
assert_eq!(result.unwrap(), 10000);
let mut mutated_object_groups = group_and_sort_objects(mutated_bulk_data_same_checkpoint);
let mut counter = 0;
loop {
let mutated_object_group = mutated_object_groups
.iter_mut()
.filter_map(|group| group.pop())
.collect::<Vec<_>>();
if mutated_object_group.is_empty() {
break;
}
// bulk insert/update via UNNEST trick
let insert_update_query =
compose_object_bulk_insert_update_query(&mutated_object_group);
let result: Result<usize, IndexerError> = pg_pool_conn
.build_transaction()
.serializable()
.read_write()
.run(|conn| diesel::sql_query(insert_update_query).execute(conn))
.map_err(IndexerError::PostgresError);
assert!(result.is_ok());
assert_eq!(result.unwrap(), 10000);
counter += 1;
}
assert_eq!(counter, 2);
}

#[tokio::test]