Skip to content

Commit

Permalink
enhancement: remove panics and expects (#1395)
Browse files Browse the repository at this point in the history
* remove panics

* remove more panics, expects, and unwraps
  • Loading branch information
lostman authored Oct 5, 2023
1 parent 42fe0d0 commit 8f7b76a
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 55 deletions.
25 changes: 25 additions & 0 deletions packages/fuel-indexer-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub enum WasmIndexerError {
KillSwitch,
DatabaseError,
MissingBlocksError,
InvalidLogLevel,
GetObjectIdFailed,
GetStringFailed,
AllocMissing,
AllocFailed,
GeneralError,
}

Expand All @@ -60,6 +65,11 @@ impl From<u32> for WasmIndexerError {
5 => Self::KillSwitch,
6 => Self::DatabaseError,
7 => Self::MissingBlocksError,
8 => Self::InvalidLogLevel,
9 => Self::GetObjectIdFailed,
10 => Self::GetStringFailed,
11 => Self::AllocMissing,
12 => Self::AllocFailed,
_ => Self::GeneralError,
}
}
Expand Down Expand Up @@ -95,6 +105,21 @@ impl std::fmt::Display for WasmIndexerError {
Self::MissingBlocksError => {
write!(f, "Some blocks are missing")
}
Self::InvalidLogLevel => {
write!(f, "Invalid log level.")
}
Self::GetObjectIdFailed => {
write!(f, "get_object_id failed.")
}
Self::GetStringFailed => {
write!(f, "get_string failed.")
}
Self::AllocMissing => {
write!(f, "alloc is missing.")
}
Self::AllocFailed => {
write!(f, "alloc failed.")
}
Self::GeneralError => write!(f, "Some unspecified WASM error occurred."),
}
}
Expand Down
56 changes: 28 additions & 28 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError,
IndexerResult,
};
use anyhow::Context;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fuel_core_client::client::{
Expand Down Expand Up @@ -78,7 +79,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
config: &IndexerConfig,
pool: IndexerConnectionPool,
mut executor: T,
) -> impl Future<Output = ()> {
) -> anyhow::Result<impl Future<Output = IndexerResult<()>>> {
// TODO: https://github.com/FuelLabs/fuel-indexer/issues/286

let end_block = executor.manifest().end_block();
Expand All @@ -104,9 +105,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}");

let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| {
panic!("Indexer({indexer_uid}) client node connection failed: {e}.")
});
let client = FuelClient::from_str(&fuel_node_addr).with_context(|| {
format!("Indexer({indexer_uid}) client node connection failed.")
})?;

if let Some(end_block) = end_block {
info!("Indexer({indexer_uid}) will stop at block #{end_block}.");
Expand All @@ -116,27 +117,26 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

let allow_non_sequential_blocks = config.allow_non_sequential_blocks;

async move {
let mut conn = pool.acquire().await.unwrap_or_else(|_| {
panic!("Indexer({indexer_uid}) was unable to acquire a database connection.")
});
let task = async move {
let mut conn = pool.acquire().await.with_context(|| {
format!("Indexer({indexer_uid}) was unable to acquire a database connection.")
})?;

if allow_non_sequential_blocks {
queries::remove_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"));
.await.with_context(|| format!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"))?;
} else {
queries::create_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"));
.with_context(|| format!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"))?;
}

// If we reach an issue that continues to fail, we'll retry a few times before giving up, as
Expand All @@ -163,8 +163,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
loop {
// If something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Fetch the next page of blocks, and the starting cursor for the subsequent page
Expand Down Expand Up @@ -205,8 +204,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
);

if num_empty_block_reqs == max_empty_block_reqs {
error!("No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>");
break;
return Err(anyhow::format_err!(
"No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"
).into());
}

// There is no work to do, so we sleep for a bit, then continue without updating our cursor.
Expand All @@ -219,33 +219,32 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// If the kill switch has been triggered, the executor exits early.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

if let Err(e) = result {
if let IndexerError::RuntimeError(ref e) = e {
if let Some(&WasmIndexerError::MissingBlocksError) =
e.downcast_ref::<WasmIndexerError>()
{
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) terminating due to missing blocks."
);
break;
)
.into());
}
}
// Run time metering is deterministic. There is no point in retrying.
if let IndexerError::RunTimeLimitExceededError = e {
error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points");
break;
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"
).into());
}

// We don't want to retry forever as that eats resources.
if consecutive_retries >= INDEXER_FAILED_CALLS {
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>"
);
break;
).into());
}

if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e {
Expand Down Expand Up @@ -276,14 +275,15 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// Again, check if something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Since we had successful call, we reset the retry count.
consecutive_retries = 0;
}
}
};

Ok(task)
}

