Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: add Entity::delete_many #1514

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/fuel-indexer-api-server/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub fn check_wasm_toolchain_version(data: Vec<u8>) -> anyhow::Result<String> {
"ff_find_many".to_string(),
Function::new_typed(&mut store, |_: i64, _: i32, _: i32| 0i32),
);
exports.insert(
"ff_delete_many".to_string(),
Function::new_typed(&mut store, |_: i64, _: i32, _: i32| 0i64),
);
exports.insert(
"ff_early_exit".to_string(),
Function::new_typed(&mut store, |_: i32| {}),
Expand Down
24 changes: 24 additions & 0 deletions packages/fuel-indexer-macros/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,14 @@ pub struct ObjectDecoder {
type_id: i64,
}

impl ObjectDecoder {
fn is_virtual(&self) -> bool {
self.impl_decoder
.parsed
.is_virtual_typedef(&self.impl_decoder.typdef.name.to_string())
}
}

impl Default for ObjectDecoder {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -760,6 +768,8 @@ impl Decoder for EnumDecoder {

impl From<ObjectDecoder> for TokenStream {
fn from(decoder: ObjectDecoder) -> Self {
let is_virtual = decoder.is_virtual();

let ObjectDecoder {
struct_fields,
ident,
Expand Down Expand Up @@ -826,6 +836,18 @@ impl From<ObjectDecoder> for TokenStream {
quote! { None }
};

let impl_entity_delete = if !is_virtual {
quote! {
impl<'a> EntityDelete<'a> for #ident {
fn delete(&self) -> bool {
Self::delete_many(Self::id().eq(self.id.clone())) == 1
}
}
}
} else {
quote! {}
};

let impl_entity = quote! {
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct #ident {
Expand All @@ -850,6 +872,8 @@ impl From<ObjectDecoder> for TokenStream {
}

}

#impl_entity_delete
};

let impl_new = TokenStream::from(impl_decoder);
Expand Down
22 changes: 22 additions & 0 deletions packages/fuel-indexer-plugin/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use crate::find::{Field, Filter, ManyFilter, OptionField, SingleFilter};
extern "C" {
fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
fn ff_find_many(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
fn ff_delete_many(type_id: i64, ptr: *const u8, len: *mut u8) -> u64;
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32);
fn ff_put_many_to_many_record(ptr: *const u8, len: u32);
Expand Down Expand Up @@ -56,6 +57,14 @@ impl Logger {
}
}

/// Trait for a type entity which supports the `delete()` operation.
pub trait EntityDelete<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
/// Deletes the entity with the corresponding `ID` from the database.
/// Returns `true` when the value has been deleted, and `false` if it has
/// not.
fn delete(&self) -> bool;
}

/// Trait for a type entity.
///
/// Any entity type that will be processed through a WASM indexer is required to implement this trait.
Expand Down Expand Up @@ -161,6 +170,19 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
}
}

/// Delete the entities that satisfy the given constraints.
fn delete_many(filter: impl Into<ManyFilter<Self>>) -> usize {
let filter: ManyFilter<Self> = filter.into();
let buff =
bincode::serialize(&filter.to_string()).expect("Failed to serialize query");
let mut bufflen = (buff.len() as u32).to_le_bytes();

let count =
unsafe { ff_delete_many(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr()) };

count as usize
}

/// Saves a record.
fn save(&self) {
self.save_unsafe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,59 @@ mod fuel_indexer_test {
assert_eq!(fs.len(), 2);
assert_eq!(fs[0].string_value, "find5");
assert_eq!(fs[1].string_value, "find4");

// Test delete()
let count: usize = FindEntity::delete_many(
FindEntity::string_value().eq("find3".to_string()),
);
assert_eq!(count, 1);

// "find3" has already been deleted
let count: usize = FindEntity::delete_many(
FindEntity::string_value().eq("find3".to_string()),
);
assert_eq!(count, 0);

// Test searching for multiple entities, with limit
let fs: Vec<FindEntity> = FindEntity::find_many(
FindEntity::string_value()
.gt("f".to_string())
.order_by(FindEntity::value()),
);
// There were four, but one has been deleted
assert_eq!(fs.len(), 3);

// Next, delete "find2" and "find4"
let count: usize = FindEntity::delete_many(
FindEntity::string_value()
.gt("f".to_string())
.and(FindEntity::string_value().lt("find5".to_string())),
);
assert_eq!(count, 2);

// Test searching for multiple entities, with limit
let fs: Vec<FindEntity> = FindEntity::find_many(
FindEntity::string_value()
.gt("f".to_string())
.order_by(FindEntity::value()),
);

// Now there is only one left
assert_eq!(fs.len(), 1);
assert_eq!(fs[0].string_value, "find5");

// Directly delete the last value
let deleted = fs[0].delete();
assert!(deleted);

let fs: Vec<FindEntity> = FindEntity::find_many(
FindEntity::string_value()
.gt("f".to_string())
.order_by(FindEntity::value()),
);

// Nothing left.
assert_eq!(fs.len(), 0);
} else if block_data.height == 6 {
// There is no such block. The lookup will fail.
IndexMetadataEntity::find(IndexMetadataEntity::block_height().eq(777))
Expand Down
2 changes: 1 addition & 1 deletion packages/fuel-indexer-tests/tests/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ async fn test_no_missing_blocks() {
}

#[actix_web::test]
async fn test_find() {
async fn test_find_and_delete() {
let IndexingTestComponents {
ref node, ref db, ..
} = setup_indexing_test_components(None).await;
Expand Down
25 changes: 25 additions & 0 deletions packages/fuel-indexer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,31 @@ Do your WASM modules need to be rebuilt?
}
}

/// Delete multiple objects from the database that satisfy the given constraints.
pub async fn delete_many(
&mut self,
type_id: i64,
constraints: String,
) -> IndexerResult<usize> {
let table = &self
.tables
.get(&type_id)
.ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?;

let query = format!("DELETE from {table} WHERE {constraints}");

info!("QUERY: {query}");

let conn = self
.stashed
.as_mut()
.ok_or(IndexerError::NoTransactionError("find_many".to_string()))?;

let count = queries::execute_query(conn, query).await?;

Ok(count)
}

/// Load the schema for this indexer from the database, and build a mapping of `TypeId`s to tables.
pub async fn load_schema(&mut self, version: String) -> IndexerResult<()> {
self.version = version;
Expand Down
49 changes: 49 additions & 0 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn get_object(
}
}

/// Get multiple objects from the database that satisfy the given constraints.
fn find_many(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
Expand Down Expand Up @@ -282,6 +283,52 @@ fn find_many(
}
}

/// Delete multiple objects from the database that satisfy the given constraints.
fn delete_many(
mut env: FunctionEnvMut<IndexEnv>,
type_id: i64,
ptr: u32,
len_ptr: u32,
) -> Result<u64, WasmIndexerError> {
let (idx_env, store) = env.data_and_store_mut();

if idx_env
.kill_switch
.load(std::sync::atomic::Ordering::SeqCst)
{
// If the kill switch has been flipped, returning an error will cause an
// early termination of WASM execution.
return Err(WasmIndexerError::KillSwitch);
}

let mem = idx_env
.memory
.as_mut()
.ok_or(WasmIndexerError::UninitializedMemory)?
.view(&store);

let len = WasmPtr::<u32>::new(len_ptr)
.deref(&mem)
.read()
.expect("Failed to read length from memory.");

let constraints = get_object_id(&mem, ptr + 1, len - 1).unwrap();

let rt = tokio::runtime::Handle::current();
let count = rt
.block_on(async {
idx_env
.db
.lock()
.await
.delete_many(type_id, constraints)
.await
})
.unwrap();

Ok(count as u64)
}

/// Put the given type at the given pointer into memory.
fn put_object(
mut env: FunctionEnvMut<IndexEnv>,
Expand Down Expand Up @@ -428,6 +475,7 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv<IndexEnv>) -> Ex

let f_get_obj = Function::new_typed_with_env(store, env, get_object);
let f_find_many = Function::new_typed_with_env(store, env, find_many);
let f_delete_many = Function::new_typed_with_env(store, env, delete_many);
let f_put_obj = Function::new_typed_with_env(store, env, put_object);
let f_log_data = Function::new_typed_with_env(store, env, log_data);
let f_put_many_to_many_record =
Expand All @@ -437,6 +485,7 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv<IndexEnv>) -> Ex
exports.insert("ff_early_exit".to_string(), f_early_exit);
exports.insert("ff_get_object".to_string(), f_get_obj);
exports.insert("ff_find_many".to_string(), f_find_many);
exports.insert("ff_delete_many".to_string(), f_delete_many);
exports.insert("ff_put_object".to_string(), f_put_obj);
exports.insert(
"ff_put_many_to_many_record".to_string(),
Expand Down
Loading