-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: global queue capacity #225
base: main
Are you sure you want to change the base?
Changes from 9 commits
dd74a09
77089aa
0b86241
b54984e
0271230
9314838
0642003
f947226
17fca66
d879611
74fe93d
16c9188
38af70c
9e33ba1
38ee2f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -23,6 +23,9 @@ use shc_file_transfer_service::{ | |||||||||
use shc_forest_manager::traits::ForestStorage; | ||||||||||
use storage_hub_runtime::{StorageDataUnit, MILLIUNIT}; | ||||||||||
|
||||||||||
use std::sync::Arc; | ||||||||||
use tokio::sync::Mutex; | ||||||||||
|
||||||||||
use crate::services::{forest_storage::NoKey, handler::StorageHubHandler}; | ||||||||||
use crate::tasks::{BspForestStorageHandlerT, FileStorageT}; | ||||||||||
|
||||||||||
|
@@ -53,6 +56,7 @@ where | |||||||||
{ | ||||||||||
storage_hub_handler: StorageHubHandler<FL, FSH>, | ||||||||||
file_key_cleanup: Option<H256>, | ||||||||||
capacity_queue: Arc<Mutex<Vec<u64>>>, | ||||||||||
} | ||||||||||
|
||||||||||
impl<FL, FSH> Clone for BspUploadFileTask<FL, FSH> | ||||||||||
|
@@ -64,6 +68,7 @@ where | |||||||||
Self { | ||||||||||
storage_hub_handler: self.storage_hub_handler.clone(), | ||||||||||
file_key_cleanup: self.file_key_cleanup, | ||||||||||
capacity_queue: Arc::clone(&self.capacity_queue), | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
@@ -77,6 +82,7 @@ where | |||||||||
Self { | ||||||||||
storage_hub_handler, | ||||||||||
file_key_cleanup: None, | ||||||||||
capacity_queue: Arc::new(Mutex::new(vec![])), | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
@@ -498,12 +504,6 @@ where | |||||||||
return Err(anyhow::anyhow!(err_msg)); | ||||||||||
} | ||||||||||
|
||||||||||
let new_capacity = self.calculate_capacity(&event, current_capacity)?; | ||||||||||
|
||||||||||
let call = storage_hub_runtime::RuntimeCall::Providers( | ||||||||||
pallet_storage_providers::Call::change_capacity { new_capacity }, | ||||||||||
); | ||||||||||
|
||||||||||
let earliest_change_capacity_block = self | ||||||||||
.storage_hub_handler | ||||||||||
.blockchain | ||||||||||
|
@@ -517,29 +517,52 @@ where | |||||||||
anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) | ||||||||||
})?; | ||||||||||
|
||||||||||
// we registered it to the queue | ||||||||||
let mut capacity_queue = self.capacity_queue.lock().await; | ||||||||||
|
||||||||||
capacity_queue.push(event.size); | ||||||||||
|
||||||||||
drop(capacity_queue); | ||||||||||
|
||||||||||
// Wait for the earliest block where the capacity can be changed. | ||||||||||
self.storage_hub_handler | ||||||||||
.blockchain | ||||||||||
.wait_for_block(earliest_change_capacity_block) | ||||||||||
.await?; | ||||||||||
|
||||||||||
self.storage_hub_handler | ||||||||||
.blockchain | ||||||||||
.send_extrinsic(call, Tip::from(0)) | ||||||||||
.await? | ||||||||||
.with_timeout(Duration::from_secs( | ||||||||||
self.storage_hub_handler | ||||||||||
.provider_config | ||||||||||
.extrinsic_retry_timeout, | ||||||||||
)) | ||||||||||
.watch_for_success(&self.storage_hub_handler.blockchain) | ||||||||||
.await?; | ||||||||||
// wwe read from the queue | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
let mut capacity_queue = self.capacity_queue.lock().await; | ||||||||||
|
||||||||||
info!( | ||||||||||
target: LOG_TARGET, | ||||||||||
"Increased storage capacity to {:?} bytes", | ||||||||||
new_capacity | ||||||||||
); | ||||||||||
// if the queue is not empty it is that the capacity hasn't been updated yet | ||||||||||
if !capacity_queue.is_empty() { | ||||||||||
let size: u64 = capacity_queue.drain(..).sum(); | ||||||||||
|
||||||||||
let new_capacity = self.calculate_capacity(size, current_capacity)?; | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I couldn't find a good way to test this, especially for the max storage value because it would require a 4GB file and it is not practical. A way this function could be tested is to pass the config as an argument and have a test in rust to make sure the calculation is right and that it will goes higher than the max storage authorize. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could pass these aditional arguments in your test and reduce it to a reasonable size. storage-hub/test/suites/integration/bsp/onboard.test.ts Lines 22 to 25 in 64cc252
|
||||||||||
|
||||||||||
let call = storage_hub_runtime::RuntimeCall::Providers( | ||||||||||
pallet_storage_providers::Call::change_capacity { new_capacity }, | ||||||||||
); | ||||||||||
|
||||||||||
self.storage_hub_handler | ||||||||||
.blockchain | ||||||||||
.send_extrinsic(call, Tip::from(0)) | ||||||||||
.await? | ||||||||||
.with_timeout(Duration::from_secs( | ||||||||||
self.storage_hub_handler | ||||||||||
.provider_config | ||||||||||
.extrinsic_retry_timeout, | ||||||||||
)) | ||||||||||
.watch_for_success(&self.storage_hub_handler.blockchain) | ||||||||||
.await?; | ||||||||||
|
||||||||||
drop(capacity_queue); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why drop the lock here and not outside? |
||||||||||
|
||||||||||
info!( | ||||||||||
target: LOG_TARGET, | ||||||||||
"Increased storage capacity to {:?} bytes", | ||||||||||
new_capacity | ||||||||||
); | ||||||||||
} | ||||||||||
|
||||||||||
let available_capacity = self | ||||||||||
.storage_hub_handler | ||||||||||
|
@@ -652,12 +675,12 @@ where | |||||||||
/// | ||||||||||
/// The `max_storage_capacity` is returned if the new capacity exceeds it. | ||||||||||
fn calculate_capacity( | ||||||||||
&mut self, | ||||||||||
event: &NewStorageRequest, | ||||||||||
&self, | ||||||||||
size: StorageDataUnit, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of renaming this to |
||||||||||
current_capacity: StorageDataUnit, | ||||||||||
) -> Result<StorageDataUnit, anyhow::Error> { | ||||||||||
let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity; | ||||||||||
let jumps_needed = (event.size + jump_capacity - 1) / jump_capacity; | ||||||||||
let jumps_needed = (size + jump_capacity - 1) / jump_capacity; | ||||||||||
let jumps = max(jumps_needed, 1); | ||||||||||
let bytes_to_add = jumps * jump_capacity; | ||||||||||
let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| { | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
import assert from "node:assert"; | ||
Check failure on line 1 in test/suites/integration/bsp/storage-capacity.test.ts GitHub Actions / Run BSPNet Tests (4)Test BSP storage size increased twice in the same increasing period (check for race condition)
Check failure on line 1 in test/suites/integration/bsp/storage-capacity.test.ts GitHub Actions / Run BSPNet Tests (4)Test BSP storage size increased twice in the same increasing period (check for race condition)
|
||
import { | ||
bspKey, | ||
describeBspNet, | ||
|
@@ -104,6 +104,12 @@ | |
|
||
await userApi.sealBlock(); | ||
|
||
const updatedCapacity = BigInt(bspApi.shConsts.JUMP_CAPACITY_BSP + newCapacity); | ||
const bspCapacityAfter = await bspApi.query.providers.backupStorageProviders( | ||
bspApi.shConsts.DUMMY_BSP_ID | ||
); | ||
assert.strictEqual(bspCapacityAfter.unwrap().capacity.toBigInt(), updatedCapacity); | ||
|
||
// Assert that the BSP was accepted as a volunteer. | ||
await userApi.assert.eventPresent("fileSystem", "AcceptedBspVolunteer"); | ||
}); | ||
|
@@ -155,4 +161,65 @@ | |
assert.strictEqual(eventInfo.asModule.index.toNumber(), providersPallet?.index.toNumber()); | ||
assert.strictEqual(eventInfo.asModule.error[0], newCapacityLessThanUsedStorageErrorIndex); | ||
}); | ||
|
||
it("Test BSP storage size increased twice in the same increasing period (check for race condition)", async () => { | ||
const capacityUsed = ( | ||
await bspApi.query.providers.backupStorageProviders(bspApi.shConsts.DUMMY_BSP_ID) | ||
) | ||
.unwrap() | ||
.capacityUsed.toNumber(); | ||
await bspApi.block.skipToMinChangeTime(); | ||
const minCapacity = bspApi.consts.providers.spMinCapacity.toNumber(); | ||
const newCapacity = Math.max(minCapacity, capacityUsed + 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why |
||
|
||
// Set BSP's available capacity to 0 to force the BSP to increase its capacity before volunteering for the storage request. | ||
const { extSuccess } = await bspApi.sealBlock( | ||
bspApi.tx.providers.changeCapacity(newCapacity), | ||
bspKey | ||
); | ||
assert.strictEqual(extSuccess, true); | ||
|
||
// First storage request | ||
const source1 = "res/cloud.jpg"; | ||
const location1 = "test/cloud.jpg"; | ||
const bucketName1 = "toobig-1"; | ||
await bspApi.file.newStorageRequest(source1, location1, bucketName1); | ||
|
||
// Second storage request | ||
const source2 = "res/adolphus.jpg"; | ||
const location2 = "test/adolphus.jpg"; | ||
const bucketName2 = "nothingmuch-2"; | ||
await bspApi.file.newStorageRequest(source2, location2, bucketName2); | ||
|
||
//To allow for BSP to react to request | ||
await sleep(500); | ||
|
||
// TODO: get the last block number and the period for which you can change capacity | ||
// userApi.consts.providers.minBlocksBetweenCapacityChanges | ||
|
||
const nextCapacityChangeBlock = 29; | ||
|
||
await bspApi.advanceToBlock(nextCapacityChangeBlock); | ||
|
||
// Allow BSP enough time to send call to change capacity. | ||
await sleep(500); | ||
|
||
// Assert BSP has sent a call to increase its capacity. | ||
await bspApi.assert.extrinsicPresent({ | ||
module: "providers", | ||
method: "changeCapacity", | ||
checkTxPool: true | ||
}); | ||
|
||
await bspApi.sealBlock(); | ||
|
||
// Assert that the capacity has changed. | ||
await bspApi.assert.eventPresent("providers", "CapacityChanged"); | ||
|
||
const updatedCapacity = BigInt(bspApi.shConsts.JUMP_CAPACITY_BSP + newCapacity); | ||
const bspCapacityAfter = await bspApi.query.providers.backupStorageProviders( | ||
bspApi.shConsts.DUMMY_BSP_ID | ||
); | ||
assert.strictEqual(bspCapacityAfter.unwrap().capacity.toBigInt(), updatedCapacity); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be missing something, but as far as I understand this could be just a
u64
instead ofVec<u64>
- seems this is the way we are using it.