Skip to content

Commit

Permalink
[1/n][dialect_support]Generalize indexer code to support different sq…
Browse files Browse the repository at this point in the history
…l dialects (MystenLabs#17167)

## Description 

This PR generalizes the code in `sui-indexer` crate such that different
diesel backends can be enabled through feature flags when compiling. The
default backend is still postgres which means nothing changes in terms
of how we build our binaries today.

1. The most important thing is that we do all kinds of db operation
through macros which are feature gated such that we only pull in
relevant db specific dependencies.
2. In Cargo.toml, by default we do not add any db-specific dependencies;
they are only pulled in through features.
3. In `sui-indexer`, all instances of `diesel::PgConnection` are pulled
with `postgres-feature` and similarly all instances of
`diesel::MySqlConnection` are pulled with `mysql-feature`
4. `run_query` and `run_query_async` are replaced with their
corresponding macro invocations
5. We do not try to make other dependencies of `sui-indexer` generic in
this PR. This includes `sui-graphql-rpc` which just works with
`IndexerReader<PgConnection>` for now (in the future we will refactor
read for other db backends but for now the focus is to get writes to be
compatible first)
6. Usages of `#[Cached(..)]` had to be replaced with an explicit usage
of `SizedCache` as this macro doesn't seem to work for functions which
take type parameters.
7. Next: we are going to introduce migrations for mysql and build
`sui-indexer` for mysql


## Test plan 

Running this locally works: `cargo run --bin sui-indexer -- --db-url
"postgres://sadhansood:sadhansood@localhost/test" --rpc-client-url "<>"
--fullnode-sync-worker --reset-db`
  • Loading branch information
sadhansood authored Apr 23, 2024
1 parent 66a3763 commit e305b4c
Show file tree
Hide file tree
Showing 30 changed files with 1,008 additions and 638 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,12 @@ derive_builder = "0.12.0"
derive_more = "0.99.17"
diesel = { version = "2.1.0", features = [
"chrono",
"postgres",
"r2d2",
"serde_json",
"64-column-tables",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
"postgres_backend",
] }
diesel-derive-enum = { version = "2.0.1", features = ["postgres"] }
diesel-derive-enum = { version = "2.0.1" }
diesel_migrations = { version = "2.0.0" }
dirs = "4.0.0"
duration-str = "0.5.0"
Expand Down
6 changes: 3 additions & 3 deletions crates/data-transform/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sui_types::object::MoveObject;

use self::models::*;
use std::env;
use sui_indexer::db::new_pg_connection_pool;
use sui_indexer::db::new_connection_pool;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::package_resolver::IndexerStorePackageResolver;

Expand All @@ -35,7 +35,7 @@ use move_core_types::account_address::AccountAddress;
struct GrootModuleResolver {
module_map: HashMap<String, Vec<u8>>,
#[allow(dead_code)]
original: IndexerStorePackageResolver,
original: IndexerStorePackageResolver<PgConnection>,
}

impl GrootModuleResolver {
Expand Down Expand Up @@ -157,7 +157,7 @@ fn main() {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let connection = &mut establish_connection();

let blocking_cp = new_pg_connection_pool(&database_url, None)
let blocking_cp = new_connection_pool(&database_url, None)
.map_err(|e| anyhow!("Unable to connect to Postgres, is it running? {e}"));
//let module_cache = Arc::new(SyncModuleCache::new(IndexerModuleResolver::new(blocking_cp.expect("REASON").clone())));
//
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-cluster-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["full"] }
jsonrpsee.workspace = true
tracing.workspace = true
clap.workspace = true
diesel.workspace = true
reqwest.workspace = true
async-trait.workspace = true
anyhow = { workspace = true, features = ["backtrace"] }
Expand Down Expand Up @@ -44,4 +45,6 @@ test-cluster.workspace = true
move-core-types.workspace = true

[features]
default = ["postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend"]
pg_integration = []
4 changes: 2 additions & 2 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Cluster for LocalNewCluster {
(options.pg_address.clone(), indexer_address)
{
// Start in writer mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address.clone()),
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
Expand All @@ -236,7 +236,7 @@ impl Cluster for LocalNewCluster {
.await;

// Start in reader mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address),
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
Expand Down
7 changes: 5 additions & 2 deletions crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tower-http.workspace = true
thiserror.workspace = true
uuid.workspace = true
im.workspace = true
downcast = "0.11.0"

sui-graphql-rpc-headers.workspace = true
sui-graphql-rpc-client.workspace = true
Expand All @@ -71,7 +72,7 @@ bcs.workspace = true
simulacrum.workspace = true # todo: cleanup test only deps
sui-json-rpc.workspace = true
sui-json-rpc-types.workspace = true
sui-indexer.workspace = true
sui-indexer = { workspace = true, default-features = true }
sui-rest-api.workspace = true
sui-swarm-config.workspace = true
test-cluster.workspace = true
Expand All @@ -88,7 +89,9 @@ sui-framework.workspace = true
tower.workspace = true
sui-test-transaction-builder.workspace = true


[features]
default = ["pg_backend"]
default = ["pg_backend", "postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend"]
pg_integration = []
pg_backend = []
15 changes: 8 additions & 7 deletions crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
error::Error,
types::{address::Address, sui_address::SuiAddress, validator::Validator},
};
use diesel::PgConnection;
use std::{collections::BTreeMap, time::Duration};
use sui_indexer::db::PgConnectionPoolConfig;
use sui_indexer::db::ConnectionPoolConfig;
use sui_indexer::{apis::GovernanceReadApi, indexer_reader::IndexerReader};
use sui_json_rpc_types::Stake as RpcStakedSui;
use sui_types::{
Expand All @@ -19,16 +20,16 @@ use sui_types::{
};

pub(crate) struct PgManager {
pub inner: IndexerReader,
pub inner: IndexerReader<PgConnection>,
}

impl PgManager {
pub(crate) fn new(inner: IndexerReader) -> Self {
pub(crate) fn new(inner: IndexerReader<PgConnection>) -> Self {
Self { inner }
}

/// Create a new underlying reader, which is used by this type as well as other data providers.
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader, Error> {
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader<PgConnection>, Error> {
Self::reader_with_config(
db_url,
DEFAULT_SERVER_DB_POOL_SIZE,
Expand All @@ -40,11 +41,11 @@ impl PgManager {
db_url: impl Into<String>,
pool_size: u32,
timeout_ms: u64,
) -> Result<IndexerReader, Error> {
let mut config = PgConnectionPoolConfig::default();
) -> Result<IndexerReader<PgConnection>, Error> {
let mut config = ConnectionPoolConfig::default();
config.set_pool_size(pool_size);
config.set_statement_timeout(Duration::from_millis(timeout_ms));
IndexerReader::new_with_config(db_url, config)
IndexerReader::<PgConnection>::new_with_config(db_url, config)
.map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
}
}
Expand Down
31 changes: 18 additions & 13 deletions crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use diesel::{
};
use sui_indexer::indexer_reader::IndexerReader;

use sui_indexer::{run_query_async, run_query_repeatable_async, spawn_read_only_blocking};
use tracing::error;

#[derive(Clone)]
pub(crate) struct PgExecutor {
pub inner: IndexerReader,
pub inner: IndexerReader<diesel::PgConnection>,
pub limits: Limits,
pub metrics: Metrics,
}
Expand All @@ -29,7 +30,11 @@ pub(crate) struct PgConnection<'c> {
}

impl PgExecutor {
pub(crate) fn new(inner: IndexerReader, limits: Limits, metrics: Metrics) -> Self {
pub(crate) fn new(
inner: IndexerReader<diesel::PgConnection>,
limits: Limits,
metrics: Metrics,
) -> Self {
Self {
inner,
limits,
Expand All @@ -54,10 +59,8 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_async!(&pool, move |conn| txn(&mut PgConnection { max_cost, conn }));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand All @@ -76,10 +79,11 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_repeatable_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_repeatable_async!(&pool, move |conn| txn(&mut PgConnection {
max_cost,
conn
}));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand Down Expand Up @@ -187,16 +191,17 @@ mod tests {
use diesel::QueryDsl;
use sui_framework::BuiltInFramework;
use sui_indexer::{
db::{get_pg_pool_connection, new_pg_connection_pool, reset_database},
db::{get_pool_connection, new_connection_pool, reset_database},
models::objects::StoredObject,
schema::objects,
types::IndexedObject,
};

#[test]
fn test_query_cost() {
let pool = new_pg_connection_pool(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pg_pool_connection(&pool).unwrap();
let pool =
new_connection_pool::<diesel::PgConnection>(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(&mut conn, /* drop_all */ true).unwrap();

let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;

pub struct ExecutorCluster {
pub executor_server_handle: JoinHandle<()>,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand All @@ -48,7 +48,7 @@ pub struct ExecutorCluster {

pub struct Cluster {
pub validator_fullnode_handle: TestCluster,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand Down Expand Up @@ -333,7 +333,7 @@ impl ExecutorCluster {
self.cancellation_token.cancel();
let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
let db_url = self.graphql_connection_config.db_url.clone();
force_delete_database(db_url).await;
force_delete_database::<diesel::PgConnection>(db_url).await;
}

pub async fn force_objects_snapshot_catchup(&self, start_cp: u64, end_cp: u64) {
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ chrono.workspace = true
serde_with.workspace = true
clap.workspace = true
tap.workspace = true
diesel.workspace = true
diesel-derive-enum.workspace = true
diesel = { workspace = true, optional = true }
diesel-derive-enum = { workspace = true, optional = true }
futures.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
Expand Down Expand Up @@ -56,9 +56,13 @@ move-binary-format.workspace = true
diesel_migrations.workspace = true
cached.workspace = true
secrecy = "0.8.0"
downcast = "0.11.0"

[features]
pg_integration = []
default = ["postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend", "diesel-derive-enum/postgres"]
mysql-feature = ["diesel/mysql", "diesel/mysql_backend", "diesel-derive-enum/mysql"]

[dev-dependencies]
sui-keys.workspace = true
Expand Down
13 changes: 7 additions & 6 deletions crates/sui-indexer/src/apis/coin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::indexer_reader::IndexerReader;
use async_trait::async_trait;
use diesel::r2d2::R2D2Connection;
use jsonrpsee::core::RpcResult;
use jsonrpsee::RpcModule;
use sui_json_rpc::coin_api::{parse_to_struct_tag, parse_to_type_tag};
Expand All @@ -14,18 +15,18 @@ use sui_types::balance::Supply;
use sui_types::base_types::{ObjectID, SuiAddress};
use sui_types::gas_coin::{GAS, TOTAL_SUPPLY_MIST};

pub(crate) struct CoinReadApi {
inner: IndexerReader,
pub(crate) struct CoinReadApi<T: R2D2Connection + 'static> {
inner: IndexerReader<T>,
}

impl CoinReadApi {
pub fn new(inner: IndexerReader) -> Self {
impl<T: R2D2Connection> CoinReadApi<T> {
pub fn new(inner: IndexerReader<T>) -> Self {
Self { inner }
}
}

#[async_trait]
impl CoinReadApiServer for CoinReadApi {
impl<T: R2D2Connection + 'static> CoinReadApiServer for CoinReadApi<T> {
async fn get_coins(
&self,
owner: SuiAddress,
Expand Down Expand Up @@ -142,7 +143,7 @@ impl CoinReadApiServer for CoinReadApi {
}
}

impl SuiRpcModule for CoinReadApi {
impl<T: R2D2Connection> SuiRpcModule for CoinReadApi<T> {
fn rpc(self) -> RpcModule<Self> {
self.into_rpc()
}
Expand Down
Loading

0 comments on commit e305b4c

Please sign in to comment.