Skip to content

Commit

Permalink
refactor(indexer-alt): factor out Db/DbArgs
Browse files Browse the repository at this point in the history
## Description

Pull out the `Db` and `DbArgs` types from `sui-indexer-alt-framework`,
so that they can also be used in the reader implementation. They have
been combined with the existing `TempDb`, and its crate has been renamed
from `sui-temp-pg-db` to `sui-pg-db` to account for the fact that it's
now holding something more general.

## Test plan
CI
  • Loading branch information
amnn committed Dec 11, 2024
1 parent 2aa3e18 commit 3cd7e2b
Show file tree
Hide file tree
Showing 65 changed files with 198 additions and 252 deletions.
21 changes: 13 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ members = [
"crates/sui-package-dump",
"crates/sui-package-management",
"crates/sui-package-resolver",
"crates/sui-pg-temp-db",
"crates/sui-pg-db",
"crates/sui-proc-macros",
"crates/sui-protocol-config",
"crates/sui-protocol-config-macros",
Expand Down Expand Up @@ -671,7 +671,7 @@ sui-open-rpc-macros = { path = "crates/sui-open-rpc-macros" }
sui-package-dump = { path = "crates/sui-package-dump" }
sui-package-management = { path = "crates/sui-package-management" }
sui-package-resolver = { path = "crates/sui-package-resolver" }
sui-pg-temp-db = { path = "crates/sui-pg-temp-db" }
sui-pg-db = { path = "crates/sui-pg-db" }
sui-proc-macros = { path = "crates/sui-proc-macros" }
sui-protocol-config = { path = "crates/sui-protocol-config" }
sui-protocol-config-macros = { path = "crates/sui-protocol-config-macros" }
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
sui-pg-temp-db.workspace = true
sui-pg-db.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/tests/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use sui_bridge_indexer::{create_sui_indexer, schema};
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer::database::Connection;
use sui_indexer_builder::indexer_builder::IndexerProgressStore;
use sui_pg_temp_db::TempDb;
use sui_pg_db::temp::TempDb;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/migrations");

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-cluster-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ regex.workspace = true
sui-indexer.workspace = true
sui-faucet.workspace = true
sui-graphql-rpc.workspace = true
sui-pg-temp-db.workspace = true
sui-pg-db.workspace = true
sui-swarm.workspace = true
sui-swarm-config.workspace = true
sui-json-rpc-types.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sui_indexer::test_utils::{
start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing,
};
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
use sui_pg_temp_db::TempDb;
use sui_pg_db::temp::TempDb;
use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
use sui_sdk::wallet_context::WalletContext;
use sui_swarm::memory::Swarm;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ serde_with.workspace = true
serde_yaml.workspace = true
shared-crypto.workspace = true
similar.workspace = true
sui-pg-temp-db.workspace = true
sui-pg-db.workspace = true
sui-sdk.workspace = true
sui-types.workspace = true
tap.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ mod tests {
database::Connection, db::reset_database, models::objects::StoredObject, schema::objects,
types::IndexedObject,
};
use sui_pg_temp_db::TempDb;
use sui_pg_db::temp::TempDb;

#[tokio::test]
async fn test_query_cost() {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ pub mod tests {
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use sui_pg_temp_db::get_available_port;
use sui_pg_db::temp::get_available_port;
use sui_sdk::SuiClient;
use sui_types::digests::get_mainnet_chain_identifier;
use sui_types::transaction::TransactionData;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
use sui_indexer::test_utils::start_indexer_writer_for_testing;
use sui_pg_temp_db::{get_available_port, TempDb};
use sui_pg_db::temp::{get_available_port, TempDb};
use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
use sui_types::storage::RpcStateReader;
use tempfile::tempdir;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/tests/move_registry_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_json_rpc::name_service::{Domain, DomainFormat};
use sui_json_rpc_types::ObjectChange;
use sui_move_build::BuildConfig;
use sui_pg_temp_db::get_available_port;
use sui_pg_db::temp::get_available_port;
use sui_types::{
base_types::{ObjectID, SequenceNumber},
digests::ObjectDigest,
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tracing.workspace = true
url.workspace = true

sui-field-count.workspace = true
sui-pg-db.workspace = true
sui-storage.workspace = true
sui-types.workspace = true

Expand All @@ -37,5 +38,3 @@ rand.workspace = true
telemetry-subscribers.workspace = true
tempfile.workspace = true
wiremock.workspace = true

sui-pg-temp-db.workspace = true
32 changes: 28 additions & 4 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,34 @@
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use anyhow::{ensure, Context, Result};
use db::{Db, DbArgs};
use diesel_migrations::EmbeddedMigrations;
use diesel::{
migration::{self, Migration, MigrationSource},
pg::Pg,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use pipeline::{
concurrent::{self, ConcurrentConfig},
sequential::{self, SequentialConfig},
Processor,
};
use sui_pg_db::{Db, DbArgs};
use task::graceful_shutdown;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use watermarks::CommitterWatermark;

pub mod db;
pub mod ingestion;
pub(crate) mod metrics;
pub mod pipeline;
pub(crate) mod schema;
pub mod task;
pub(crate) mod watermarks;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

/// Command-line arguments for the indexer
#[derive(clap::Args, Debug, Clone)]
pub struct IndexerArgs {
Expand Down Expand Up @@ -135,7 +140,7 @@ impl Indexer {
.context("Failed to connect to database")?;

// At indexer initialization, we ensure that the DB schema is up-to-date.
db.run_migrations(migrations)
db.run_migrations(Self::migrations(migrations))
.await
.context("Failed to run pending migrations")?;

Expand Down Expand Up @@ -337,6 +342,25 @@ impl Indexer {
}))
}

/// Combine the provided `migrations` with the migrations necessary to set up the indexer
/// framework. The returned migration source can be passed to [Db::run_migrations] to ensure
/// the database's schema is up-to-date for both the indexer framework and the specific
/// indexer.
pub fn migrations(
migrations: &'static EmbeddedMigrations,
) -> impl MigrationSource<Pg> + Send + Sync + 'static {
struct Migrations(&'static EmbeddedMigrations);
impl MigrationSource<Pg> for Migrations {
fn migrations(&self) -> migration::Result<Vec<Box<dyn Migration<Pg>>>> {
let mut migrations = MIGRATIONS.migrations()?;
migrations.extend(self.0.migrations()?);
Ok(migrations)
}
}

Migrations(migrations)
}

/// Update the indexer's first checkpoint based on the watermark for the pipeline by adding for
/// handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled,
/// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt-framework/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use prometheus::{
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder,
};
use sui_pg_db::Db;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use crate::{db::Db, ingestion::error::Error};
use crate::ingestion::error::Error;

/// Histogram buckets for the distribution of checkpoint fetching latencies.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,10 @@ fn move_ready_checkpoints<H: Handler>(
#[cfg(test)]
mod tests {
use sui_field_count::FieldCount;
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
db,
pipeline::{concurrent::max_chunk_rows, Processor},
};
use crate::pipeline::{concurrent::max_chunk_rows, Processor};

use super::*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
sync::Arc,
};

use sui_pg_db::Db;
use tokio::{
sync::mpsc,
task::JoinHandle,
Expand All @@ -16,7 +17,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS},
watermarks::CommitterWatermark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
use std::{sync::Arc, time::Duration};

use backoff::ExponentialBackoff;
use sui_pg_db::Db;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{Break, CommitterConfig, WatermarkPart},
task::TrySpawnStreamExt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ use std::{sync::Arc, time::Duration};

use serde::{Deserialize, Serialize};
use sui_field_count::FieldCount;
use sui_pg_db::{self as db, Db};
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use crate::{
db::{self, Db},
metrics::IndexerMetrics,
watermarks::CommitterWatermark,
};
use crate::{metrics::IndexerMetrics, watermarks::CommitterWatermark};

use super::{processor::processor, CommitterConfig, Processor, WatermarkPart, PIPELINE_BUFFER};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::sync::Arc;

use sui_pg_db::Db;
use tokio::{
task::JoinHandle,
time::{interval, MissedTickBehavior},
Expand All @@ -11,7 +12,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::logging::{LoggerWatermark, WatermarkLogger},
watermarks::PrunerWatermark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

use std::sync::Arc;

use sui_pg_db::Db;
use tokio::{task::JoinHandle, time::interval};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
watermarks::{ReaderWatermark, StoredWatermark},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use sui_pg_db::Db;
use tokio::{
sync::mpsc,
task::JoinHandle,
Expand All @@ -13,7 +14,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{logging::WatermarkLogger, Indexed, WARN_PENDING_WATERMARKS},
watermarks::CommitterWatermark,
Expand Down
Loading

0 comments on commit 3cd7e2b

Please sign in to comment.