Skip to content

Commit

Permalink
chore(indexer-alt): standardise Db Errors on anyhow
Browse files Browse the repository at this point in the history
## Description

Return `anyhow::Result`s from `sui_pg_db::Db` APIs, hiding the
underlying connection library's error type.

This was done to make it easier to support read-only transactions, which
require executing a DB statement after acquiring a connection (to ensure
it is read-only), which might produce diesel's error type, while
acquiring the connection produces an error from `bb8`.

## Test plan
CI
  • Loading branch information
amnn committed Dec 19, 2024
1 parent bc7a3f8 commit ac4f3ac
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ pub(super) fn committer<H: Handler + 'static>(
pipeline = H::NAME,
"Committed failed to get connection for DB"
);

metrics
.total_committer_batches_failed
.with_label_values(&[H::NAME])
.inc();
BE::transient(Break::Err(e.into()))

BE::transient(Break::Err(e))
})?;

let affected = H::commit(values.as_slice(), &mut conn).await;
Expand Down
5 changes: 2 additions & 3 deletions crates/sui-indexer-alt-jsonrpc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use diesel::query_builder::QueryFragment;
use diesel::query_dsl::methods::LimitDsl;
use diesel::result::Error as DieselError;
use diesel_async::methods::LoadQuery;
use diesel_async::pooled_connection::bb8::RunError;
use diesel_async::RunQueryDsl;
use jsonrpsee::core::Error as RpcError;
use jsonrpsee::types::{
Expand All @@ -26,15 +25,15 @@ struct Connection<'p>(db::Connection<'p>);
#[derive(thiserror::Error, Debug)]
enum DbError {
#[error(transparent)]
Connect(#[from] RunError),
Connect(anyhow::Error),

#[error(transparent)]
RunQuery(#[from] DieselError),
}

impl<'p> Connection<'p> {
async fn get(db: &'p db::Db) -> Result<Self, DbError> {
Ok(Self(db.connect().await?))
Ok(Self(db.connect().await.map_err(DbError::Connect)?))
}

async fn first<'q, Q, U>(&mut self, query: Q) -> Result<U, DbError>
Expand Down
16 changes: 8 additions & 8 deletions crates/sui-pg-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use diesel::pg::Pg;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{
pooled_connection::{
bb8::{Pool, PooledConnection, RunError},
AsyncDieselConnectionManager, PoolError,
bb8::{Pool, PooledConnection},
AsyncDieselConnectionManager,
},
AsyncPgConnection, RunQueryDsl,
};
Expand Down Expand Up @@ -50,7 +50,7 @@ impl DbArgs {
impl Db {
/// Construct a new DB connection pool. Instances of [Db] can be cloned to share access to the
/// same pool.
pub async fn new(config: DbArgs) -> Result<Self, PoolError> {
pub async fn new(config: DbArgs) -> anyhow::Result<Self> {
let manager = AsyncDieselConnectionManager::new(config.database_url.as_str());

let pool = Pool::builder()
Expand All @@ -64,16 +64,16 @@ impl Db {

/// Retrieves a connection from the pool. Can fail with a timeout if a connection cannot be
/// established before the [DbArgs::connection_timeout] has elapsed.
pub async fn connect(&self) -> Result<Connection<'_>, RunError> {
self.pool.get().await
pub async fn connect(&self) -> anyhow::Result<Connection<'_>> {
Ok(self.pool.get().await?)
}

/// Statistics about the connection pool
pub fn state(&self) -> bb8::State {
self.pool.state()
}

async fn clear_database(&self) -> Result<(), anyhow::Error> {
async fn clear_database(&self) -> anyhow::Result<()> {
info!("Clearing the database...");
let mut conn = self.connect().await?;
let drop_all_tables = "
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Db {
pub async fn run_migrations<S: MigrationSource<Pg> + Send + Sync + 'static>(
&self,
migrations: S,
) -> Result<Vec<MigrationVersion<'static>>, anyhow::Error> {
) -> anyhow::Result<Vec<MigrationVersion<'static>>> {
use diesel_migrations::MigrationHarness;

info!("Running migrations ...");
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Default for DbArgs {
pub async fn reset_database<S: MigrationSource<Pg> + Send + Sync + 'static>(
db_config: DbArgs,
migrations: Option<S>,
) -> Result<(), anyhow::Error> {
) -> anyhow::Result<()> {
let db = Db::new(db_config).await?;
db.clear_database().await?;
if let Some(migrations) = migrations {
Expand Down

0 comments on commit ac4f3ac

Please sign in to comment.