Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: full anyhow::Result<()> support in indexers #1425

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions packages/fuel-indexer-macros/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ fn process_fn_items(
}

let fn_name = &fn_item.sig.ident;
let fn_name_string = fn_name.to_string();

if arg_list.is_empty() {
proc_macro_error::abort_call_site!(
Expand All @@ -569,9 +570,26 @@ fn process_fn_items(
);
}

let fn_call = if fn_item.sig.output == syn::ReturnType::Default {
quote! {
#fn_name(#(#arg_list),*)#awaitness
}
} else {
quote! {
if let Err(e) = #fn_name(#(#arg_list),*)#awaitness {
unsafe {
if !ERROR_MESSAGE.is_empty() {
ERROR_MESSAGE += "\n";
}
ERROR_MESSAGE += &format!("{} failed with an error: {}", #fn_name_string, e.to_string());
}
}
}
};

abi_dispatchers.push(quote! {
if ( #(#input_checks)&&* ) {
#fn_name(#(#arg_list),*)#awaitness;
#fn_call;
}
});

Expand All @@ -586,6 +604,23 @@ fn process_fn_items(
}
}

let error_message_handler = match manifest.execution_source() {
ExecutionSource::Native => {
quote! {}
}
ExecutionSource::Wasm => {
quote! {
unsafe {
if !ERROR_MESSAGE.is_empty() {
let message = ERROR_MESSAGE.lines().map(|l| format!(" {l}")).collect::<Vec<String>>().join("\n");
ERROR_MESSAGE = format!("At height {}:\n", block_height) + &message;
early_exit(WasmIndexerError::GeneralError)
}
}
}
}
};

let decoder_struct = quote! {
#[derive(Default)]
struct Decoders {
Expand Down Expand Up @@ -666,8 +701,9 @@ fn process_fn_items(
}
}

pub #asyncness fn dispatch(&self) {
pub #asyncness fn dispatch(&self, block_height: u32) {
#(#abi_dispatchers)*
#error_message_handler
}
}
};
Expand Down Expand Up @@ -881,7 +917,7 @@ fn process_fn_items(
}
}
}
decoder.dispatch()#awaitness;
decoder.dispatch(block.header.height)#awaitness;

let metadata = IndexMetadataEntity::new(block.time as u64, block.header.height, block.id);
metadata.save()#awaitness;
Expand Down
13 changes: 7 additions & 6 deletions packages/fuel-indexer-macros/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn handler_block_wasm(
#[no_mangle]
fn handle_events(blob: *mut u8, len: usize) {
register_panic_hook();

use fuel_indexer_utils::plugin::deserialize;
let bytes = unsafe { Vec::from_raw_parts(blob, len, len) };
let blocks: Vec<BlockData> = match deserialize(&bytes) {
Expand All @@ -41,16 +42,16 @@ pub fn handler_block_wasm(
/// retrieved by the indexer service and logged.
fn panic_hook() -> proc_macro2::TokenStream {
quote! {
static mut PANIC_MESSAGE: String = String::new();
static mut ERROR_MESSAGE: String = String::new();

#[no_mangle]
fn get_panic_message_ptr() -> *const u8 {
unsafe { PANIC_MESSAGE.as_ptr() }
fn get_error_message_ptr() -> *const u8 {
unsafe { ERROR_MESSAGE.as_ptr() }
}

#[no_mangle]
fn get_panic_message_len() -> u32 {
unsafe { PANIC_MESSAGE.len() as u32 }
fn get_error_message_len() -> u32 {
unsafe { ERROR_MESSAGE.len() as u32 }
}

#[no_mangle]
Expand All @@ -62,7 +63,7 @@ fn panic_hook() -> proc_macro2::TokenStream {
SET_HOOK.call_once(|| {
panic::set_hook(Box::new(|info| {
unsafe {
PANIC_MESSAGE = info.to_string();
ERROR_MESSAGE = info.to_string();
}
early_exit(WasmIndexerError::Panic);
}));
Expand Down
15 changes: 8 additions & 7 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
Some(&WasmIndexerError::MissingBlocksError) => {
return Err(anyhow::anyhow!("{e}").into());
}
Some(&WasmIndexerError::Panic) => {
Some(&WasmIndexerError::Panic)
| Some(&WasmIndexerError::GeneralError) => {
let message = executor
.get_panic_message()
.get_error_message()
.await
.unwrap_or("unknown".to_string());
return Err(anyhow::anyhow!("{message}").into());
Expand Down Expand Up @@ -578,7 +579,7 @@ where

fn kill_switch(&self) -> &Arc<AtomicBool>;

async fn get_panic_message(&self) -> IndexerResult<String>;
async fn get_error_message(&self) -> IndexerResult<String>;
}

/// WASM indexer runtime environment responsible for fetching/saving data to and from the database.
Expand Down Expand Up @@ -713,9 +714,9 @@ where
&self.manifest
}

async fn get_panic_message(&self) -> IndexerResult<String> {
async fn get_error_message(&self) -> IndexerResult<String> {
return Err(anyhow::anyhow!(
"get_panic_message() not supported in native exetutor."
"get_error_message() not supported in native exetutor."
)
.into());
}
Expand Down Expand Up @@ -1006,9 +1007,9 @@ impl Executor for WasmIndexExecutor {
&self.manifest
}

async fn get_panic_message(&self) -> IndexerResult<String> {
async fn get_error_message(&self) -> IndexerResult<String> {
let mut store = self.store.lock().await;
let result = ffi::get_panic_message(&mut store, &self.instance)?;
let result = ffi::get_error_message(&mut store, &self.instance)?;
Ok(result)
}
}
6 changes: 3 additions & 3 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ fn get_string_from_instance(
}

/// Get the version of the indexer schema stored in the WASM instance.
pub fn get_panic_message(store: &mut Store, instance: &Instance) -> FFIResult<String> {
pub fn get_error_message(store: &mut Store, instance: &Instance) -> FFIResult<String> {
get_string_from_instance(
store,
instance,
"get_panic_message_ptr",
"get_panic_message_len",
"get_error_message_ptr",
"get_error_message_len",
)
}

Expand Down
Loading