diff --git a/crates/sui-analytics-indexer/src/package_store.rs b/crates/sui-analytics-indexer/src/package_store.rs index 94199a6353009..ea6155ac2b4a1 100644 --- a/crates/sui-analytics-indexer/src/package_store.rs +++ b/crates/sui-analytics-indexer/src/package_store.rs @@ -32,7 +32,7 @@ impl From for PackageResolverError { match source { Error::TypedStore(store_error) => Self::Store { store: STORE, - source: Arc::new(store_error), + error: store_error.to_string(), }, } } diff --git a/crates/sui-graphql-rpc/src/data/package_resolver.rs b/crates/sui-graphql-rpc/src/data/package_resolver.rs index f10067fd007b9..6a1d2956663ef 100644 --- a/crates/sui-graphql-rpc/src/data/package_resolver.rs +++ b/crates/sui-graphql-rpc/src/data/package_resolver.rs @@ -66,7 +66,7 @@ impl Loader for Db { .await .map_err(|e| PackageResolverError::Store { store: STORE, - source: Arc::new(e), + error: e.to_string(), })?; let mut id_to_package = HashMap::new(); diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 52e0deeb1f5d6..62985b92cb927 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -33,7 +33,6 @@ use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; use sui_types::transaction::TransactionDataAPI; -use crate::db::ConnectionPool; use crate::errors::IndexerError; use crate::handlers::committer::start_tx_checkpoint_commit_task; use crate::handlers::tx_processor::IndexingPackageBuffer; @@ -154,8 +153,7 @@ impl CheckpointHandler { package_tx: watch::Receiver>, ) -> Self { let package_buffer = IndexingPackageBuffer::start(package_tx); - let pg_blocking_cp = Self::pg_blocking_cp(state.clone()).unwrap(); - let package_db_resolver = IndexerStorePackageResolver::new(pg_blocking_cp); + let package_db_resolver = IndexerStorePackageResolver::new(state.pool()); let in_mem_package_resolver = InterimPackageResolver::new( package_db_resolver, package_buffer.clone(), @@ -664,10 +662,6 @@ impl CheckpointHandler { }) .collect() } - - pub(crate) fn pg_blocking_cp(state: PgIndexerStore) -> Result { - Ok(state.blocking_cp()) - } } async fn get_move_struct_layout_map( diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs index afb1cd93b3caf..670f5375b5e7e 100644 --- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs +++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs @@ -128,8 +128,7 @@ impl ObjectsSnapshotProcessor { // We include a commit receiver which will be paged when a checkpoint has been processed and // the corresponding package data can be deleted from the buffer. let package_buffer = IndexingPackageBuffer::start(commit_receiver); - let pg_blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap(); - let package_db_resolver = IndexerStorePackageResolver::new(pg_blocking_cp); + let package_db_resolver = IndexerStorePackageResolver::new(store.pool()); let in_mem_package_resolver = InterimPackageResolver::new( package_db_resolver, package_buffer.clone(), diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index abb95e754e5f5..ce62a03ef2ac2 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -12,8 +12,6 @@ use crate::store::pg_partition_manager::PgPartitionManager; use crate::store::PgIndexerStore; use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult}; -use super::checkpoint_handler::CheckpointHandler; - pub struct Pruner { pub store: PgIndexerStore, pub partition_manager: PgPartitionManager, @@ -27,7 +25,7 @@ impl Pruner { epochs_to_keep: u64, metrics: IndexerMetrics, ) -> Result { - let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap(); + let blocking_cp = store.blocking_cp(); let partition_manager = PgPartitionManager::new(blocking_cp.clone())?; Ok(Self { store, diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index f7d9693d50867..e70e0f0832061 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -80,7 +80,7 @@ pub type PackageResolver = Arc Self { - let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(blocking_pool.clone()); + let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone()); let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver); let package_resolver = Arc::new(Resolver::new(package_cache)); let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000))); @@ -113,7 +113,7 @@ impl IndexerReader { let pool = ConnectionPool::new(db_url.parse()?, config).await?; - let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(blocking_pool.clone()); + let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone()); let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver); let package_resolver = Arc::new(Resolver::new(package_cache)); let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000))); diff --git a/crates/sui-indexer/src/store/package_resolver.rs b/crates/sui-indexer/src/store/package_resolver.rs index 47cd3615a9232..de03c80df9362 100644 --- a/crates/sui-indexer/src/store/package_resolver.rs +++ b/crates/sui-indexer/src/store/package_resolver.rs @@ -3,39 +3,29 @@ use std::sync::{Arc, Mutex}; +use crate::database::ConnectionPool; +use crate::handlers::tx_processor::IndexingPackageBuffer; +use crate::metrics::IndexerMetrics; +use crate::schema::objects; +use anyhow::anyhow; use async_trait::async_trait; use diesel::ExpressionMethods; -use diesel::OptionalExtension; -use diesel::{QueryDsl, RunQueryDsl}; - +use diesel::QueryDsl; +use diesel_async::RunQueryDsl; use move_core_types::account_address::AccountAddress; use sui_package_resolver::{error::Error as PackageResolverError, Package, PackageStore}; use sui_types::base_types::ObjectID; use sui_types::object::Object; -use crate::db::ConnectionPool; -use crate::errors::IndexerError; -use crate::handlers::tx_processor::IndexingPackageBuffer; -use crate::metrics::IndexerMetrics; -use crate::schema::objects; -use crate::store::diesel_macro::*; - /// A package resolver that reads packages from the database. +#[derive(Clone)] pub struct IndexerStorePackageResolver { - cp: ConnectionPool, -} - -impl Clone for IndexerStorePackageResolver { - fn clone(&self) -> IndexerStorePackageResolver { - Self { - cp: self.cp.clone(), - } - } + pool: ConnectionPool, } impl IndexerStorePackageResolver { - pub fn new(cp: ConnectionPool) -> Self { - Self { cp } + pub fn new(pool: ConnectionPool) -> Self { + Self { pool } } } @@ -43,42 +33,30 @@ impl IndexerStorePackageResolver { impl PackageStore for IndexerStorePackageResolver { async fn fetch(&self, id: AccountAddress) -> Result, PackageResolverError> { let pkg = self - .get_package_from_db_in_blocking_task(id) + .get_package_from_db(id) .await .map_err(|e| PackageResolverError::Store { store: "PostgresDB", - source: Arc::new(e), + error: e.to_string(), })?; Ok(Arc::new(pkg)) } } impl IndexerStorePackageResolver { - fn get_package_from_db(&self, id: AccountAddress) -> Result { - let Some(bcs) = read_only_blocking!(&self.cp, |conn| { - let query = objects::dsl::objects - .select(objects::dsl::serialized_object) - .filter(objects::dsl::object_id.eq(id.to_vec())); - query.get_result::>(conn).optional() - })? - else { - return Err(IndexerError::PostgresReadError(format!( - "Package not found in DB: {:?}", - id - ))); - }; - let object = bcs::from_bytes::(&bcs)?; - Package::read_from_object(&object).map_err(|e| { - IndexerError::PostgresReadError(format!("Failed parsing object to package: {:?}", e)) - }) - } + async fn get_package_from_db(&self, id: AccountAddress) -> Result { + let mut connection = self.pool.get().await?; + + let bcs = objects::dsl::objects + .select(objects::dsl::serialized_object) + .filter(objects::dsl::object_id.eq(id.to_vec())) + .get_result::>(&mut connection) + .await + .map_err(|e| anyhow!("Package not found in DB: {e}"))?; - async fn get_package_from_db_in_blocking_task( - &self, - id: AccountAddress, - ) -> Result { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.get_package_from_db(id)).await? + let object = bcs::from_bytes::(&bcs)?; + Package::read_from_object(&object) + .map_err(|e| anyhow!("Failed parsing object to package: {e}")) } } @@ -114,7 +92,7 @@ impl PackageStore for InterimPackageResolver { self.metrics.indexing_package_resolver_in_mem_hit.inc(); let pkg = Package::read_from_object(&obj).map_err(|e| PackageResolverError::Store { store: "InMemoryPackageBuffer", - source: Arc::new(e), + error: e.to_string(), })?; Ok(Arc::new(pkg)) } else { diff --git a/crates/sui-package-resolver/src/error.rs b/crates/sui-package-resolver/src/error.rs index debdfffa96997..5af7f9ab35276 100644 --- a/crates/sui-package-resolver/src/error.rs +++ b/crates/sui-package-resolver/src/error.rs @@ -13,11 +13,8 @@ pub enum Error { #[error("{0}")] Bcs(#[from] bcs::Error), - #[error("Store {} error: {}", store, source)] - Store { - store: &'static str, - source: Arc, - }, + #[error("Store {} error: {}", store, error)] + Store { store: &'static str, error: String }, #[error("{0}")] Deserialize(VMError),