/// Retrieve blocks from a client node.
Expand Down
39 changes: 21 additions & 18 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn get_string(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult<String> {

/// Fetch the object ID at the given pointer from memory.
fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult<String> {
let id = get_string(mem, ptr, len).unwrap();
let id = get_string(mem, ptr, len)?;
// https://stackoverflow.com/a/1348551
let id: String = id.chars().filter(|&c| c != '\0').collect();
Ok(id)
Expand All @@ -90,7 +90,7 @@ fn log_data(
let mem = idx_env
.memory
.as_mut()
.expect("Memory unitialized.")
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let log_string =
Expand All @@ -102,15 +102,16 @@ fn log_data(
LOG_LEVEL_INFO => info!("{log_string}",),
LOG_LEVEL_DEBUG => debug!("{log_string}",),
LOG_LEVEL_TRACE => trace!("{log_string}",),
l => panic!("Invalid log level: {l}"),
l => {
error!("Invalid log level: {l}");
return Err(WasmIndexerError::InvalidLogLevel);
}
}

Ok(())
}

/// Fetch the given type at the given pointer from memory.
///
/// This function is fallible, and will panic if the type cannot be fetched.
fn get_object(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
Expand All @@ -131,13 +132,14 @@ fn get_object(
let mem = idx_env
.memory
.as_mut()
.expect("Memory unitialized.")
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let offset = 1;
let len = 64;
let padding = 6;
let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap();
let id = get_object_id(&mem, ptr + offset, len + padding + offset)
.map_err(|_| WasmIndexerError::GetObjectIdFailed)?;

let rt = tokio::runtime::Handle::current();
let bytes = rt
Expand All @@ -148,10 +150,15 @@ fn get_object(
})?;

if let Some(bytes) = bytes {
let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing.");
let alloc_fn = idx_env
.alloc
.as_mut()
.ok_or(WasmIndexerError::AllocMissing)?;

let size = bytes.len() as u32;
let result = alloc_fn.call(&mut store, size).expect("Alloc failed.");
let result = alloc_fn
.call(&mut store, size)
.map_err(|_| WasmIndexerError::AllocFailed)?;
let range = result as usize..result as usize + size as usize;

let mem = idx_env
Expand All @@ -175,8 +182,6 @@ fn get_object(
}

/// Put the given type at the given pointer into memory.
///
/// This function is fallible, and will panic if the type cannot be saved.
fn put_object(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
Expand All @@ -194,11 +199,11 @@ fn put_object(
return Err(WasmIndexerError::KillSwitch);
}

let mem = if let Some(memory) = idx_env.memory.as_mut() {
memory.view(&store)
} else {
return Err(WasmIndexerError::UninitializedMemory);
};
let mem = idx_env
.memory
.as_mut()
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let mut bytes = Vec::with_capacity(len as usize);
let range = ptr as usize..ptr as usize + len as usize;
Expand Down Expand Up @@ -234,8 +239,6 @@ fn put_object(
}

/// Execute the arbitrary query at the given pointer.
///
/// This function is fallible, and will panic if the query cannot be executed.
fn put_many_to_many_record(
mut env: FunctionEnvMut<IndexEnv>,
ptr: u32,
Expand Down
4 changes: 4 additions & 0 deletions packages/fuel-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ pub enum IndexerError {
EndBlockMet,
#[error("Invalid schema: {0:?}")]
SchemaVersionMismatch(String),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("Indexer({0}) kill switch flipped. <('.')>")]
KillSwitch(String),
}
31 changes: 22 additions & 9 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct IndexerService {
manager: SchemaManager,

/// Tasks for the spawned indexers.
tasks: tokio::task::JoinSet<()>,
tasks: tokio::task::JoinSet<IndexerResult<()>>,

/// Channel used to receive `ServiceRequest`s.
rx: Receiver<ServiceRequest>,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl IndexerService {
manifest.identifier()
);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -204,7 +204,7 @@ impl IndexerService {
{
info!("Registered Indexer({})", manifest.uid());

self.start_executor(executor);
self.start_executor(executor)?;
} else {
error!(
"Failed to register Indexer({}) from registry.",
Expand Down Expand Up @@ -257,7 +257,7 @@ impl IndexerService {

info!("Registered NativeIndex({})", uid);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -267,8 +267,15 @@ impl IndexerService {
loop {
tokio::select! {
// Calling join_next will remove finished tasks from the set.
Some(Err(e)) = self.tasks.join_next() => {
error!("Error retiring indexer task {e}");
Some(result) = self.tasks.join_next() => {
match result {
Ok(Ok(())) => (),
Ok(Err(e)) => match e {
IndexerError::KillSwitch(_) => info!{"{e}"},
_ => error!("{e}"),
}
Err(e) => error!("Error retiring indexer task {e}"),
}
}
Some(service_request) = self.rx.recv() => {
match service_request {
Expand Down Expand Up @@ -311,7 +318,7 @@ impl IndexerService {
)
.await
{
Ok(executor) => self.start_executor(executor),
Ok(executor) => self.start_executor(executor)?,
Err(e) => {
error!(
"Failed to reload Indexer({}.{}): {e:?}",
Expand Down Expand Up @@ -350,7 +357,11 @@ impl IndexerService {

// Spawn and register a tokio::task running the Executor loop, as well as
// the kill switch and the abort handle.
fn start_executor<T: 'static + Executor + Send + Sync>(&mut self, executor: T) {
#[allow(clippy::result_large_err)]
fn start_executor<T: 'static + Executor + Send + Sync>(
&mut self,
executor: T,
) -> IndexerResult<()> {
let uid = executor.manifest().uid();

self.killers
Expand All @@ -360,7 +371,9 @@ impl IndexerService {
&self.config,
self.pool.clone(),
executor,
));
)?);

Ok(())
}
}

Expand Down

0 comments on commit 8f7b76a

Please sign in to comment.