Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: remove panics and expects #1395

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/fuel-indexer-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub enum WasmIndexerError {
KillSwitch,
DatabaseError,
MissingBlocksError,
InvalidLogLevel,
GetObjectIdFailed,
GetStringFailed,
AllocMissing,
AllocFailed,
GeneralError,
}

Expand All @@ -60,6 +65,11 @@ impl From<u32> for WasmIndexerError {
5 => Self::KillSwitch,
6 => Self::DatabaseError,
7 => Self::MissingBlocksError,
8 => Self::InvalidLogLevel,
9 => Self::GetObjectIdFailed,
10 => Self::GetStringFailed,
11 => Self::AllocMissing,
12 => Self::AllocFailed,
_ => Self::GeneralError,
}
}
Expand Down Expand Up @@ -95,6 +105,21 @@ impl std::fmt::Display for WasmIndexerError {
Self::MissingBlocksError => {
write!(f, "Some blocks are missing")
}
Self::InvalidLogLevel => {
write!(f, "Invalid log level.")
}
Self::GetObjectIdFailed => {
write!(f, "get_object_id failed.")
}
Self::GetStringFailed => {
write!(f, "get_string failed.")
}
Self::AllocMissing => {
write!(f, "alloc is missing.")
}
Self::AllocFailed => {
write!(f, "alloc failed.")
}
Self::GeneralError => write!(f, "Some unspecified WASM error occurred."),
}
}
Expand Down
56 changes: 28 additions & 28 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError,
IndexerResult,
};
use anyhow::Context;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fuel_core_client::client::{
Expand Down Expand Up @@ -78,7 +79,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
config: &IndexerConfig,
pool: IndexerConnectionPool,
mut executor: T,
) -> impl Future<Output = ()> {
) -> anyhow::Result<impl Future<Output = IndexerResult<()>>> {
// TODO: https://github.com/FuelLabs/fuel-indexer/issues/286

let end_block = executor.manifest().end_block();
Expand All @@ -104,9 +105,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}");

let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| {
panic!("Indexer({indexer_uid}) client node connection failed: {e}.")
});
let client = FuelClient::from_str(&fuel_node_addr).with_context(|| {
format!("Indexer({indexer_uid}) client node connection failed.")
})?;

if let Some(end_block) = end_block {
info!("Indexer({indexer_uid}) will stop at block #{end_block}.");
Expand All @@ -116,27 +117,26 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

let allow_non_sequential_blocks = config.allow_non_sequential_blocks;

async move {
let mut conn = pool.acquire().await.unwrap_or_else(|_| {
panic!("Indexer({indexer_uid}) was unable to acquire a database connection.")
});
let task = async move {
let mut conn = pool.acquire().await.with_context(|| {
format!("Indexer({indexer_uid}) was unable to acquire a database connection.")
})?;

if allow_non_sequential_blocks {
queries::remove_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"));
.await.with_context(|| format!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"))?;
} else {
queries::create_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"));
.with_context(|| format!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"))?;
}

// If we reach an issue that continues to fail, we'll retry a few times before giving up, as
Expand All @@ -163,8 +163,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
loop {
// If something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Fetch the next page of blocks, and the starting cursor for the subsequent page
Expand Down Expand Up @@ -205,8 +204,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
);

if num_empty_block_reqs == max_empty_block_reqs {
error!("No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>");
break;
return Err(anyhow::format_err!(
"No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"
).into());
}

// There is no work to do, so we sleep for a bit, then continue without updating our cursor.
Expand All @@ -219,33 +219,32 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// If the kill switch has been triggered, the executor exits early.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

if let Err(e) = result {
if let IndexerError::RuntimeError(ref e) = e {
if let Some(&WasmIndexerError::MissingBlocksError) =
e.downcast_ref::<WasmIndexerError>()
{
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) terminating due to missing blocks."
);
break;
)
.into());
}
}
// Run time metering is deterministic. There is no point in retrying.
if let IndexerError::RunTimeLimitExceededError = e {
error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points");
break;
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"
).into());
}

// We don't want to retry forever as that eats resources.
if consecutive_retries >= INDEXER_FAILED_CALLS {
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>"
);
break;
).into());
}

if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e {
Expand Down Expand Up @@ -276,14 +275,15 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// Again, check if something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Since we had successful call, we reset the retry count.
consecutive_retries = 0;
}
}
};

Ok(task)
}

