diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index e246c79c0..717f30a1d 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -46,6 +46,11 @@ pub enum WasmIndexerError { KillSwitch, DatabaseError, MissingBlocksError, + InvalidLogLevel, + GetObjectIdFailed, + GetStringFailed, + AllocMissing, + AllocFailed, GeneralError, } @@ -60,6 +65,11 @@ impl From for WasmIndexerError { 5 => Self::KillSwitch, 6 => Self::DatabaseError, 7 => Self::MissingBlocksError, + 8 => Self::InvalidLogLevel, + 9 => Self::GetObjectIdFailed, + 10 => Self::GetStringFailed, + 11 => Self::AllocMissing, + 12 => Self::AllocFailed, _ => Self::GeneralError, } } @@ -95,6 +105,21 @@ impl std::fmt::Display for WasmIndexerError { Self::MissingBlocksError => { write!(f, "Some blocks are missing") } + Self::InvalidLogLevel => { + write!(f, "Invalid log level.") + } + Self::GetObjectIdFailed => { + write!(f, "get_object_id failed.") + } + Self::GetStringFailed => { + write!(f, "get_string failed.") + } + Self::AllocMissing => { + write!(f, "alloc is missing.") + } + Self::AllocFailed => { + write!(f, "alloc failed.") + } Self::GeneralError => write!(f, "Some unspecified WASM error occurred."), } } diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 70a64f01d..898afc5b1 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -3,6 +3,7 @@ use crate::{ database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError, IndexerResult, }; +use anyhow::Context; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use fuel_core_client::client::{ @@ -78,7 +79,7 @@ pub fn run_executor( config: &IndexerConfig, pool: IndexerConnectionPool, mut executor: T, -) -> impl Future { +) -> anyhow::Result>> { // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286 let end_block = executor.manifest().end_block(); @@ -104,9 +105,9 @@ pub fn run_executor( info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}"); - let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| { - panic!("Indexer({indexer_uid}) client node connection failed: {e}.") - }); + let client = FuelClient::from_str(&fuel_node_addr).with_context(|| { + format!("Indexer({indexer_uid}) client node connection failed.") + })?; if let Some(end_block) = end_block { info!("Indexer({indexer_uid}) will stop at block #{end_block}."); @@ -116,10 +117,10 @@ pub fn run_executor( let allow_non_sequential_blocks = config.allow_non_sequential_blocks; - async move { - let mut conn = pool.acquire().await.unwrap_or_else(|_| { - panic!("Indexer({indexer_uid}) was unable to acquire a database connection.") - }); + let task = async move { + let mut conn = pool.acquire().await.with_context(|| { + format!("Indexer({indexer_uid}) was unable to acquire a database connection.") + })?; if allow_non_sequential_blocks { queries::remove_ensure_block_height_consecutive_trigger( @@ -127,8 +128,7 @@ pub fn run_executor( executor.manifest().namespace(), executor.manifest().identifier(), ) - .await - .unwrap_or_else(|_| panic!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})")); + .await.with_context(|| format!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"))?; } else { queries::create_ensure_block_height_consecutive_trigger( &mut conn, @@ -136,7 +136,7 @@ pub fn run_executor( executor.manifest().identifier(), ) .await - .unwrap_or_else(|_| panic!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})")); + .with_context(|| format!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"))?; } // If we reach an issue that continues to fail, we'll retry a few times before giving up, as @@ -163,8 +163,7 @@ pub fn run_executor( loop { // If something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } // Fetch the next page of blocks, and the starting cursor for the subsequent page @@ -205,8 +204,9 @@ pub fn run_executor( ); if num_empty_block_reqs == max_empty_block_reqs { - error!("No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"); - break; + return Err(anyhow::format_err!( + "No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>" + ).into()); } // There is no work to do, so we sleep for a bit, then continue without updating our cursor. @@ -219,8 +219,7 @@ pub fn run_executor( // If the kill switch has been triggered, the executor exits early. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } if let Err(e) = result { @@ -228,24 +227,24 @@ pub fn run_executor( if let Some(&WasmIndexerError::MissingBlocksError) = e.downcast_ref::() { - error!( + return Err(anyhow::format_err!( "Indexer({indexer_uid}) terminating due to missing blocks." - ); - break; + ) + .into()); } } // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { - error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"); - break; + return Err(anyhow::format_err!( + "Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points" + ).into()); } // We don't want to retry forever as that eats resources. if consecutive_retries >= INDEXER_FAILED_CALLS { - error!( + return Err(anyhow::format_err!( "Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>" - ); - break; + ).into()); } if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e { @@ -276,14 +275,15 @@ pub fn run_executor( // Again, check if something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; + return Err(IndexerError::KillSwitch(indexer_uid)); } // Since we had successful call, we reset the retry count. consecutive_retries = 0; } - } + }; + + Ok(task) } /// Retrieve blocks from a client node. diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 6cb957563..d0955a555 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -63,7 +63,7 @@ fn get_string(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult { /// Fetch the object ID at the given pointer from memory. fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult { - let id = get_string(mem, ptr, len).unwrap(); + let id = get_string(mem, ptr, len)?; // https://stackoverflow.com/a/1348551 let id: String = id.chars().filter(|&c| c != '\0').collect(); Ok(id) @@ -90,7 +90,7 @@ fn log_data( let mem = idx_env .memory .as_mut() - .expect("Memory unitialized.") + .ok_or(WasmIndexerError::UninitializedMemory)? .view(&store); let log_string = @@ -102,15 +102,16 @@ fn log_data( LOG_LEVEL_INFO => info!("{log_string}",), LOG_LEVEL_DEBUG => debug!("{log_string}",), LOG_LEVEL_TRACE => trace!("{log_string}",), - l => panic!("Invalid log level: {l}"), + l => { + error!("Invalid log level: {l}"); + return Err(WasmIndexerError::InvalidLogLevel); + } } Ok(()) } /// Fetch the given type at the given pointer from memory. -/// -/// This function is fallible, and will panic if the type cannot be fetched. fn get_object( mut env: FunctionEnvMut, type_id: i64, @@ -131,13 +132,14 @@ fn get_object( let mem = idx_env .memory .as_mut() - .expect("Memory unitialized.") + .ok_or(WasmIndexerError::UninitializedMemory)? .view(&store); let offset = 1; let len = 64; let padding = 6; - let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap(); + let id = get_object_id(&mem, ptr + offset, len + padding + offset) + .map_err(|_| WasmIndexerError::GetObjectIdFailed)?; let rt = tokio::runtime::Handle::current(); let bytes = rt @@ -148,10 +150,15 @@ fn get_object( })?; if let Some(bytes) = bytes { - let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing."); + let alloc_fn = idx_env + .alloc + .as_mut() + .ok_or(WasmIndexerError::AllocMissing)?; let size = bytes.len() as u32; - let result = alloc_fn.call(&mut store, size).expect("Alloc failed."); + let result = alloc_fn + .call(&mut store, size) + .map_err(|_| WasmIndexerError::AllocFailed)?; let range = result as usize..result as usize + size as usize; let mem = idx_env @@ -175,8 +182,6 @@ fn get_object( } /// Put the given type at the given pointer into memory. -/// -/// This function is fallible, and will panic if the type cannot be saved. fn put_object( mut env: FunctionEnvMut, type_id: i64, @@ -194,11 +199,11 @@ fn put_object( return Err(WasmIndexerError::KillSwitch); } - let mem = if let Some(memory) = idx_env.memory.as_mut() { - memory.view(&store) - } else { - return Err(WasmIndexerError::UninitializedMemory); - }; + let mem = idx_env + .memory + .as_mut() + .ok_or(WasmIndexerError::UninitializedMemory)? + .view(&store); let mut bytes = Vec::with_capacity(len as usize); let range = ptr as usize..ptr as usize + len as usize; @@ -234,8 +239,6 @@ fn put_object( } /// Execute the arbitrary query at the given pointer. -/// -/// This function is fallible, and will panic if the query cannot be executed. fn put_many_to_many_record( mut env: FunctionEnvMut, ptr: u32, diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index d02050b8a..b479bcbf6 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -88,4 +88,8 @@ pub enum IndexerError { EndBlockMet, #[error("Invalid schema: {0:?}")] SchemaVersionMismatch(String), + #[error(transparent)] + Other(#[from] anyhow::Error), + #[error("Indexer({0}) kill switch flipped. <('.')>")] + KillSwitch(String), } diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 16891fafd..4cd30ecfa 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -29,7 +29,7 @@ pub struct IndexerService { manager: SchemaManager, /// Tasks for the spawned indexers. - tasks: tokio::task::JoinSet<()>, + tasks: tokio::task::JoinSet>, /// Channel used to receive `ServiceRequest`s. rx: Receiver, @@ -177,7 +177,7 @@ impl IndexerService { manifest.identifier() ); - self.start_executor(executor); + self.start_executor(executor)?; Ok(()) } @@ -204,7 +204,7 @@ impl IndexerService { { info!("Registered Indexer({})", manifest.uid()); - self.start_executor(executor); + self.start_executor(executor)?; } else { error!( "Failed to register Indexer({}) from registry.", @@ -257,7 +257,7 @@ impl IndexerService { info!("Registered NativeIndex({})", uid); - self.start_executor(executor); + self.start_executor(executor)?; Ok(()) } @@ -267,8 +267,15 @@ impl IndexerService { loop { tokio::select! { // Calling join_next will remove finished tasks from the set. - Some(Err(e)) = self.tasks.join_next() => { - error!("Error retiring indexer task {e}"); + Some(result) = self.tasks.join_next() => { + match result { + Ok(Ok(())) => (), + Ok(Err(e)) => match e { + IndexerError::KillSwitch(_) => info!{"{e}"}, + _ => error!("{e}"), + } + Err(e) => error!("Error retiring indexer task {e}"), + } } Some(service_request) = self.rx.recv() => { match service_request { @@ -311,7 +318,7 @@ impl IndexerService { ) .await { - Ok(executor) => self.start_executor(executor), + Ok(executor) => self.start_executor(executor)?, Err(e) => { error!( "Failed to reload Indexer({}.{}): {e:?}", @@ -350,7 +357,11 @@ impl IndexerService { // Spawn and register a tokio::task running the Executor loop, as well as // the kill switch and the abort handle. - fn start_executor(&mut self, executor: T) { + #[allow(clippy::result_large_err)] + fn start_executor( + &mut self, + executor: T, + ) -> IndexerResult<()> { let uid = executor.manifest().uid(); self.killers @@ -360,7 +371,9 @@ impl IndexerService { &self.config, self.pool.clone(), executor, - )); + )?); + + Ok(()) } }