From 3cd7e2bf3b62f9720e5ed6eb97476a0ddf53884f Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Fri, 6 Dec 2024 14:27:41 +0000 Subject: [PATCH] refactor(indexer-alt): factor out Db/DbArgs ## Description Pull out the `Db` and `DbArgs` types from `sui-indexer-alt-framework`, so that they can also be used in the reader implementation. They have been combined with the existing `TempDb`, and its crate has been renamed from `sui-temp-pg-db` to `sui-pg-db` to account for the fact that it's now holding something more general. ## Test plan CI --- Cargo.lock | 21 +++++--- Cargo.toml | 4 +- crates/sui-bridge-indexer/Cargo.toml | 2 +- .../sui-bridge-indexer/tests/indexer_tests.rs | 2 +- crates/sui-cluster-test/Cargo.toml | 2 +- crates/sui-cluster-test/src/cluster.rs | 2 +- crates/sui-graphql-rpc/Cargo.toml | 2 +- crates/sui-graphql-rpc/src/data/pg.rs | 2 +- crates/sui-graphql-rpc/src/server/builder.rs | 2 +- .../sui-graphql-rpc/src/test_infra/cluster.rs | 2 +- .../tests/move_registry_e2e.rs | 2 +- crates/sui-indexer-alt-framework/Cargo.toml | 3 +- crates/sui-indexer-alt-framework/src/lib.rs | 32 ++++++++++-- .../sui-indexer-alt-framework/src/metrics.rs | 3 +- .../src/pipeline/concurrent/collector.rs | 6 +-- .../pipeline/concurrent/commit_watermark.rs | 2 +- .../src/pipeline/concurrent/committer.rs | 2 +- .../src/pipeline/concurrent/mod.rs | 7 +-- .../src/pipeline/concurrent/pruner.rs | 2 +- .../pipeline/concurrent/reader_watermark.rs | 2 +- .../src/pipeline/sequential/committer.rs | 2 +- .../src/pipeline/sequential/mod.rs | 7 +-- .../src/watermarks.rs | 3 +- crates/sui-indexer-alt/Cargo.toml | 1 + crates/sui-indexer-alt/src/args.rs | 3 +- crates/sui-indexer-alt/src/benchmark.rs | 9 ++-- .../src/handlers/ev_emit_mod.rs | 9 ++-- .../src/handlers/ev_struct_inst.rs | 9 ++-- .../src/handlers/kv_checkpoints.rs | 9 ++-- .../src/handlers/kv_epoch_ends.rs | 9 ++-- .../src/handlers/kv_epoch_starts.rs | 9 ++-- .../src/handlers/kv_feature_flags.rs | 11 ++--- .../src/handlers/kv_objects.rs | 9 ++-- .../src/handlers/kv_protocol_configs.rs | 11 ++--- .../src/handlers/kv_transactions.rs | 9 ++-- .../sui-indexer-alt/src/handlers/obj_info.rs | 13 ++--- .../src/handlers/obj_info_pruner.rs | 6 +-- .../src/handlers/obj_versions.rs | 9 ++-- .../src/handlers/sum_coin_balances.rs | 13 ++--- .../src/handlers/sum_displays.rs | 9 ++-- .../src/handlers/sum_obj_types.rs | 13 ++--- .../src/handlers/sum_packages.rs | 9 ++-- .../src/handlers/tx_affected_addresses.rs | 9 ++-- .../src/handlers/tx_affected_objects.rs | 9 ++-- .../src/handlers/tx_balance_changes.rs | 13 ++--- .../sui-indexer-alt/src/handlers/tx_calls.rs | 9 ++-- .../src/handlers/tx_digests.rs | 9 ++-- .../sui-indexer-alt/src/handlers/tx_kinds.rs | 9 ++-- .../src/handlers/wal_coin_balances.rs | 9 ++-- .../src/handlers/wal_obj_types.rs | 9 ++-- crates/sui-indexer-alt/src/lib.rs | 2 +- crates/sui-indexer-alt/src/main.rs | 9 +++- crates/sui-indexer/Cargo.toml | 2 +- crates/sui-indexer/src/db.rs | 4 +- crates/sui-indexer/src/test_utils.rs | 2 +- crates/sui-mvr-graphql-rpc/Cargo.toml | 2 +- crates/sui-mvr-graphql-rpc/src/data/pg.rs | 2 +- .../sui-mvr-graphql-rpc/src/server/builder.rs | 2 +- .../src/test_infra/cluster.rs | 2 +- .../tests/move_registry_e2e.rs | 2 +- .../{sui-pg-temp-db => sui-pg-db}/Cargo.toml | 8 ++- .../src/db.rs => sui-pg-db/src/lib.rs} | 49 +++++++------------ .../src/lib.rs => sui-pg-db/src/temp.rs} | 0 crates/test-cluster/Cargo.toml | 2 +- .../test-cluster/src/test_indexer_handle.rs | 2 +- 65 files changed, 198 insertions(+), 252 deletions(-) rename crates/{sui-pg-temp-db => sui-pg-db}/Cargo.toml (73%) rename crates/{sui-indexer-alt-framework/src/db.rs => sui-pg-db/src/lib.rs} (82%) rename crates/{sui-pg-temp-db/src/lib.rs => sui-pg-db/src/temp.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index cdd3e642ca4e4..9492700e9b024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13357,7 +13357,7 @@ dependencies = [ "sui-indexer", "sui-indexer-builder", "sui-json-rpc-types", - "sui-pg-temp-db", + "sui-pg-db", "sui-sdk", "sui-test-transaction-builder", "sui-types", @@ -13394,7 +13394,7 @@ dependencies = [ "sui-json", "sui-json-rpc-types", "sui-keys", - "sui-pg-temp-db", + "sui-pg-db", "sui-sdk", "sui-swarm", "sui-swarm-config", @@ -14024,7 +14024,7 @@ dependencies = [ "sui-json-rpc-types", "sui-move-build", "sui-package-resolver", - "sui-pg-temp-db", + "sui-pg-db", "sui-protocol-config", "sui-rpc-api", "sui-sdk", @@ -14119,7 +14119,7 @@ dependencies = [ "sui-move-build", "sui-open-rpc", "sui-package-resolver", - "sui-pg-temp-db", + "sui-pg-db", "sui-protocol-config", "sui-rpc-api", "sui-sdk", @@ -14162,6 +14162,7 @@ dependencies = [ "sui-field-count", "sui-indexer-alt-framework", "sui-indexer-alt-schema", + "sui-pg-db", "sui-protocol-config", "sui-synthetic-ingestion", "sui-types", @@ -14194,7 +14195,7 @@ dependencies = [ "reqwest 0.12.5", "serde", "sui-field-count", - "sui-pg-temp-db", + "sui-pg-db", "sui-storage", "sui-types", "telemetry-subscribers", @@ -14718,7 +14719,7 @@ dependencies = [ "sui-json-rpc-types", "sui-move-build", "sui-package-resolver", - "sui-pg-temp-db", + "sui-pg-db", "sui-protocol-config", "sui-rpc-api", "sui-sdk", @@ -15030,12 +15031,16 @@ dependencies = [ ] [[package]] -name = "sui-pg-temp-db" +name = "sui-pg-db" version = "1.40.0" dependencies = [ "anyhow", + "bb8", + "clap", "diesel", "diesel-async", + "diesel_migrations", + "telemetry-subscribers", "tempfile", "tokio", "tracing", @@ -16411,7 +16416,7 @@ dependencies = [ "sui-keys", "sui-macros", "sui-node", - "sui-pg-temp-db", + "sui-pg-db", "sui-protocol-config", "sui-sdk", "sui-simulator", diff --git a/Cargo.toml b/Cargo.toml index ae28dc78f7951..af50b5241fa6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,7 +143,7 @@ members = [ "crates/sui-package-dump", "crates/sui-package-management", "crates/sui-package-resolver", - "crates/sui-pg-temp-db", + "crates/sui-pg-db", "crates/sui-proc-macros", "crates/sui-protocol-config", "crates/sui-protocol-config-macros", @@ -671,7 +671,7 @@ sui-open-rpc-macros = { path = "crates/sui-open-rpc-macros" } sui-package-dump = { path = "crates/sui-package-dump" } sui-package-management = { path = "crates/sui-package-management" } sui-package-resolver = { path = "crates/sui-package-resolver" } -sui-pg-temp-db = { path = "crates/sui-pg-temp-db" } +sui-pg-db = { path = "crates/sui-pg-db" } sui-proc-macros = { path = "crates/sui-proc-macros" } sui-protocol-config = { path = "crates/sui-protocol-config" } sui-protocol-config-macros = { path = "crates/sui-protocol-config-macros" } diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index e401f341ef2af..ab5ca21f24c59 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -29,7 +29,7 @@ sui-bridge.workspace = true sui-sdk.workspace = true sui-json-rpc-types.workspace = true sui-data-ingestion-core.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-types.workspace = true telemetry-subscribers.workspace = true tracing.workspace = true diff --git a/crates/sui-bridge-indexer/tests/indexer_tests.rs b/crates/sui-bridge-indexer/tests/indexer_tests.rs index 6ddda14629f0a..76b539dbe49f0 100644 --- a/crates/sui-bridge-indexer/tests/indexer_tests.rs +++ b/crates/sui-bridge-indexer/tests/indexer_tests.rs @@ -19,7 +19,7 @@ use sui_bridge_indexer::{create_sui_indexer, schema}; use sui_data_ingestion_core::DataIngestionMetrics; use sui_indexer::database::Connection; use sui_indexer_builder::indexer_builder::IndexerProgressStore; -use sui_pg_temp_db::TempDb; +use sui_pg_db::temp::TempDb; const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/migrations"); diff --git a/crates/sui-cluster-test/Cargo.toml b/crates/sui-cluster-test/Cargo.toml index 87b2add805c32..4b890ba8a6c7a 100644 --- a/crates/sui-cluster-test/Cargo.toml +++ b/crates/sui-cluster-test/Cargo.toml @@ -28,7 +28,7 @@ regex.workspace = true sui-indexer.workspace = true sui-faucet.workspace = true sui-graphql-rpc.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-swarm.workspace = true sui-swarm-config.workspace = true sui-json-rpc-types.workspace = true diff --git a/crates/sui-cluster-test/src/cluster.rs b/crates/sui-cluster-test/src/cluster.rs index 61cb2d7062f55..b21095c64cea7 100644 --- a/crates/sui-cluster-test/src/cluster.rs +++ b/crates/sui-cluster-test/src/cluster.rs @@ -14,7 +14,7 @@ use sui_indexer::test_utils::{ start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing, }; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore}; -use sui_pg_temp_db::TempDb; +use sui_pg_db::temp::TempDb; use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv}; use sui_sdk::wallet_context::WalletContext; use sui_swarm::memory::Swarm; diff --git a/crates/sui-graphql-rpc/Cargo.toml b/crates/sui-graphql-rpc/Cargo.toml index d8f797650fcdf..73a8cae06bb1f 100644 --- a/crates/sui-graphql-rpc/Cargo.toml +++ b/crates/sui-graphql-rpc/Cargo.toml @@ -47,7 +47,7 @@ serde_with.workspace = true serde_yaml.workspace = true shared-crypto.workspace = true similar.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-sdk.workspace = true sui-types.workspace = true tap.workspace = true diff --git a/crates/sui-graphql-rpc/src/data/pg.rs b/crates/sui-graphql-rpc/src/data/pg.rs index 34d5242ea0e47..c61a02e4bbf88 100644 --- a/crates/sui-graphql-rpc/src/data/pg.rs +++ b/crates/sui-graphql-rpc/src/data/pg.rs @@ -249,7 +249,7 @@ mod tests { database::Connection, db::reset_database, models::objects::StoredObject, schema::objects, types::IndexedObject, }; - use sui_pg_temp_db::TempDb; + use sui_pg_db::temp::TempDb; #[tokio::test] async fn test_query_cost() { diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 396eb0eb93308..0e8a7169a20d7 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -692,7 +692,7 @@ pub mod tests { use serde_json::json; use std::sync::Arc; use std::time::Duration; - use sui_pg_temp_db::get_available_port; + use sui_pg_db::temp::get_available_port; use sui_sdk::SuiClient; use sui_types::digests::get_mainnet_chain_identifier; use sui_types::transaction::TransactionData; diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs index 83610f6c3f011..6469144784b0c 100644 --- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs @@ -19,7 +19,7 @@ pub use sui_indexer::config::SnapshotLagConfig; use sui_indexer::errors::IndexerError; use sui_indexer::store::PgIndexerStore; use sui_indexer::test_utils::start_indexer_writer_for_testing; -use sui_pg_temp_db::{get_available_port, TempDb}; +use sui_pg_db::temp::{get_available_port, TempDb}; use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT}; use sui_types::storage::RpcStateReader; use tempfile::tempdir; diff --git a/crates/sui-graphql-rpc/tests/move_registry_e2e.rs b/crates/sui-graphql-rpc/tests/move_registry_e2e.rs index 36320b42b9859..5ee3ed5714dfa 100644 --- a/crates/sui-graphql-rpc/tests/move_registry_e2e.rs +++ b/crates/sui-graphql-rpc/tests/move_registry_e2e.rs @@ -15,7 +15,7 @@ use sui_graphql_rpc_client::simple_client::SimpleClient; use sui_json_rpc::name_service::{Domain, DomainFormat}; use sui_json_rpc_types::ObjectChange; use sui_move_build::BuildConfig; -use sui_pg_temp_db::get_available_port; +use sui_pg_db::temp::get_available_port; use sui_types::{ base_types::{ObjectID, SequenceNumber}, digests::ObjectDigest, diff --git a/crates/sui-indexer-alt-framework/Cargo.toml b/crates/sui-indexer-alt-framework/Cargo.toml index 1d4fe70045dba..9342f72c4c9ff 100644 --- a/crates/sui-indexer-alt-framework/Cargo.toml +++ b/crates/sui-indexer-alt-framework/Cargo.toml @@ -29,6 +29,7 @@ tracing.workspace = true url.workspace = true sui-field-count.workspace = true +sui-pg-db.workspace = true sui-storage.workspace = true sui-types.workspace = true @@ -37,5 +38,3 @@ rand.workspace = true telemetry-subscribers.workspace = true tempfile.workspace = true wiremock.workspace = true - -sui-pg-temp-db.workspace = true diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 9ebe7ed77e47c..77fe3c7dfb120 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -4,8 +4,11 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; use anyhow::{ensure, Context, Result}; -use db::{Db, DbArgs}; -use diesel_migrations::EmbeddedMigrations; +use diesel::{ + migration::{self, Migration, MigrationSource}, + pg::Pg, +}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations}; use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService}; use metrics::{IndexerMetrics, MetricsService}; use pipeline::{ @@ -13,13 +16,13 @@ use pipeline::{ sequential::{self, SequentialConfig}, Processor, }; +use sui_pg_db::{Db, DbArgs}; use task::graceful_shutdown; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use watermarks::CommitterWatermark; -pub mod db; pub mod ingestion; pub(crate) mod metrics; pub mod pipeline; @@ -27,6 +30,8 @@ pub(crate) mod schema; pub mod task; pub(crate) mod watermarks; +const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); + /// Command-line arguments for the indexer #[derive(clap::Args, Debug, Clone)] pub struct IndexerArgs { @@ -135,7 +140,7 @@ impl Indexer { .context("Failed to connect to database")?; // At indexer initialization, we ensure that the DB schema is up-to-date. - db.run_migrations(migrations) + db.run_migrations(Self::migrations(migrations)) .await .context("Failed to run pending migrations")?; @@ -337,6 +342,25 @@ impl Indexer { })) } + /// Combine the provided `migrations` with the migrations necessary to set up the indexer + /// framework. The returned migration source can be passed to [Db::run_migrations] to ensure + /// the database's schema is up-to-date for both the indexer framework and the specific + /// indexer. + pub fn migrations( + migrations: &'static EmbeddedMigrations, + ) -> impl MigrationSource + Send + Sync + 'static { + struct Migrations(&'static EmbeddedMigrations); + impl MigrationSource for Migrations { + fn migrations(&self) -> migration::Result>>> { + let mut migrations = MIGRATIONS.migrations()?; + migrations.extend(self.0.migrations()?); + Ok(migrations) + } + } + + Migrations(migrations) + } + /// Update the indexer's first checkpoint based on the watermark for the pipeline by adding for /// handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled, /// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and diff --git a/crates/sui-indexer-alt-framework/src/metrics.rs b/crates/sui-indexer-alt-framework/src/metrics.rs index 7f2bba651fb99..492de934cb30f 100644 --- a/crates/sui-indexer-alt-framework/src/metrics.rs +++ b/crates/sui-indexer-alt-framework/src/metrics.rs @@ -13,11 +13,12 @@ use prometheus::{ register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder, }; +use sui_pg_db::Db; use tokio::{net::TcpListener, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; -use crate::{db::Db, ingestion::error::Error}; +use crate::ingestion::error::Error; /// Histogram buckets for the distribution of checkpoint fetching latencies. const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[ diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs index e390df37370b3..96d3e276ec9d6 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs @@ -221,12 +221,10 @@ fn move_ready_checkpoints( #[cfg(test)] mod tests { use sui_field_count::FieldCount; + use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; - use crate::{ - db, - pipeline::{concurrent::max_chunk_rows, Processor}, - }; + use crate::pipeline::{concurrent::max_chunk_rows, Processor}; use super::*; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs index f35140fb6205a..96ade643909d7 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs @@ -7,6 +7,7 @@ use std::{ sync::Arc, }; +use sui_pg_db::Db; use tokio::{ sync::mpsc, task::JoinHandle, @@ -16,7 +17,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs index 2897e75306a61..0b2f25f50bbd3 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs @@ -4,13 +4,13 @@ use std::{sync::Arc, time::Duration}; use backoff::ExponentialBackoff; +use sui_pg_db::Db; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, pipeline::{Break, CommitterConfig, WatermarkPart}, task::TrySpawnStreamExt, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs index 93d05355c0940..35a2aa2f6a128 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs @@ -5,15 +5,12 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use sui_field_count::FieldCount; +use sui_pg_db::{self as db, Db}; use sui_types::full_checkpoint_content::CheckpointData; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; -use crate::{ - db::{self, Db}, - metrics::IndexerMetrics, - watermarks::CommitterWatermark, -}; +use crate::{metrics::IndexerMetrics, watermarks::CommitterWatermark}; use super::{processor::processor, CommitterConfig, Processor, WatermarkPart, PIPELINE_BUFFER}; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs index 365bbc2f03646..721035c77713d 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use sui_pg_db::Db; use tokio::{ task::JoinHandle, time::{interval, MissedTickBehavior}, @@ -11,7 +12,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, pipeline::logging::{LoggerWatermark, WatermarkLogger}, watermarks::PrunerWatermark, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs index 03b18ee4ac352..770a914f27bb1 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs @@ -3,12 +3,12 @@ use std::sync::Arc; +use sui_pg_db::Db; use tokio::{task::JoinHandle, time::interval}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, watermarks::{ReaderWatermark, StoredWatermark}, }; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs index 4f72c9cdc4244..c6550a4417ccc 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs @@ -4,6 +4,7 @@ use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection}; +use sui_pg_db::Db; use tokio::{ sync::mpsc, task::JoinHandle, @@ -13,7 +14,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, pipeline::{logging::WatermarkLogger, Indexed, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, diff --git a/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs index 5cac521e30260..b5f3c33899e22 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs @@ -4,15 +4,12 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; +use sui_pg_db::{self as db, Db}; use sui_types::full_checkpoint_content::CheckpointData; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; -use crate::{ - db::{self, Db}, - metrics::IndexerMetrics, - watermarks::CommitterWatermark, -}; +use crate::{metrics::IndexerMetrics, watermarks::CommitterWatermark}; use super::{processor::processor, CommitterConfig, Processor, PIPELINE_BUFFER}; diff --git a/crates/sui-indexer-alt-framework/src/watermarks.rs b/crates/sui-indexer-alt-framework/src/watermarks.rs index 1ba188b04e0ba..975d3cd593925 100644 --- a/crates/sui-indexer-alt-framework/src/watermarks.rs +++ b/crates/sui-indexer-alt-framework/src/watermarks.rs @@ -7,8 +7,9 @@ use chrono::{naive::NaiveDateTime, DateTime, Utc}; use diesel::{dsl::sql, prelude::*, sql_types}; use diesel_async::RunQueryDsl; use sui_field_count::FieldCount; +use sui_pg_db::Connection; -use crate::{db::Connection, schema::watermarks}; +use crate::schema::watermarks; #[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)] #[diesel(table_name = watermarks)] diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml index 5f6e79d01e464..673b6450e509b 100644 --- a/crates/sui-indexer-alt/Cargo.toml +++ b/crates/sui-indexer-alt/Cargo.toml @@ -31,6 +31,7 @@ sui-default-config.workspace = true sui-field-count.workspace = true sui-indexer-alt-framework.workspace = true sui-indexer-alt-schema.workspace = true +sui-pg-db.workspace = true sui-protocol-config.workspace = true sui-types.workspace = true sui-synthetic-ingestion = { workspace = true, optional = true } diff --git a/crates/sui-indexer-alt/src/args.rs b/crates/sui-indexer-alt/src/args.rs index ab799683b876e..f82ab500d1aff 100644 --- a/crates/sui-indexer-alt/src/args.rs +++ b/crates/sui-indexer-alt/src/args.rs @@ -7,7 +7,8 @@ use std::path::PathBuf; use crate::benchmark::BenchmarkArgs; use crate::IndexerArgs; use clap::Subcommand; -use sui_indexer_alt_framework::{db::DbArgs, ingestion::ClientArgs}; +use sui_indexer_alt_framework::ingestion::ClientArgs; +use sui_pg_db::DbArgs; #[derive(clap::Parser, Debug, Clone)] pub struct Args { diff --git a/crates/sui-indexer-alt/src/benchmark.rs b/crates/sui-indexer-alt/src/benchmark.rs index 73281e5230541..09d8bea02f755 100644 --- a/crates/sui-indexer-alt/src/benchmark.rs +++ b/crates/sui-indexer-alt/src/benchmark.rs @@ -3,12 +3,9 @@ use std::{path::PathBuf, time::Instant}; -use sui_indexer_alt_framework::{ - db::{reset_database, DbArgs}, - ingestion::ClientArgs, - IndexerArgs, -}; +use sui_indexer_alt_framework::{ingestion::ClientArgs, Indexer, IndexerArgs}; use sui_indexer_alt_schema::MIGRATIONS; +use sui_pg_db::{reset_database, DbArgs}; use sui_synthetic_ingestion::synthetic_ingestion::read_ingestion_data; use crate::{config::IndexerConfig, start_indexer}; @@ -40,7 +37,7 @@ pub async fn run_benchmark( let last_checkpoint = *ingestion_data.keys().last().unwrap(); let num_transactions: usize = ingestion_data.values().map(|c| c.transactions.len()).sum(); - reset_database(db_args.clone(), Some(&MIGRATIONS)).await?; + reset_database(db_args.clone(), Some(Indexer::migrations(&MIGRATIONS))).await?; let indexer_args = IndexerArgs { first_checkpoint: Some(first_checkpoint), diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index 2eab83efd02ae..b5f6f0a4b7ea3 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -5,13 +5,10 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct EvEmitMod; diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index e372d6a7f0c11..c66d5592fe57e 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -5,13 +5,10 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct EvStructInst; diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs index 1bccfffad293d..a9bc26a7e90f4 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct KvCheckpoints; diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs index 57714de291b5f..926d9325f442e 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -5,18 +5,15 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends}; +use sui_pg_db as db; use sui_types::{ event::SystemEpochInfoEvent, full_checkpoint_content::CheckpointData, transaction::{TransactionDataAPI, TransactionKind}, }; -use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends}; - pub(crate) struct KvEpochEnds; impl Processor for KvEpochEnds { diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs index 224ca11625809..bd5efcdf61463 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -5,18 +5,15 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts}; +use sui_pg_db as db; use sui_types::{ full_checkpoint_content::CheckpointData, sui_system_state::{get_sui_system_state, SuiSystemStateTrait}, transaction::{TransactionDataAPI, TransactionKind}, }; -use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts}; - pub(crate) struct KvEpochStarts; impl Processor for KvEpochStarts { diff --git a/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs b/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs index 792314c93ac87..480492f3d72ba 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs @@ -5,16 +5,13 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_protocol_config::ProtocolConfig; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ checkpoints::StoredGenesis, epochs::StoredFeatureFlag, schema::kv_feature_flags, }; +use sui_pg_db as db; +use sui_protocol_config::ProtocolConfig; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct KvFeatureFlags(pub(crate) StoredGenesis); diff --git a/crates/sui-indexer-alt/src/handlers/kv_objects.rs b/crates/sui-indexer-alt/src/handlers/kv_objects.rs index 583857bc59cad..02dd5253cf17c 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_objects.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{objects::StoredObject, schema::kv_objects}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct KvObjects; diff --git a/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs b/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs index a6222555fedf8..6540391c68b23 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs @@ -5,16 +5,13 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_protocol_config::ProtocolConfig; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ checkpoints::StoredGenesis, epochs::StoredProtocolConfig, schema::kv_protocol_configs, }; +use sui_pg_db as db; +use sui_protocol_config::ProtocolConfig; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct KvProtocolConfigs(pub(crate) StoredGenesis); diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index edc5567eeb979..7bef2130d8177 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct KvTransactions; diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs index 5a3bc06092f5e..1da35917fedf0 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_info.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs @@ -6,21 +6,18 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::{anyhow, Result}; use diesel_async::RunQueryDsl; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{ + objects::{StoredObjInfo, StoredOwnerKind}, + schema::obj_info, }; +use sui_pg_db as db; use sui_types::{ base_types::ObjectID, full_checkpoint_content::CheckpointData, object::{Object, Owner}, }; -use sui_indexer_alt_schema::{ - objects::{StoredObjInfo, StoredOwnerKind}, - schema::obj_info, -}; - pub(crate) struct ObjInfo; pub(crate) enum ProcessedObjInfoUpdate { diff --git a/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs b/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs index 7fd3d59a3d7f4..7060a45bc81b8 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs @@ -6,11 +6,9 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Result; use diesel::ExpressionMethods; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::schema::obj_info; +use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; use super::obj_info::{ObjInfo, ProcessedObjInfo, ProcessedObjInfoUpdate}; diff --git a/crates/sui-indexer-alt/src/handlers/obj_versions.rs b/crates/sui-indexer-alt/src/handlers/obj_versions.rs index c7ec9d66ddc62..9c1022103f384 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_versions.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_versions.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{objects::StoredObjVersion, schema::obj_versions}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct ObjVersions; diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs index 03c27ff9dbbea..0e2a067440317 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs @@ -11,20 +11,17 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::{try_join_all, Either}; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::{ - db, - pipeline::{sequential::Handler, Processor}, +use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; +use sui_indexer_alt_schema::{ + objects::{StoredObjectUpdate, StoredSumCoinBalance}, + schema::sum_coin_balances, }; +use sui_pg_db as db; use sui_types::{ base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, object::Owner, }; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredSumCoinBalance}, - schema::sum_coin_balances, -}; - const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT; const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; diff --git a/crates/sui-indexer-alt/src/handlers/sum_displays.rs b/crates/sui-indexer-alt/src/handlers/sum_displays.rs index a245bf17cd317..9da4d18bc4c79 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_displays.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_displays.rs @@ -8,13 +8,10 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::try_join_all; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::{ - db, - pipeline::{sequential::Handler, Processor}, -}; -use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData}; - +use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; use sui_indexer_alt_schema::{displays::StoredDisplay, schema::sum_displays}; +use sui_pg_db as db; +use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData}; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT; diff --git a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs index 1ef4cf1241c24..84e61e2e01523 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs @@ -11,20 +11,17 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::{try_join_all, Either}; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::{ - db, - pipeline::{sequential::Handler, Processor}, +use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; +use sui_indexer_alt_schema::{ + objects::{StoredObjectUpdate, StoredOwnerKind, StoredSumObjType}, + schema::sum_obj_types, }; +use sui_pg_db as db; use sui_types::{ base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, object::Owner, }; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredOwnerKind, StoredSumObjType}, - schema::sum_obj_types, -}; - const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT; const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; diff --git a/crates/sui-indexer-alt/src/handlers/sum_packages.rs b/crates/sui-indexer-alt/src/handlers/sum_packages.rs index a4fa9891ea6b3..7567754699e74 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_packages.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_packages.rs @@ -8,13 +8,10 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::try_join_all; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::{ - db, - pipeline::{sequential::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; use sui_indexer_alt_schema::{packages::StoredPackage, schema::sum_packages}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredPackage::FIELD_COUNT; diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs index 10374a89ce9c3..51fb7e6917b8f 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs @@ -6,15 +6,12 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; use itertools::Itertools; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::{full_checkpoint_content::CheckpointData, object::Owner}; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ schema::tx_affected_addresses, transactions::StoredTxAffectedAddress, }; +use sui_pg_db as db; +use sui_types::{full_checkpoint_content::CheckpointData, object::Owner}; pub(crate) struct TxAffectedAddresses; diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index ad016e06957fd..c99f8dd56a49b 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject}; +use sui_pg_db as db; +use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; pub(crate) struct TxAffectedObjects; diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index 0fdb0f610d11e..31a49d33943cc 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -5,10 +5,12 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{ + schema::tx_balance_changes, + transactions::{BalanceChange, StoredTxBalanceChange}, }; +use sui_pg_db as db; use sui_types::{ coin::Coin, effects::TransactionEffectsAPI, @@ -16,11 +18,6 @@ use sui_types::{ gas_coin::GAS, }; -use sui_indexer_alt_schema::{ - schema::tx_balance_changes, - transactions::{BalanceChange, StoredTxBalanceChange}, -}; - pub(crate) struct TxBalanceChanges; impl Processor for TxBalanceChanges { diff --git a/crates/sui-indexer-alt/src/handlers/tx_calls.rs b/crates/sui-indexer-alt/src/handlers/tx_calls.rs index f791a508721a0..e189bdd9acd2d 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_calls.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_calls.rs @@ -5,15 +5,12 @@ use std::sync::Arc; use anyhow::{Ok, Result}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls}; +use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; use sui_types::transaction::TransactionDataAPI; -use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls}; - pub(crate) struct TxCalls; impl Processor for TxCalls { diff --git a/crates/sui-indexer-alt/src/handlers/tx_digests.rs b/crates/sui-indexer-alt/src/handlers/tx_digests.rs index 3aa9593ef0a29..579ec32429240 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_digests.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_digests.rs @@ -5,13 +5,10 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest}; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct TxDigests; diff --git a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs index a98f13688d3a5..5f61e66be360f 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs @@ -5,16 +5,13 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ schema::tx_kinds, transactions::{StoredKind, StoredTxKind}, }; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; pub(crate) struct TxKinds; diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs index 92205162e0d34..849b2d1faed7f 100644 --- a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs +++ b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs @@ -6,16 +6,13 @@ use std::sync::Arc; use anyhow::Result; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance}, schema::wal_coin_balances, }; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; use super::sum_coin_balances::SumCoinBalances; diff --git a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs index fade4ff095a6d..c0a6feb51cd24 100644 --- a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs +++ b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs @@ -6,16 +6,13 @@ use std::sync::Arc; use anyhow::Result; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::{ - db, - pipeline::{concurrent::Handler, Processor}, -}; -use sui_types::full_checkpoint_content::CheckpointData; - +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{ objects::{StoredObjectUpdate, StoredSumObjType, StoredWalObjType}, schema::wal_obj_types, }; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; use super::sum_obj_types::SumObjTypes; diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 0eeb2591c1d61..1483f8a0ac1ba 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -15,7 +15,6 @@ use handlers::{ tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, }; -use sui_indexer_alt_framework::db::DbArgs; use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig}; use sui_indexer_alt_framework::pipeline::{ concurrent::{ConcurrentConfig, PrunerConfig}, @@ -24,6 +23,7 @@ use sui_indexer_alt_framework::pipeline::{ }; use sui_indexer_alt_framework::{Indexer, IndexerArgs}; use sui_indexer_alt_schema::MIGRATIONS; +use sui_pg_db::DbArgs; use tokio_util::sync::CancellationToken; pub mod args; diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index d2ed2060ecc23..dba912d5b5351 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -12,8 +12,9 @@ use sui_indexer_alt::args::Command; use sui_indexer_alt::config::IndexerConfig; use sui_indexer_alt::config::Merge; use sui_indexer_alt::start_indexer; -use sui_indexer_alt_framework::db::reset_database; +use sui_indexer_alt_framework::Indexer; use sui_indexer_alt_schema::MIGRATIONS; +use sui_pg_db::reset_database; use tokio::fs; #[tokio::main] @@ -73,7 +74,11 @@ async fn main() -> Result<()> { } Command::ResetDatabase { skip_migrations } => { - reset_database(args.db_args, (!skip_migrations).then_some(&MIGRATIONS)).await?; + reset_database( + args.db_args, + (!skip_migrations).then(|| Indexer::migrations(&MIGRATIONS)), + ) + .await?; } #[cfg(feature = "benchmark")] diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 002ba14a9ded6..f090010fee669 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -55,7 +55,7 @@ sui-json-rpc.workspace = true sui-json-rpc-api.workspace = true sui-json-rpc-types.workspace = true sui-open-rpc.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-sdk.workspace = true sui-snapshot.workspace = true sui-storage.workspace = true diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs index dc365c022b6d0..e955ea63cbdab 100644 --- a/crates/sui-indexer/src/db.rs +++ b/crates/sui-indexer/src/db.rs @@ -265,7 +265,7 @@ mod tests { use diesel::migration::{Migration, MigrationSource}; use diesel::pg::Pg; use diesel_migrations::MigrationHarness; - use sui_pg_temp_db::TempDb; + use sui_pg_db::temp::TempDb; // Check that the migration records in the database created from the local schema // pass the consistency check. @@ -397,7 +397,7 @@ mod tests { async fn temp_db_smoketest() { use crate::database::Connection; use diesel_async::RunQueryDsl; - use sui_pg_temp_db::TempDb; + use sui_pg_db::temp::TempDb; telemetry_subscribers::init_for_testing(); diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index a9b50769042b1..d6bc56bdfe26c 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -11,7 +11,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use sui_json_rpc_types::SuiTransactionBlockResponse; -use sui_pg_temp_db::{get_available_port, TempDb}; +use sui_pg_db::temp::{get_available_port, TempDb}; use crate::config::{IngestionConfig, RetentionConfig, SnapshotLagConfig, UploadOptions}; use crate::database::Connection; diff --git a/crates/sui-mvr-graphql-rpc/Cargo.toml b/crates/sui-mvr-graphql-rpc/Cargo.toml index 0cae0d26c50e6..77ebb84f57176 100644 --- a/crates/sui-mvr-graphql-rpc/Cargo.toml +++ b/crates/sui-mvr-graphql-rpc/Cargo.toml @@ -47,7 +47,7 @@ serde_with.workspace = true serde_yaml.workspace = true shared-crypto.workspace = true similar.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-sdk.workspace = true sui-types.workspace = true tap.workspace = true diff --git a/crates/sui-mvr-graphql-rpc/src/data/pg.rs b/crates/sui-mvr-graphql-rpc/src/data/pg.rs index 34d5242ea0e47..c61a02e4bbf88 100644 --- a/crates/sui-mvr-graphql-rpc/src/data/pg.rs +++ b/crates/sui-mvr-graphql-rpc/src/data/pg.rs @@ -249,7 +249,7 @@ mod tests { database::Connection, db::reset_database, models::objects::StoredObject, schema::objects, types::IndexedObject, }; - use sui_pg_temp_db::TempDb; + use sui_pg_db::temp::TempDb; #[tokio::test] async fn test_query_cost() { diff --git a/crates/sui-mvr-graphql-rpc/src/server/builder.rs b/crates/sui-mvr-graphql-rpc/src/server/builder.rs index 9537f19c27d6e..161e92c83a0c3 100644 --- a/crates/sui-mvr-graphql-rpc/src/server/builder.rs +++ b/crates/sui-mvr-graphql-rpc/src/server/builder.rs @@ -692,7 +692,7 @@ pub mod tests { use serde_json::json; use std::sync::Arc; use std::time::Duration; - use sui_pg_temp_db::get_available_port; + use sui_pg_db::temp::get_available_port; use sui_sdk::SuiClient; use sui_types::digests::get_mainnet_chain_identifier; use sui_types::transaction::TransactionData; diff --git a/crates/sui-mvr-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-mvr-graphql-rpc/src/test_infra/cluster.rs index f86b5ca0d08d2..c543acf71d4aa 100644 --- a/crates/sui-mvr-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-mvr-graphql-rpc/src/test_infra/cluster.rs @@ -19,7 +19,7 @@ pub use sui_indexer::config::SnapshotLagConfig; use sui_indexer::errors::IndexerError; use sui_indexer::store::PgIndexerStore; use sui_indexer::test_utils::start_indexer_writer_for_testing_with_mvr_mode; -use sui_pg_temp_db::{get_available_port, TempDb}; +use sui_pg_db::temp::{get_available_port, TempDb}; use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT}; use sui_types::storage::RpcStateReader; use tempfile::tempdir; diff --git a/crates/sui-mvr-graphql-rpc/tests/move_registry_e2e.rs b/crates/sui-mvr-graphql-rpc/tests/move_registry_e2e.rs index 557142e61bede..fbb4bcda18a5d 100644 --- a/crates/sui-mvr-graphql-rpc/tests/move_registry_e2e.rs +++ b/crates/sui-mvr-graphql-rpc/tests/move_registry_e2e.rs @@ -15,7 +15,7 @@ use sui_mvr_graphql_rpc::{ wait_for_graphql_checkpoint_catchup, wait_for_graphql_server, NetworkCluster, }, }; -use sui_pg_temp_db::get_available_port; +use sui_pg_db::temp::get_available_port; use sui_types::{ base_types::{ObjectID, SequenceNumber}, digests::ObjectDigest, diff --git a/crates/sui-pg-temp-db/Cargo.toml b/crates/sui-pg-db/Cargo.toml similarity index 73% rename from crates/sui-pg-temp-db/Cargo.toml rename to crates/sui-pg-db/Cargo.toml index 1b85cdb94eae9..b9740cb54f0cd 100644 --- a/crates/sui-pg-temp-db/Cargo.toml +++ b/crates/sui-pg-db/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "sui-pg-temp-db" +name = "sui-pg-db" version.workspace = true authors = ["Mysten Labs "] license = "Apache-2.0" @@ -8,9 +8,15 @@ edition = "2021" [dependencies] anyhow.workspace = true +bb8 = "0.8.5" +clap.workspace = true diesel.workspace = true diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] } +diesel_migrations.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["full"] } tracing.workspace = true url.workspace = true + +[dev-dependencies] +telemetry-subscribers.workspace = true diff --git a/crates/sui-indexer-alt-framework/src/db.rs b/crates/sui-pg-db/src/lib.rs similarity index 82% rename from crates/sui-indexer-alt-framework/src/db.rs rename to crates/sui-pg-db/src/lib.rs index 0f04613787a30..affed173365d1 100644 --- a/crates/sui-indexer-alt-framework/src/db.rs +++ b/crates/sui-pg-db/src/lib.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::anyhow; -use diesel::migration::{self, Migration, MigrationSource, MigrationVersion}; +use diesel::migration::{MigrationSource, MigrationVersion}; use diesel::pg::Pg; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::{ @@ -12,14 +12,11 @@ use diesel_async::{ }, AsyncPgConnection, RunQueryDsl, }; -use diesel_migrations::{embed_migrations, EmbeddedMigrations}; use std::time::Duration; use tracing::info; use url::Url; -/// Migrations for schema that the indexer framework needs, regardless of the specific data being -/// indexed. -const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); +pub mod temp; #[derive(clap::Args, Debug, Clone)] pub struct DbArgs { @@ -71,7 +68,7 @@ impl Db { } /// Statistics about the connection pool - pub(crate) fn state(&self) -> bb8::State { + pub fn state(&self) -> bb8::State { self.pool.state() } @@ -126,27 +123,14 @@ impl Db { Ok(()) } - /// Run migrations on the database. Use the `migrations` parameter to pass in the migrations - /// that are specific to the indexer being run. Migrations that the indexer framework needs - /// will be added automatically. - /// - /// Use Diesel's `embed_migrations!` macro to generate the `migrations` parameter for your - /// indexer. - pub(crate) async fn run_migrations( + /// Run migrations on the database. Use Diesel's `embed_migrations!` macro to generate the + /// `migrations` parameter for your indexer. + pub async fn run_migrations + Send + Sync + 'static>( &self, - migrations: &'static EmbeddedMigrations, + migrations: S, ) -> Result>, anyhow::Error> { use diesel_migrations::MigrationHarness; - struct WithFrameworkMigrations(&'static EmbeddedMigrations); - impl MigrationSource for WithFrameworkMigrations { - fn migrations(&self) -> migration::Result>>> { - let mut migrations = self.0.migrations()?; - migrations.extend(MIGRATIONS.migrations()?); - Ok(migrations) - } - } - info!("Running migrations ..."); let conn = self.pool.dedicated_connection().await?; let mut wrapper: AsyncConnectionWrapper = @@ -154,11 +138,12 @@ impl Db { let finished_migrations = tokio::task::spawn_blocking(move || { wrapper - .run_pending_migrations(WithFrameworkMigrations(migrations)) + .run_pending_migrations(migrations) .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect()) }) .await? .map_err(|e| anyhow!("Failed to run migrations: {:?}", e))?; + info!("Migrations complete."); Ok(finished_migrations) } @@ -178,13 +163,12 @@ impl Default for DbArgs { } /// Drop all tables, and re-run migrations if supplied. -pub async fn reset_database( +pub async fn reset_database + Send + Sync + 'static>( db_config: DbArgs, - migrations: Option<&'static EmbeddedMigrations>, + migrations: Option, ) -> Result<(), anyhow::Error> { let db = Db::new(db_config).await?; db.clear_database().await?; - if let Some(migrations) = migrations { db.run_migrations(migrations).await?; } @@ -195,15 +179,14 @@ pub async fn reset_database( #[cfg(test)] mod tests { use super::*; - use crate::db::{Db, DbArgs}; use diesel::prelude::QueryableByName; use diesel_async::RunQueryDsl; - use sui_pg_temp_db::TempDb; + use diesel_migrations::EmbeddedMigrations; #[tokio::test] async fn temp_db_smoketest() { telemetry_subscribers::init_for_testing(); - let db = TempDb::new().unwrap(); + let db = temp::TempDb::new().unwrap(); let url = db.database().url(); info!(%url); @@ -232,7 +215,7 @@ mod tests { #[tokio::test] async fn test_reset_database_skip_migrations() { - let temp_db = TempDb::new().unwrap(); + let temp_db = temp::TempDb::new().unwrap(); let url = temp_db.database().url(); let db_args = DbArgs { @@ -254,7 +237,9 @@ mod tests { .unwrap(); assert_eq!(cnt.cnt, 1); - reset_database(db_args, None).await.unwrap(); + reset_database::(db_args, None) + .await + .unwrap(); let mut conn = db.connect().await.unwrap(); let cnt = diesel::sql_query( diff --git a/crates/sui-pg-temp-db/src/lib.rs b/crates/sui-pg-db/src/temp.rs similarity index 100% rename from crates/sui-pg-temp-db/src/lib.rs rename to crates/sui-pg-db/src/temp.rs diff --git a/crates/test-cluster/Cargo.toml b/crates/test-cluster/Cargo.toml index f589b4c7611bf..4696c13997d44 100644 --- a/crates/test-cluster/Cargo.toml +++ b/crates/test-cluster/Cargo.toml @@ -29,7 +29,7 @@ sui-json-rpc.workspace = true sui-json-rpc-types.workspace = true sui-json-rpc-api.workspace = true sui-node.workspace = true -sui-pg-temp-db.workspace = true +sui-pg-db.workspace = true sui-protocol-config.workspace = true sui-swarm.workspace = true sui-types.workspace = true diff --git a/crates/test-cluster/src/test_indexer_handle.rs b/crates/test-cluster/src/test_indexer_handle.rs index 3641645789b8e..2bdf21be8d8e0 100644 --- a/crates/test-cluster/src/test_indexer_handle.rs +++ b/crates/test-cluster/src/test_indexer_handle.rs @@ -9,7 +9,7 @@ use sui_indexer::test_utils::{ start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing, }; use sui_json_rpc_api::ReadApiClient; -use sui_pg_temp_db::TempDb; +use sui_pg_db::temp::TempDb; use sui_sdk::{SuiClient, SuiClientBuilder}; use tempfile::TempDir; use tokio::time::sleep;