Skip to content

Commit

Permalink
[1/n][dialect_support][indexer] Generalize indexer code to support di…
Browse files Browse the repository at this point in the history
…fferent sql dialects
  • Loading branch information
sadhansood committed Apr 23, 2024
1 parent 80e6392 commit c8e42dc
Show file tree
Hide file tree
Showing 30 changed files with 1,007 additions and 638 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,12 @@ derive_builder = "0.12.0"
derive_more = "0.99.17"
diesel = { version = "2.1.0", features = [
"chrono",
"postgres",
"r2d2",
"serde_json",
"64-column-tables",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
"postgres_backend",
] }
diesel-derive-enum = { version = "2.0.1", features = ["postgres"] }
diesel-derive-enum = { version = "2.0.1" }
diesel_migrations = { version = "2.0.0" }
dirs = "4.0.0"
duration-str = "0.5.0"
Expand Down
6 changes: 3 additions & 3 deletions crates/data-transform/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sui_types::object::MoveObject;

use self::models::*;
use std::env;
use sui_indexer::db::new_pg_connection_pool;
use sui_indexer::db::new_connection_pool;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::package_resolver::IndexerStorePackageResolver;

Expand All @@ -35,7 +35,7 @@ use move_core_types::account_address::AccountAddress;
struct GrootModuleResolver {
module_map: HashMap<String, Vec<u8>>,
#[allow(dead_code)]
original: IndexerStorePackageResolver,
original: IndexerStorePackageResolver<PgConnection>,
}