/// Retrieve blocks from a client node.
Expand Down
39 changes: 21 additions & 18 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn get_string(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult<String> {

/// Fetch the object ID at the given pointer from memory.
fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult<String> {
let id = get_string(mem, ptr, len).unwrap();
let id = get_string(mem, ptr, len)?;
// https://stackoverflow.com/a/1348551
let id: String = id.chars().filter(|&c| c != '\0').collect();
Ok(id)
Expand All @@ -90,7 +90,7 @@ fn log_data(
let mem = idx_env
.memory
.as_mut()
.expect("Memory unitialized.")
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let log_string =
Expand All @@ -102,15 +102,16 @@ fn log_data(
LOG_LEVEL_INFO => info!("{log_string}",),
LOG_LEVEL_DEBUG => debug!("{log_string}",),
LOG_LEVEL_TRACE => trace!("{log_string}",),
l => panic!("Invalid log level: {l}"),
l => {
error!("Invalid log level: {l}");
return Err(WasmIndexerError::InvalidLogLevel);
}
}

Ok(())
}

/// Fetch the given type at the given pointer from memory.
///
/// This function is fallible, and will panic if the type cannot be fetched.
fn get_object(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
Expand All @@ -131,13 +132,14 @@ fn get_object(
let mem = idx_env
.memory
.as_mut()
.expect("Memory unitialized.")
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let offset = 1;
let len = 64;
let padding = 6;
let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap();
let id = get_object_id(&mem, ptr + offset, len + padding + offset)
.map_err(|_| WasmIndexerError::GetObjectIdFailed)?;

let rt = tokio::runtime::Handle::current();
let bytes = rt
Expand All @@ -148,10 +150,15 @@ fn get_object(
})?;

if let Some(bytes) = bytes {
let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing.");
let alloc_fn = idx_env
.alloc
.as_mut()
.ok_or(WasmIndexerError::AllocMissing)?;

let size = bytes.len() as u32;
let result = alloc_fn.call(&mut store, size).expect("Alloc failed.");
let result = alloc_fn
.call(&mut store, size)
.map_err(|_| WasmIndexerError::AllocFailed)?;
let range = result as usize..result as usize + size as usize;

let mem = idx_env
Expand All @@ -175,8 +182,6 @@ fn get_object(
}

/// Put the given type at the given pointer into memory.
///
/// This function is fallible, and will panic if the type cannot be saved.
fn put_object(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
Expand All @@ -194,11 +199,11 @@ fn put_object(
return Err(WasmIndexerError::KillSwitch);
}

let mem = if let Some(memory) = idx_env.memory.as_mut() {
memory.view(&store)
} else {
return Err(WasmIndexerError::UninitializedMemory);
};
let mem = idx_env
.memory
.as_mut()
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let mut bytes = Vec::with_capacity(len as usize);
let range = ptr as usize..ptr as usize + len as usize;
Expand Down Expand Up @@ -234,8 +239,6 @@ fn put_object(
}

/// Execute the arbitrary query at the given pointer.
///
/// This function is fallible, and will panic if the query cannot be executed.
fn put_many_to_many_record(
mut env: FunctionEnvMut<IndexEnv>,
ptr: u32,
Expand Down
4 changes: 4 additions & 0 deletions packages/fuel-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ pub enum IndexerError {
EndBlockMet,
#[error("Invalid schema: {0:?}")]
SchemaVersionMismatch(String),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("Indexer({0}) kill switch flipped. <('.')>")]
KillSwitch(String),
}
31 changes: 22 additions & 9 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct IndexerService {
manager: SchemaManager,

/// Tasks for the spawned indexers.
tasks: tokio::task::JoinSet<()>,
tasks: tokio::task::JoinSet<IndexerResult<()>>,

/// Channel used to receive `ServiceRequest`s.
rx: Receiver<ServiceRequest>,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl IndexerService {
manifest.identifier()
);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -204,7 +204,7 @@ impl IndexerService {
{
info!("Registered Indexer({})", manifest.uid());

self.start_executor(executor);
self.start_executor(executor)?;
} else {
error!(
"Failed to register Indexer({}) from registry.",
Expand Down Expand Up @@ -257,7 +257,7 @@ impl IndexerService {

info!("Registered NativeIndex({})", uid);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -267,8 +267,15 @@ impl IndexerService {
loop {
tokio::select! {
// Calling join_next will remove finished tasks from the set.
Some(Err(e)) = self.tasks.join_next() => {
error!("Error retiring indexer task {e}");
Some(result) = self.tasks.join_next() => {
match result {
Ok(Ok(())) => (),
Ok(Err(e)) => match e {
IndexerError::KillSwitch(_) => info!{"{e}"},
_ => error!("{e}"),
}
Err(e) => error!("Error retiring indexer task {e}"),
}
}
Some(service_request) = self.rx.recv() => {
match service_request {
Expand Down Expand Up @@ -311,7 +318,7 @@ impl IndexerService {
)
.await
{
Ok(executor) => self.start_executor(executor),
Ok(executor) => self.start_executor(executor)?,
Err(e) => {
error!(
"Failed to reload Indexer({}.{}): {e:?}",
Expand Down Expand Up @@ -350,7 +357,11 @@ impl IndexerService {

// Spawn and register a tokio::task running the Executor loop, as well as
// the kill switch and the abort handle.
fn start_executor<T: 'static + Executor + Send + Sync>(&mut self, executor: T) {
#[allow(clippy::result_large_err)]
fn start_executor<T: 'static + Executor + Send + Sync>(
&mut self,
executor: T,
) -> IndexerResult<()> {
let uid = executor.manifest().uid();

self.killers
Expand All @@ -360,7 +371,9 @@ impl IndexerService {
&self.config,
self.pool.clone(),
executor,
));
)?);

Ok(())
}
}

Expand Down
Loading