Skip to content

Commit

Permalink
indexer: use async connection for package resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Sep 5, 2024
1 parent 728accd commit 07248c9
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 69 deletions.
2 changes: 1 addition & 1 deletion crates/sui-analytics-indexer/src/package_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl From<Error> for PackageResolverError {
match source {
Error::TypedStore(store_error) => Self::Store {
store: STORE,
source: Arc::new(store_error),
error: store_error.to_string(),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/src/data/package_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Loader<PackageKey> for Db {
.await
.map_err(|e| PackageResolverError::Store {
store: STORE,
source: Arc::new(e),
error: e.to_string(),
})?;

let mut id_to_package = HashMap::new();
Expand Down
8 changes: 1 addition & 7 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait};
use sui_types::transaction::TransactionDataAPI;

use crate::db::ConnectionPool;
use crate::errors::IndexerError;
use crate::handlers::committer::start_tx_checkpoint_commit_task;
use crate::handlers::tx_processor::IndexingPackageBuffer;
Expand Down Expand Up @@ -154,8 +153,7 @@ impl CheckpointHandler {
package_tx: watch::Receiver<Option<CheckpointSequenceNumber>>,
) -> Self {
let package_buffer = IndexingPackageBuffer::start(package_tx);
let pg_blocking_cp = Self::pg_blocking_cp(state.clone()).unwrap();
let package_db_resolver = IndexerStorePackageResolver::new(pg_blocking_cp);
let package_db_resolver = IndexerStorePackageResolver::new(state.pool());
let in_mem_package_resolver = InterimPackageResolver::new(
package_db_resolver,
package_buffer.clone(),
Expand Down Expand Up @@ -664,10 +662,6 @@ impl CheckpointHandler {
})
.collect()
}

pub(crate) fn pg_blocking_cp(state: PgIndexerStore) -> Result<ConnectionPool, IndexerError> {
Ok(state.blocking_cp())
}
}

async fn get_move_struct_layout_map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ impl ObjectsSnapshotProcessor {
// We include a commit receiver which will be paged when a checkpoint has been processed and
// the corresponding package data can be deleted from the buffer.
let package_buffer = IndexingPackageBuffer::start(commit_receiver);
let pg_blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap();
let package_db_resolver = IndexerStorePackageResolver::new(pg_blocking_cp);
let package_db_resolver = IndexerStorePackageResolver::new(store.pool());
let in_mem_package_resolver = InterimPackageResolver::new(
package_db_resolver,
package_buffer.clone(),
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};

use super::checkpoint_handler::CheckpointHandler;

pub struct Pruner {
pub store: PgIndexerStore,
pub partition_manager: PgPartitionManager,
Expand All @@ -27,7 +25,7 @@ impl Pruner {
epochs_to_keep: u64,
metrics: IndexerMetrics,
) -> Result<Self, IndexerError> {
let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap();
let blocking_cp = store.blocking_cp();
let partition_manager = PgPartitionManager::new(blocking_cp.clone())?;
Ok(Self {
store,
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePac
// Impl for common initialization and utilities
impl IndexerReader {
pub fn new(blocking_pool: BlockingConnectionPool, pool: ConnectionPool) -> Self {
let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(blocking_pool.clone());
let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
let package_resolver = Arc::new(Resolver::new(package_cache));
let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
Expand Down Expand Up @@ -113,7 +113,7 @@ impl IndexerReader {

let pool = ConnectionPool::new(db_url.parse()?, config).await?;

let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(blocking_pool.clone());
let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
let package_resolver = Arc::new(Resolver::new(package_cache));
let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
Expand Down
74 changes: 26 additions & 48 deletions crates/sui-indexer/src/store/package_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,82 +3,60 @@

use std::sync::{Arc, Mutex};

use crate::database::ConnectionPool;
use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::metrics::IndexerMetrics;
use crate::schema::objects;
use anyhow::anyhow;
use async_trait::async_trait;
use diesel::ExpressionMethods;
use diesel::OptionalExtension;
use diesel::{QueryDsl, RunQueryDsl};

use diesel::QueryDsl;
use diesel_async::RunQueryDsl;
use move_core_types::account_address::AccountAddress;
use sui_package_resolver::{error::Error as PackageResolverError, Package, PackageStore};
use sui_types::base_types::ObjectID;
use sui_types::object::Object;

use crate::db::ConnectionPool;
use crate::errors::IndexerError;
use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::metrics::IndexerMetrics;
use crate::schema::objects;
use crate::store::diesel_macro::*;

/// A package resolver that reads packages from the database.
#[derive(Clone)]
pub struct IndexerStorePackageResolver {
cp: ConnectionPool,
}

impl Clone for IndexerStorePackageResolver {
fn clone(&self) -> IndexerStorePackageResolver {
Self {
cp: self.cp.clone(),
}
}
pool: ConnectionPool,
}

impl IndexerStorePackageResolver {
pub fn new(cp: ConnectionPool) -> Self {
Self { cp }
pub fn new(pool: ConnectionPool) -> Self {
Self { pool }
}
}

#[async_trait]
impl PackageStore for IndexerStorePackageResolver {
async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
let pkg = self
.get_package_from_db_in_blocking_task(id)
.get_package_from_db(id)
.await
.map_err(|e| PackageResolverError::Store {
store: "PostgresDB",
source: Arc::new(e),
error: e.to_string(),
})?;
Ok(Arc::new(pkg))
}
}

impl IndexerStorePackageResolver {
fn get_package_from_db(&self, id: AccountAddress) -> Result<Package, IndexerError> {
let Some(bcs) = read_only_blocking!(&self.cp, |conn| {
let query = objects::dsl::objects
.select(objects::dsl::serialized_object)
.filter(objects::dsl::object_id.eq(id.to_vec()));
query.get_result::<Vec<u8>>(conn).optional()
})?
else {
return Err(IndexerError::PostgresReadError(format!(
"Package not found in DB: {:?}",
id
)));
};
let object = bcs::from_bytes::<Object>(&bcs)?;
Package::read_from_object(&object).map_err(|e| {
IndexerError::PostgresReadError(format!("Failed parsing object to package: {:?}", e))
})
}
async fn get_package_from_db(&self, id: AccountAddress) -> Result<Package, anyhow::Error> {
let mut connection = self.pool.get().await?;

let bcs = objects::dsl::objects
.select(objects::dsl::serialized_object)
.filter(objects::dsl::object_id.eq(id.to_vec()))
.get_result::<Vec<u8>>(&mut connection)
.await
.map_err(|e| anyhow!("Package not found in DB: {e}"))?;

async fn get_package_from_db_in_blocking_task(
&self,
id: AccountAddress,
) -> Result<Package, IndexerError> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.get_package_from_db(id)).await?
let object = bcs::from_bytes::<Object>(&bcs)?;
Package::read_from_object(&object)
.map_err(|e| anyhow!("Failed parsing object to package: {e}"))
}
}

Expand Down Expand Up @@ -114,7 +92,7 @@ impl PackageStore for InterimPackageResolver {
self.metrics.indexing_package_resolver_in_mem_hit.inc();
let pkg = Package::read_from_object(&obj).map_err(|e| PackageResolverError::Store {
store: "InMemoryPackageBuffer",
source: Arc::new(e),
error: e.to_string(),
})?;
Ok(Arc::new(pkg))
} else {
Expand Down
7 changes: 2 additions & 5 deletions crates/sui-package-resolver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ pub enum Error {
#[error("{0}")]
Bcs(#[from] bcs::Error),

#[error("Store {} error: {}", store, source)]
Store {
store: &'static str,
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
},
#[error("Store {} error: {}", store, error)]
Store { store: &'static str, error: String },

#[error("{0}")]
Deserialize(VMError),
Expand Down

0 comments on commit 07248c9

Please sign in to comment.