impl GrootModuleResolver {
Expand Down Expand Up @@ -157,7 +157,7 @@ fn main() {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let connection = &mut establish_connection();

let blocking_cp = new_pg_connection_pool(&database_url, None)
let blocking_cp = new_connection_pool(&database_url, None)
.map_err(|e| anyhow!("Unable to connect to Postgres, is it running? {e}"));
//let module_cache = Arc::new(SyncModuleCache::new(IndexerModuleResolver::new(blocking_cp.expect("REASON").clone())));
//
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-cluster-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["full"] }
jsonrpsee.workspace = true
tracing.workspace = true
clap.workspace = true
diesel.workspace = true
reqwest.workspace = true
async-trait.workspace = true
anyhow = { workspace = true, features = ["backtrace"] }
Expand Down Expand Up @@ -44,4 +45,6 @@ test-cluster.workspace = true
move-core-types.workspace = true

[features]
default = ["pg_integration", "postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend"]
pg_integration = []
4 changes: 2 additions & 2 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Cluster for LocalNewCluster {
(options.pg_address.clone(), indexer_address)
{
// Start in writer mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address.clone()),
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
Expand All @@ -236,7 +236,7 @@ impl Cluster for LocalNewCluster {
.await;

// Start in reader mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address),
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
Expand Down
7 changes: 5 additions & 2 deletions crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tower-http.workspace = true
thiserror.workspace = true
uuid.workspace = true
im.workspace = true
downcast = "0.11.0"

sui-graphql-rpc-headers.workspace = true
sui-graphql-rpc-client.workspace = true
Expand All @@ -71,7 +72,7 @@ bcs.workspace = true
simulacrum.workspace = true # todo: cleanup test only deps
sui-json-rpc.workspace = true
sui-json-rpc-types.workspace = true
sui-indexer.workspace = true
sui-indexer = { workspace = true, default-features = true }
sui-rest-api.workspace = true
sui-swarm-config.workspace = true
test-cluster.workspace = true
Expand All @@ -88,7 +89,9 @@ sui-framework.workspace = true
tower.workspace = true
sui-test-transaction-builder.workspace = true


[features]
default = ["pg_backend"]
default = ["pg_backend", "postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend"]
pg_integration = []
pg_backend = []
15 changes: 8 additions & 7 deletions crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
error::Error,
types::{address::Address, sui_address::SuiAddress, validator::Validator},
};
use diesel::PgConnection;
use std::{collections::BTreeMap, time::Duration};
use sui_indexer::db::PgConnectionPoolConfig;
use sui_indexer::db::ConnectionPoolConfig;
use sui_indexer::{apis::GovernanceReadApi, indexer_reader::IndexerReader};
use sui_json_rpc_types::Stake as RpcStakedSui;
use sui_types::{
Expand All @@ -19,16 +20,16 @@ use sui_types::{
};

pub(crate) struct PgManager {
pub inner: IndexerReader,
pub inner: IndexerReader<PgConnection>,
}

impl PgManager {
pub(crate) fn new(inner: IndexerReader) -> Self {
pub(crate) fn new(inner: IndexerReader<PgConnection>) -> Self {
Self { inner }
}

/// Create a new underlying reader, which is used by this type as well as other data providers.
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader, Error> {
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader<PgConnection>, Error> {
Self::reader_with_config(
db_url,
DEFAULT_SERVER_DB_POOL_SIZE,
Expand All @@ -40,11 +41,11 @@ impl PgManager {
db_url: impl Into<String>,
pool_size: u32,
timeout_ms: u64,
) -> Result<IndexerReader, Error> {
let mut config = PgConnectionPoolConfig::default();
) -> Result<IndexerReader<PgConnection>, Error> {
let mut config = ConnectionPoolConfig::default();
config.set_pool_size(pool_size);
config.set_statement_timeout(Duration::from_millis(timeout_ms));
IndexerReader::new_with_config(db_url, config)
IndexerReader::<PgConnection>::new_with_config(db_url, config)
.map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
}
}
Expand Down
31 changes: 18 additions & 13 deletions crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use diesel::{
};
use sui_indexer::indexer_reader::IndexerReader;

use sui_indexer::{run_query_async, run_query_repeatable_async, spawn_read_only_blocking};
use tracing::error;

#[derive(Clone)]
pub(crate) struct PgExecutor {
pub inner: IndexerReader,
pub inner: IndexerReader<diesel::PgConnection>,
pub limits: Limits,
pub metrics: Metrics,
}
Expand All @@ -29,7 +30,11 @@ pub(crate) struct PgConnection<'c> {
}

impl PgExecutor {
pub(crate) fn new(inner: IndexerReader, limits: Limits, metrics: Metrics) -> Self {
pub(crate) fn new(
inner: IndexerReader<diesel::PgConnection>,
limits: Limits,
metrics: Metrics,
) -> Self {
Self {
inner,
limits,
Expand All @@ -54,10 +59,8 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_async!(&pool, move |conn| txn(&mut PgConnection { max_cost, conn }));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand All @@ -76,10 +79,11 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_repeatable_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_repeatable_async!(&pool, move |conn| txn(&mut PgConnection {
max_cost,
conn
}));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand Down Expand Up @@ -187,16 +191,17 @@ mod tests {
use diesel::QueryDsl;
use sui_framework::BuiltInFramework;
use sui_indexer::{
db::{get_pg_pool_connection, new_pg_connection_pool, reset_database},
db::{get_pool_connection, new_connection_pool, reset_database},
models::objects::StoredObject,
schema::objects,
types::IndexedObject,
};

#[test]
fn test_query_cost() {
let pool = new_pg_connection_pool(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pg_pool_connection(&pool).unwrap();
let pool =
new_connection_pool::<diesel::PgConnection>(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(&mut conn, /* drop_all */ true).unwrap();

let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;

pub struct ExecutorCluster {
pub executor_server_handle: JoinHandle<()>,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand All @@ -47,7 +47,7 @@ pub struct ExecutorCluster {

pub struct Cluster {
pub validator_fullnode_handle: TestCluster,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand Down Expand Up @@ -329,7 +329,7 @@ impl ExecutorCluster {
pub async fn cleanup_resources(self) {
self.cancellation_token.cancel();
let db_url = self.graphql_connection_config.db_url.clone();
force_delete_database(db_url).await;
force_delete_database::<diesel::PgConnection>(db_url).await;
}

pub async fn force_objects_snapshot_catchup(&self, start_cp: u64, end_cp: u64) {
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ chrono.workspace = true
serde_with.workspace = true
clap.workspace = true
tap.workspace = true
diesel.workspace = true
diesel-derive-enum.workspace = true
diesel = { workspace = true, optional = true }
diesel-derive-enum = { workspace = true, optional = true }
futures.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
Expand Down Expand Up @@ -56,9 +56,13 @@ move-binary-format.workspace = true
diesel_migrations.workspace = true
cached.workspace = true
secrecy = "0.8.0"
downcast = "0.11.0"

[features]
pg_integration = []
default = ["postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend", "diesel-derive-enum/postgres"]
mysql-feature = ["diesel/mysql", "diesel/mysql_backend", "diesel-derive-enum/mysql"]

[dev-dependencies]
sui-keys.workspace = true
Expand Down
13 changes: 7 additions & 6 deletions crates/sui-indexer/src/apis/coin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::indexer_reader::IndexerReader;
use async_trait::async_trait;
use diesel::r2d2::R2D2Connection;
use jsonrpsee::core::RpcResult;
use jsonrpsee::RpcModule;
use sui_json_rpc::coin_api::{parse_to_struct_tag, parse_to_type_tag};
Expand All @@ -14,18 +15,18 @@ use sui_types::balance::Supply;
use sui_types::base_types::{ObjectID, SuiAddress};
use sui_types::gas_coin::{GAS, TOTAL_SUPPLY_MIST};

pub(crate) struct CoinReadApi {
inner: IndexerReader,
pub(crate) struct CoinReadApi<T: R2D2Connection + 'static> {
inner: IndexerReader<T>,
}

impl CoinReadApi {
pub fn new(inner: IndexerReader) -> Self {
impl<T: R2D2Connection> CoinReadApi<T> {
pub fn new(inner: IndexerReader<T>) -> Self {
Self { inner }
}
}

#[async_trait]
impl CoinReadApiServer for CoinReadApi {
impl<T: R2D2Connection + 'static> CoinReadApiServer for CoinReadApi<T> {
async fn get_coins(
&self,
owner: SuiAddress,
Expand Down Expand Up @@ -142,7 +143,7 @@ impl CoinReadApiServer for CoinReadApi {
}
}

impl SuiRpcModule for CoinReadApi {
impl<T: R2D2Connection> SuiRpcModule for CoinReadApi<T> {
fn rpc(self) -> RpcModule<Self> {
self.into_rpc()
}
Expand Down
Loading

0 comments on commit c8e42dc

Please sign in to comment.