Skip to content

Commit

Permalink
[indexer-alt] add prune impls for each pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
emmazzz authored and wlmyng committed Dec 26, 2024
1 parent 87504e7 commit e8252bf
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 10 deletions.
15 changes: 15 additions & 0 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeSet, sync::Arc};

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
Expand Down Expand Up @@ -57,4 +60,16 @@ impl Handler for EvEmitMod {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;

let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
20 changes: 18 additions & 2 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, sync::Arc};
use std::{collections::BTreeSet, ops::Range, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -60,4 +64,16 @@ impl Handler for EvStructInst {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;

let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
Expand Down Expand Up @@ -38,4 +39,11 @@ impl Handler for KvCheckpoints {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
21 changes: 20 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -125,4 +130,18 @@ impl Handler for KvEpochEnds {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_ends::table
.filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1));
Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
22 changes: 21 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -72,4 +77,19 @@ impl Handler for KvEpochStarts {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
10 changes: 10 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
Expand Down Expand Up @@ -66,4 +67,13 @@ impl Handler for KvTransactions {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
let filter = kv_transactions::table
.filter(kv_transactions::cp_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
19 changes: 18 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -69,4 +74,16 @@ impl Handler for TxAffectedAddresses {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_affected_addresses::table.filter(
tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
19 changes: 18 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -59,4 +64,16 @@ impl Handler for TxAffectedObjects {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_affected_objects::table.filter(
tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
19 changes: 18 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -65,6 +70,18 @@ impl Handler for TxBalanceChanges {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_balance_changes::table.filter(
tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

/// Calculate balance changes based on the object's input and output objects.
Expand Down
18 changes: 17 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,4 +67,15 @@ impl Handler for TxCalls {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
18 changes: 17 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -49,4 +54,15 @@ impl Handler for TxDigests {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_digests::table
.filter(tx_digests::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
18 changes: 17 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_kinds,
transactions::{StoredKind, StoredTxKind},
Expand Down Expand Up @@ -60,4 +65,15 @@ impl Handler for TxKinds {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to).await?;
let filter = tx_kinds::table
.filter(tx_kinds::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}

0 comments on commit e8252bf

Please sign in to comment.