Skip to content

Commit 8da201a

Browse files
committed
Revert batched VSS lazy deletes, rather spawn them into the background
Previously, we implemented `lazy` deletes in `VssStore` by batching them with the next write call as part of the next `PutObjectRequest` sent. However, we unfortunately overlooked that in this instance any non-existent `delete_items` would yield a `ConflictError`. Rather than batched `VssStore` lazy deletes, we therefore here opt to simply spawn them into the background and ignore any errors.
1 parent 3ed1d13 commit 8da201a

File tree

1 file changed

+16
-109
lines changed

1 file changed

+16
-109
lines changed

src/io/vss_store.rs

Lines changed: 16 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717

18-
use bdk_chain::Merge;
1918
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
2019
use lightning::impl_writeable_tlv_based_enum;
2120
use lightning::io::{self, Error, ErrorKind};
@@ -244,11 +243,15 @@ impl KVStoreSync for VssStore {
244243
primary_namespace,
245244
secondary_namespace,
246245
key,
247-
lazy,
248246
)
249247
.await
250248
};
251-
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
249+
if lazy {
250+
internal_runtime.spawn(async { fut.await });
251+
Ok(())
252+
} else {
253+
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
254+
}
252255
}
253256

254257
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
@@ -316,7 +319,7 @@ impl KVStore for VssStore {
316319
let secondary_namespace = secondary_namespace.to_string();
317320
let key = key.to_string();
318321
let inner = Arc::clone(&self.inner);
319-
Box::pin(async move {
322+
let fut = async move {
320323
inner
321324
.remove_internal(
322325
&inner.async_client,
@@ -326,10 +329,15 @@ impl KVStore for VssStore {
326329
primary_namespace,
327330
secondary_namespace,
328331
key,
329-
lazy,
330332
)
331333
.await
332-
})
334+
};
335+
if lazy {
336+
tokio::task::spawn(async { fut.await });
337+
Box::pin(async { Ok(()) })
338+
} else {
339+
Box::pin(async { fut.await })
340+
}
333341
}
334342
fn list(
335343
&self, primary_namespace: &str, secondary_namespace: &str,
@@ -362,7 +370,6 @@ struct VssStoreInner {
362370
// Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
363371
// The lock also encapsulates the latest written version per key.
364372
locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<u64>>>>,
365-
pending_lazy_deletes: Mutex<Vec<KeyValue>>,
366373
}
367374

368375
impl VssStoreInner {
@@ -372,7 +379,6 @@ impl VssStoreInner {
372379
data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
373380
) -> Self {
374381
let locks = Mutex::new(HashMap::new());
375-
let pending_lazy_deletes = Mutex::new(Vec::new());
376382
Self {
377383
schema_version,
378384
blocking_client,
@@ -381,7 +387,6 @@ impl VssStoreInner {
381387
data_encryption_key,
382388
key_obfuscator,
383389
locks,
384-
pending_lazy_deletes,
385390
}
386391
}
387392

@@ -520,12 +525,6 @@ impl VssStoreInner {
520525
"write",
521526
)?;
522527

523-
let delete_items = self
524-
.pending_lazy_deletes
525-
.try_lock()
526-
.ok()
527-
.and_then(|mut guard| guard.take())
528-
.unwrap_or_default();
529528
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
530529
let vss_version = -1;
531530
let storable_builder = StorableBuilder::new(RandEntropySource);
@@ -541,16 +540,11 @@ impl VssStoreInner {
541540
version: vss_version,
542541
value: storable.encode_to_vec(),
543542
}],
544-
delete_items: delete_items.clone(),
543+
delete_items: vec![],
545544
};
546545

547546
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
548547
client.put_object(&request).await.map_err(|e| {
549-
// Restore delete items so they'll be retried on next write.
550-
if !delete_items.is_empty() {
551-
self.pending_lazy_deletes.lock().unwrap().extend(delete_items);
552-
}
553-
554548
let msg = format!(
555549
"Failed to write to key {}/{}/{}: {}",
556550
primary_namespace, secondary_namespace, key, e
@@ -566,7 +560,7 @@ impl VssStoreInner {
566560
async fn remove_internal(
567561
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
568562
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
569-
key: String, lazy: bool,
563+
key: String,
570564
) -> io::Result<()> {
571565
check_namespace_key_validity(
572566
&primary_namespace,
@@ -579,12 +573,6 @@ impl VssStoreInner {
579573
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
580574

581575
let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] };
582-
if lazy {
583-
let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap();
584-
pending_lazy_deletes.push(key_value);
585-
return Ok(());
586-
}
587-
588576
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
589577
let request =
590578
DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) };
@@ -851,85 +839,4 @@ mod tests {
851839
do_read_write_remove_list_persist(&vss_store);
852840
drop(vss_store)
853841
}
854-
855-
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
856-
async fn vss_lazy_delete() {
857-
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
858-
let mut rng = rng();
859-
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
860-
let mut vss_seed = [0u8; 32];
861-
rng.fill_bytes(&mut vss_seed);
862-
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
863-
let vss_store =
864-
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
865-
866-
let primary_namespace = "test_namespace";
867-
let secondary_namespace = "";
868-
let key_to_delete = "key_to_delete";
869-
let key_for_trigger = "key_for_trigger";
870-
let data_to_delete = b"data_to_delete".to_vec();
871-
let trigger_data = b"trigger_data".to_vec();
872-
873-
// Write the key that we'll later lazily delete
874-
KVStore::write(
875-
&vss_store,
876-
primary_namespace,
877-
secondary_namespace,
878-
key_to_delete,
879-
data_to_delete.clone(),
880-
)
881-
.await
882-
.unwrap();
883-
884-
// Verify the key exists
885-
let read_data =
886-
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
887-
.await
888-
.unwrap();
889-
assert_eq!(read_data, data_to_delete);
890-
891-
// Perform a lazy delete
892-
KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true)
893-
.await
894-
.unwrap();
895-
896-
// Verify the key still exists (lazy delete doesn't immediately remove it)
897-
let read_data =
898-
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
899-
.await
900-
.unwrap();
901-
assert_eq!(read_data, data_to_delete);
902-
903-
// Verify the key is still in the list
904-
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
905-
assert!(keys.contains(&key_to_delete.to_string()));
906-
907-
// Trigger the actual deletion by performing a write operation
908-
KVStore::write(
909-
&vss_store,
910-
primary_namespace,
911-
secondary_namespace,
912-
key_for_trigger,
913-
trigger_data.clone(),
914-
)
915-
.await
916-
.unwrap();
917-
918-
// Now verify the key is actually deleted
919-
let read_result =
920-
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await;
921-
assert!(read_result.is_err());
922-
assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound);
923-
924-
// Verify the key is no longer in the list
925-
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
926-
assert!(!keys.contains(&key_to_delete.to_string()));
927-
928-
// Verify the trigger key still exists
929-
let read_data =
930-
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger)
931-
.await
932-
.unwrap();
933-
assert_eq!(read_data, trigger_data);
934-
}
935842
}

0 commit comments

Comments
 (0)