Skip to content

Commit

Permalink
[1/n][dialect_support][indexer] Generalize indexer code to support different SQL dialects
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood committed Apr 16, 2024
1 parent dd84ebd commit f005ad4
Show file tree
Hide file tree
Showing 29 changed files with 897 additions and 531 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/data-transform/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sui_types::object::MoveObject;

use self::models::*;
use std::env;
use sui_indexer::db::new_pg_connection_pool;
use sui_indexer::db::new_connection_pool;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::package_resolver::IndexerStorePackageResolver;

Expand All @@ -35,7 +35,7 @@ use move_core_types::account_address::AccountAddress;
struct GrootModuleResolver {
module_map: HashMap<String, Vec<u8>>,
#[allow(dead_code)]
original: IndexerStorePackageResolver,
original: IndexerStorePackageResolver<PgConnection>,
}

impl GrootModuleResolver {
Expand Down Expand Up @@ -157,7 +157,7 @@ fn main() {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let connection = &mut establish_connection();

let blocking_cp = new_pg_connection_pool(&database_url, None)
let blocking_cp = new_connection_pool(&database_url, None)
.map_err(|e| anyhow!("Unable to connect to Postgres, is it running? {e}"));
//let module_cache = Arc::new(SyncModuleCache::new(IndexerModuleResolver::new(blocking_cp.expect("REASON").clone())));
//
Expand Down
1 change: 1 addition & 0 deletions crates/sui-cluster-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["full"] }
jsonrpsee.workspace = true
tracing.workspace = true
clap.workspace = true
diesel.workspace = true
reqwest.workspace = true
async-trait.workspace = true
anyhow = { workspace = true, features = ["backtrace"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ impl Cluster for LocalNewCluster {
(options.pg_address.clone(), indexer_address)
{
// Start in writer mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address.clone()),
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
)
.await;

// Start in reader mode
start_test_indexer(
start_test_indexer::<diesel::PgConnection>(
Some(pg_address),
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
Expand Down
7 changes: 5 additions & 2 deletions crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ tower-http.workspace = true
thiserror.workspace = true
uuid.workspace = true
im.workspace = true
downcast = "0.11.0"

sui-graphql-rpc-headers.workspace = true
sui-graphql-rpc-client.workspace = true
Expand All @@ -70,7 +71,7 @@ bcs.workspace = true
simulacrum.workspace = true # todo: cleanup test only deps
sui-json-rpc.workspace = true
sui-json-rpc-types.workspace = true
sui-indexer.workspace = true
sui-indexer = { workspace = true, default-features = true }
sui-rest-api.workspace = true
sui-swarm-config.workspace = true
test-cluster.workspace = true
Expand All @@ -87,7 +88,9 @@ sui-framework.workspace = true
tower.workspace = true
sui-test-transaction-builder.workspace = true


[features]
default = ["pg_backend"]
default = ["pg_backend", "postgres-feature"]
postgres-feature = []
pg_integration = []
pg_backend = []
15 changes: 8 additions & 7 deletions crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
error::Error,
types::{address::Address, sui_address::SuiAddress, validator::Validator},
};
use diesel::PgConnection;
use std::{collections::BTreeMap, time::Duration};
use sui_indexer::db::PgConnectionPoolConfig;
use sui_indexer::db::ConnectionPoolConfig;
use sui_indexer::{apis::GovernanceReadApi, indexer_reader::IndexerReader};
use sui_json_rpc_types::Stake as RpcStakedSui;
use sui_types::{
Expand All @@ -19,16 +20,16 @@ use sui_types::{
};

pub(crate) struct PgManager {
pub inner: IndexerReader,
pub inner: IndexerReader<PgConnection>,
}

impl PgManager {
pub(crate) fn new(inner: IndexerReader) -> Self {
pub(crate) fn new(inner: IndexerReader<PgConnection>) -> Self {
Self { inner }
}

/// Create a new underlying reader, which is used by this type as well as other data providers.
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader, Error> {
pub(crate) fn reader(db_url: impl Into<String>) -> Result<IndexerReader<PgConnection>, Error> {
Self::reader_with_config(
db_url,
DEFAULT_SERVER_DB_POOL_SIZE,
Expand All @@ -40,11 +41,11 @@ impl PgManager {
db_url: impl Into<String>,
pool_size: u32,
timeout_ms: u64,
) -> Result<IndexerReader, Error> {
let mut config = PgConnectionPoolConfig::default();
) -> Result<IndexerReader<PgConnection>, Error> {
let mut config = ConnectionPoolConfig::default();
config.set_pool_size(pool_size);
config.set_statement_timeout(Duration::from_millis(timeout_ms));
IndexerReader::new_with_config(db_url, config)
IndexerReader::<PgConnection>::new_with_config(db_url, config)
.map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
}
}
Expand Down
34 changes: 18 additions & 16 deletions crates/sui-graphql-rpc/src/context_data/package_cache.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use async_trait::async_trait;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use move_core_types::account_address::AccountAddress;
use sui_indexer::errors::IndexerError;
use sui_indexer::{indexer_reader::IndexerReader, schema::objects};
use sui_indexer::{
indexer_reader::IndexerReader, run_query_async, schema::objects, spawn_read_only_blocking,
};
use sui_package_resolver::{
error::Error as PackageResolverError, Package, PackageStore, PackageStoreWithLruCache, Result,
};
Expand Down Expand Up @@ -36,7 +37,7 @@ pub(crate) type PackageCache = PackageStoreWithLruCache<DbPackageStore>;

/// Store which fetches package for the given address from the backend db on every call
/// to `fetch`
pub struct DbPackageStore(pub IndexerReader);
pub struct DbPackageStore(pub IndexerReader<diesel::PgConnection>);

#[async_trait]
impl PackageStore for DbPackageStore {
Expand All @@ -52,36 +53,37 @@ impl PackageStore for DbPackageStore {

async fn get_package_version_from_db(
id: AccountAddress,
sui_indexer: &IndexerReader,
sui_indexer: &IndexerReader<diesel::PgConnection>,
) -> Result<SequenceNumber> {
let query = objects::dsl::objects
.select(objects::dsl::object_version)
.filter(objects::dsl::object_id.eq(id.to_vec()));

let Some(version) = sui_indexer
.run_query_async(move |conn| query.get_result::<i64>(conn).optional())
.await
.map_err(Error::Indexer)?
let pool = sui_indexer.get_pool();
let Some(version) =
run_query_async!(&pool, move |conn| query.get_result::<i64>(conn).optional())
.map_err(Error::Indexer)?
else {
return Err(PackageResolverError::PackageNotFound(id));
};

Ok(SequenceNumber::from_u64(version as u64))
}

async fn get_package_from_db(id: AccountAddress, sui_indexer: &IndexerReader) -> Result<Package> {
async fn get_package_from_db(
id: AccountAddress,
sui_indexer: &IndexerReader<diesel::PgConnection>,
) -> Result<Package> {
let query = objects::dsl::objects
.select(objects::dsl::serialized_object)
.filter(objects::dsl::object_id.eq(id.to_vec()));

let Some(bcs) = sui_indexer
.run_query_async(move |conn| query.get_result::<Vec<u8>>(conn).optional())
.await
.map_err(Error::Indexer)?
let pool = sui_indexer.get_pool();
let Some(bcs) = run_query_async!(&pool, move |conn| query
.get_result::<Vec<u8>>(conn)
.optional())
.map_err(Error::Indexer)?
else {
return Err(PackageResolverError::PackageNotFound(id));
};

let object = bcs::from_bytes::<Object>(&bcs)?;
Package::read(&object)
}
31 changes: 18 additions & 13 deletions crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use diesel::{
};
use sui_indexer::indexer_reader::IndexerReader;

use sui_indexer::{run_query_async, run_query_repeatable_async, spawn_read_only_blocking};
use tracing::error;

#[derive(Clone)]
pub(crate) struct PgExecutor {
pub inner: IndexerReader,
pub inner: IndexerReader<diesel::PgConnection>,
pub limits: Limits,
pub metrics: Metrics,
}
Expand All @@ -29,7 +30,11 @@ pub(crate) struct PgConnection<'c> {
}

impl PgExecutor {
pub(crate) fn new(inner: IndexerReader, limits: Limits, metrics: Metrics) -> Self {
pub(crate) fn new(
inner: IndexerReader<diesel::PgConnection>,
limits: Limits,
metrics: Metrics,
) -> Self {
Self {
inner,
limits,
Expand All @@ -54,10 +59,8 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_async!(&pool, move |conn| txn(&mut PgConnection { max_cost, conn }));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand All @@ -76,10 +79,11 @@ impl QueryExecutor for PgExecutor {
{
let max_cost = self.limits.max_db_query_cost;
let instant = Instant::now();
let result = self
.inner
.run_query_repeatable_async(move |conn| txn(&mut PgConnection { max_cost, conn }))
.await;
let pool = self.inner.get_pool();
let result = run_query_repeatable_async!(&pool, move |conn| txn(&mut PgConnection {
max_cost,
conn
}));
self.metrics
.observe_db_data(instant.elapsed(), result.is_ok());
if let Err(e) = &result {
Expand Down Expand Up @@ -187,16 +191,17 @@ mod tests {
use diesel::QueryDsl;
use sui_framework::BuiltInFramework;
use sui_indexer::{
db::{get_pg_pool_connection, new_pg_connection_pool, reset_database},
db::{get_pool_connection, new_connection_pool, reset_database},
models::objects::StoredObject,
schema::objects,
types::IndexedObject,
};

#[test]
fn test_query_cost() {
let pool = new_pg_connection_pool(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pg_pool_connection(&pool).unwrap();
let pool =
new_connection_pool::<diesel::PgConnection>(DEFAULT_SERVER_DB_URL, Some(5)).unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(&mut conn, /* drop_all */ true).unwrap();

let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;

pub struct ExecutorCluster {
pub executor_server_handle: JoinHandle<()>,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand All @@ -46,7 +46,7 @@ pub struct ExecutorCluster {

pub struct Cluster {
pub validator_fullnode_handle: TestCluster,
pub indexer_store: PgIndexerStore,
pub indexer_store: PgIndexerStore<diesel::PgConnection>,
pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
pub graphql_server_join_handle: JoinHandle<()>,
pub graphql_client: SimpleClient,
Expand Down Expand Up @@ -318,7 +318,7 @@ impl ExecutorCluster {
pub async fn cleanup_resources(self) {
self.cancellation_token.cancel();
let db_url = self.graphql_connection_config.db_url.clone();
force_delete_database(db_url).await;
force_delete_database::<diesel::PgConnection>(db_url).await;
}

pub async fn force_objects_snapshot_catchup(&self, start_cp: u64, end_cp: u64) {
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ chrono.workspace = true
serde_with.workspace = true
clap.workspace = true
tap.workspace = true
diesel.workspace = true
diesel-derive-enum.workspace = true
diesel = { features = [
"chrono",
"r2d2",
"serde_json",
"64-column-tables",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
], optional = true }
diesel-derive-enum = { optional = true }
futures.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
Expand Down Expand Up @@ -52,9 +58,13 @@ move-binary-format.workspace = true

diesel_migrations.workspace = true
cached.workspace = true
downcast = "0.11.0"

[features]
pg_integration = []
default = ["postgres-feature"]
postgres-feature = ["diesel/postgres", "diesel/postgres_backend", "diesel-derive-enum/postgres"]
mysql-feature = ["diesel/mysql", "diesel/mysql_backend", "diesel-derive-enum/mysql"]

[dev-dependencies]
sui-keys.workspace = true
Expand Down
Loading

0 comments on commit f005ad4

Please sign in to comment.