Skip to content

Commit

Permalink
remove panics
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman committed Oct 4, 2023
1 parent ab70a95 commit dae8025
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
58 changes: 30 additions & 28 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError,
IndexerResult,
};
use anyhow::Context;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fuel_core_client::client::{
Expand Down Expand Up @@ -78,7 +79,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
config: &IndexerConfig,
pool: IndexerConnectionPool,
mut executor: T,
) -> impl Future<Output = ()> {
) -> anyhow::Result<impl Future<Output = IndexerResult<()>>> {
// TODO: https://github.com/FuelLabs/fuel-indexer/issues/286

let end_block = executor.manifest().end_block();
Expand All @@ -104,9 +105,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}");

let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| {
panic!("Indexer({indexer_uid}) client node connection failed: {e}.")
});
let client = FuelClient::from_str(&fuel_node_addr).with_context(|| {
format!("Indexer({indexer_uid}) client node connection failed.")
})?;

if let Some(end_block) = end_block {
info!("Indexer({indexer_uid}) will stop at block #{end_block}.");
Expand All @@ -116,27 +117,26 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

let allow_non_sequential_blocks = config.allow_non_sequential_blocks;

async move {
let mut conn = pool.acquire().await.unwrap_or_else(|_| {
panic!("Indexer({indexer_uid}) was unable to acquire a database connection.")
});
let task = async move {
let mut conn = pool.acquire().await.with_context(|| {
format!("Indexer({indexer_uid}) was unable to acquire a database connection.")
})?;

if allow_non_sequential_blocks {
queries::remove_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"));
.await.with_context(|| format!("Unable to remove the sequential blocks trigger for Indexer({indexer_uid})"))?;
} else {
queries::create_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.unwrap_or_else(|_| panic!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"));
.with_context(|| format!("Unable to create the sequential blocks trigger for Indexer({indexer_uid})"))?;
}

// If we reach an issue that continues to fail, we'll retry a few times before giving up, as
Expand All @@ -163,8 +163,7 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
loop {
// If something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Fetch the next page of blocks, and the starting cursor for the subsequent page
Expand Down Expand Up @@ -205,8 +204,9 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
);

if num_empty_block_reqs == max_empty_block_reqs {
error!("No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>");
break;
return Err(anyhow::format_err!(
"No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"
).into());
}

// There is no work to do, so we sleep for a bit, then continue without updating our cursor.
Expand All @@ -219,33 +219,32 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// If the kill switch has been triggered, the executor exits early.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

if let Err(e) = result {
if let IndexerError::RuntimeError(ref e) = e {
if let Some(&WasmIndexerError::MissingBlocksError) =
e.downcast_ref::<WasmIndexerError>()
{
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) terminating due to missing blocks."
);
break;
)
.into());
}
}
// Run time metering is deterministic. There is no point in retrying.
if let IndexerError::RunTimeLimitExceededError = e {
error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points");
break;
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"
).into());
}

// We don't want to retry forever as that eats resources.
if consecutive_retries >= INDEXER_FAILED_CALLS {
error!(
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>"
);
break;
).into());
}

if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e {
Expand Down Expand Up @@ -276,14 +275,17 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// Again, check if something else has signaled that this indexer should stop, then stop.
if executor.kill_switch().load(Ordering::SeqCst) {
info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>");
break;
return Err(IndexerError::KillSwitch(indexer_uid));
}

// Since we had successful call, we reset the retry count.
consecutive_retries = 0;
}
}

Ok(())
};

Ok(task)
}

/// Retrieve blocks from a client node.
Expand Down
4 changes: 4 additions & 0 deletions packages/fuel-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ pub enum IndexerError {
EndBlockMet,
#[error("Invalid schema: {0:?}")]
SchemaVersionMismatch(String),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("Indexer({0}) kill switch flipped. <('.')>")]
KillSwitch(String),
}
30 changes: 21 additions & 9 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct IndexerService {
manager: SchemaManager,

/// Tasks for the spawned indexers.
tasks: tokio::task::JoinSet<()>,
tasks: tokio::task::JoinSet<IndexerResult<()>>,

/// Channel used to receive `ServiceRequest`s.
rx: Receiver<ServiceRequest>,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl IndexerService {
manifest.identifier()
);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -204,7 +204,7 @@ impl IndexerService {
{
info!("Registered Indexer({})", manifest.uid());

self.start_executor(executor);
self.start_executor(executor)?;
} else {
error!(
"Failed to register Indexer({}) from registry.",
Expand Down Expand Up @@ -257,7 +257,7 @@ impl IndexerService {

info!("Registered NativeIndex({})", uid);

self.start_executor(executor);
self.start_executor(executor)?;

Ok(())
}
Expand All @@ -267,8 +267,15 @@ impl IndexerService {
loop {
tokio::select! {
// Calling join_next will remove finished tasks from the set.
Some(Err(e)) = self.tasks.join_next() => {
error!("Error retiring indexer task {e}");
Some(result) = self.tasks.join_next() => {
match result {
Ok(Ok(())) => (),
Ok(Err(e)) => match e {
IndexerError::KillSwitch(_) => info!{"{e}"},
_ => error!("{e}"),
}
Err(e) => error!("Error retiring indexer task {e}"),
}
}
Some(service_request) = self.rx.recv() => {
match service_request {
Expand Down Expand Up @@ -311,7 +318,7 @@ impl IndexerService {
)
.await
{
Ok(executor) => self.start_executor(executor),
Ok(executor) => self.start_executor(executor)?,
Err(e) => {
error!(
"Failed to reload Indexer({}.{}): {e:?}",
Expand Down Expand Up @@ -350,7 +357,10 @@ impl IndexerService {

// Spawn and register a tokio::task running the Executor loop, as well as
// the kill switch and the abort handle.
fn start_executor<T: 'static + Executor + Send + Sync>(&mut self, executor: T) {
fn start_executor<T: 'static + Executor + Send + Sync>(
&mut self,
executor: T,
) -> IndexerResult<()> {
let uid = executor.manifest().uid();

self.killers
Expand All @@ -360,7 +370,9 @@ impl IndexerService {
&self.config,
self.pool.clone(),
executor,
));
)?);

Ok(())
}
}

Expand Down

0 comments on commit dae8025

Please sign in to comment.