diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 70a64f01d..898afc5b1 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -3,6 +3,7 @@ use crate::{ database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError, IndexerResult, }; +use anyhow::Context; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use fuel_core_client::client::{ @@ -78,7 +79,7 @@ pub fn run_executor( config: &IndexerConfig, pool: IndexerConnectionPool, mut executor: T, -) -> impl Future { +) -> anyhow::Result>> { // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286 let end_block = executor.manifest().end_block(); @@ -104,9 +105,9 @@ pub fn run_executor( info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}"); - let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| { - panic!("Indexer({indexer_uid}) client node connection failed: {e}.") - }); + let client = FuelClient::from_str(&fuel_node_addr).with_context(|| { + format!("Indexer({indexer_uid}) client node connection failed.") + })?; if let Some(end_block) = end_block { info!("Indexer({indexer_uid}) will stop at block #{end_block}."); @@ -116,10 +117,10 @@ pub fn run_executor( let allow_non_sequential_blocks = config.allow_non_sequential_blocks; - async move { - let mut conn = pool.acquire().await.unwrap_or_else(|_| { - panic!("Indexer({indexer_uid}) was unable to acquire a database connection.") - }); + let task = async move { + let mut conn = pool.acquire().await.with_context(|| { + format!("Indexer({indexer_uid}) was unable to acquire a database connection.") + })?; if allow_non_sequential_blocks { queries::remove_ensure_block_height_consecutive_trigger( @@ -127,8 +128,7 @@ pub fn run_executor( executor.manifest().namespace(), executor.manifest().identifier(), ) - .await - .unwrap_or_else(|_| panic!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})")); + .await.with_context(|| format!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"))?; } else { queries::create_ensure_block_height_consecutive_trigger( &mut conn, @@ -136,7 +136,7 @@ pub fn run_executor( executor.manifest().identifier(), ) .await - .unwrap_or_else(|_| panic!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})")); + .with_context(|| format!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"))?; } // If we reach an issue that continues to fail, we'll retry a few times before giving up, as @@ -163,8 +163,7 @@ pub fn run_executor( loop { // If something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } // Fetch the next page of blocks, and the starting cursor for the subsequent page @@ -205,8 +204,9 @@ pub fn run_executor( ); if num_empty_block_reqs == max_empty_block_reqs { - error!("No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"); - break; + return Err(anyhow::format_err!( + "No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>" + ).into()); } // There is no work to do, so we sleep for a bit, then continue without updating our cursor. @@ -219,8 +219,7 @@ pub fn run_executor( // If the kill switch has been triggered, the executor exits early. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } if let Err(e) = result { @@ -228,24 +227,24 @@ pub fn run_executor( if let Some(&WasmIndexerError::MissingBlocksError) = e.downcast_ref::() { - error!( + return Err(anyhow::format_err!( "Indexer({indexer_uid}) terminating due to missing blocks." - ); - break; + ) + .into()); } } // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { - error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"); - break; + return Err(anyhow::format_err!( + "Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points" + ).into()); } // We don't want to retry forever as that eats resources. if consecutive_retries >= INDEXER_FAILED_CALLS { - error!( + return Err(anyhow::format_err!( "Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>" - ); - break; + ).into()); } if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e { @@ -276,14 +275,15 @@ pub fn run_executor( // Again, check if something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } // Since we had successful call, we reset the retry count. consecutive_retries = 0; } - } + }; + + Ok(task) } /// Retrieve blocks from a client node. diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index d02050b8a..b479bcbf6 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -88,4 +88,8 @@ pub enum IndexerError { EndBlockMet, #[error("Invalid schema: {0:?}")] SchemaVersionMismatch(String), + #[error(transparent)] + Other(#[from] anyhow::Error), + #[error("Indexer({0}) kill switch flipped. <('.')>")] + KillSwitch(String), } diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 16891fafd..4cd30ecfa 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -29,7 +29,7 @@ pub struct IndexerService { manager: SchemaManager, /// Tasks for the spawned indexers. - tasks: tokio::task::JoinSet<()>, + tasks: tokio::task::JoinSet>, /// Channel used to receive `ServiceRequest`s. rx: Receiver, @@ -177,7 +177,7 @@ impl IndexerService { manifest.identifier() ); - self.start_executor(executor); + self.start_executor(executor)?; Ok(()) } @@ -204,7 +204,7 @@ impl IndexerService { { info!("Registered Indexer({})", manifest.uid()); - self.start_executor(executor); + self.start_executor(executor)?; } else { error!( "Failed to register Indexer({}) from registry.", @@ -257,7 +257,7 @@ impl IndexerService { info!("Registered NativeIndex({})", uid); - self.start_executor(executor); + self.start_executor(executor)?; Ok(()) } @@ -267,8 +267,15 @@ impl IndexerService { loop { tokio::select! { // Calling join_next will remove finished tasks from the set. - Some(Err(e)) = self.tasks.join_next() => { - error!("Error retiring indexer task {e}"); + Some(result) = self.tasks.join_next() => { + match result { + Ok(Ok(())) => (), + Ok(Err(e)) => match e { + IndexerError::KillSwitch(_) => info!{"{e}"}, + _ => error!("{e}"), + } + Err(e) => error!("Error retiring indexer task {e}"), + } } Some(service_request) = self.rx.recv() => { match service_request { @@ -311,7 +318,7 @@ impl IndexerService { ) .await { - Ok(executor) => self.start_executor(executor), + Ok(executor) => self.start_executor(executor)?, Err(e) => { error!( "Failed to reload Indexer({}.{}): {e:?}", @@ -350,7 +357,11 @@ impl IndexerService { // Spawn and register a tokio::task running the Executor loop, as well as // the kill switch and the abort handle. - fn start_executor(&mut self, executor: T) { + #[allow(clippy::result_large_err)] + fn start_executor( + &mut self, + executor: T, + ) -> IndexerResult<()> { let uid = executor.manifest().uid(); self.killers @@ -360,7 +371,9 @@ impl IndexerService { &self.config, self.pool.clone(), executor, - )); + )?); + + Ok(()) } }