Skip to content

Commit

Permalink
indexer: stop indexing tx_senders and tx_recipients
Browse files Browse the repository at this point in the history
## Description

All references to these fields have been removed from readers, so we can
stop keeping it up-to-date. Once this change lands, we can also clean
these tables from the schema.

## Test plan

```
sui$ cargo nextest run -p sui-indexer
```
  • Loading branch information
amnn committed Oct 10, 2024
1 parent 089096f commit c28f0af
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 83 deletions.
36 changes: 1 addition & 35 deletions crates/sui-indexer/src/models/tx_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
schema::{
tx_affected_addresses, tx_affected_objects, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
},
types::TxIndex,
};
Expand Down Expand Up @@ -39,21 +39,6 @@ pub struct StoredTxAffectedObjects {
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_senders)]
pub struct StoredTxSenders {
pub tx_sequence_number: i64,
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_recipients)]
pub struct StoredTxRecipients {
pub tx_sequence_number: i64,
pub recipient: Vec<u8>,
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_input_objects)]
pub struct StoredTxInputObject {
Expand Down Expand Up @@ -118,8 +103,6 @@ impl TxIndex {
) -> (
Vec<StoredTxAffectedAddresses>,
Vec<StoredTxAffectedObjects>,
Vec<StoredTxSenders>,
Vec<StoredTxRecipients>,
Vec<StoredTxInputObject>,
Vec<StoredTxChangedObject>,
Vec<StoredTxPkg>,
Expand Down Expand Up @@ -153,21 +136,6 @@ impl TxIndex {
})
.collect();

let tx_sender = StoredTxSenders {
tx_sequence_number,
sender: self.sender.to_vec(),
};

let tx_recipients = self
.recipients
.iter()
.map(|s| StoredTxRecipients {
tx_sequence_number,
recipient: s.to_vec(),
sender: self.sender.to_vec(),
})
.collect();

let tx_input_objects = self
.input_objects
.iter()
Expand Down Expand Up @@ -245,8 +213,6 @@ impl TxIndex {
(
tx_affected_addresses,
tx_affected_objects,
vec![tx_sender],
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
Expand Down
56 changes: 8 additions & 48 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::schema::{
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun,
tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
tx_recipients, tx_senders, watermarks,
watermarks,
};
use crate::store::transaction_with_retry;
use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
Expand Down Expand Up @@ -1066,8 +1066,6 @@ impl PgIndexerStore {
let (
affected_addresses,
affected_objects,
senders,
recipients,
input_objects,
changed_objects,
pkgs,
Expand All @@ -1086,14 +1084,10 @@ impl PgIndexerStore {
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_affected_addresses,
mut tx_affected_objects,
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
Expand All @@ -1105,20 +1099,16 @@ impl PgIndexerStore {
index| {
tx_affected_addresses.extend(index.0);
tx_affected_objects.extend(index.1);
tx_senders.extend(index.2);
tx_recipients.extend(index.3);
tx_input_objects.extend(index.4);
tx_changed_objects.extend(index.5);
tx_pkgs.extend(index.6);
tx_mods.extend(index.7);
tx_funs.extend(index.8);
tx_digests.extend(index.9);
tx_kinds.extend(index.10);
tx_input_objects.extend(index.2);
tx_changed_objects.extend(index.3);
tx_pkgs.extend(index.4);
tx_mods.extend(index.5);
tx_funs.extend(index.6);
tx_digests.extend(index.7);
tx_kinds.extend(index.8);
(
tx_affected_addresses,
tx_affected_objects,
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
Expand Down Expand Up @@ -1152,22 +1142,6 @@ impl PgIndexerStore {
.await?;
}

for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_senders::table)
.values(senders_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_recipients::table)
.values(recipients_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_input_objects::table)
.values(input_objects_chunk)
Expand Down Expand Up @@ -1478,20 +1452,6 @@ impl PgIndexerStore {
.execute(conn)
.await?;

diesel::delete(
tx_senders::table
.filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)),
)
.execute(conn)
.await?;

diesel::delete(
tx_recipients::table
.filter(tx_recipients::tx_sequence_number.between(min_tx, max_tx)),
)
.execute(conn)
.await?;

diesel::delete(
tx_input_objects::table
.filter(tx_input_objects::tx_sequence_number.between(min_tx, max_tx)),
Expand Down

0 comments on commit c28f0af

Please sign in to comment.