Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: global queue capacity #225

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
73 changes: 48 additions & 25 deletions node/src/tasks/bsp_upload_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use shc_file_transfer_service::{
use shc_forest_manager::traits::ForestStorage;
use storage_hub_runtime::{StorageDataUnit, MILLIUNIT};

use std::sync::Arc;
use tokio::sync::Mutex;

use crate::services::{forest_storage::NoKey, handler::StorageHubHandler};
use crate::tasks::{BspForestStorageHandlerT, FileStorageT};

Expand Down Expand Up @@ -53,6 +56,7 @@ where
{
storage_hub_handler: StorageHubHandler<FL, FSH>,
file_key_cleanup: Option<H256>,
capacity_queue: Arc<Mutex<Vec<u64>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something, but as far as I understand this could be just a u64 instead of Vec<u64> - seems this is the way we are using it.

}

impl<FL, FSH> Clone for BspUploadFileTask<FL, FSH>
Expand All @@ -64,6 +68,7 @@ where
Self {
storage_hub_handler: self.storage_hub_handler.clone(),
file_key_cleanup: self.file_key_cleanup,
capacity_queue: Arc::clone(&self.capacity_queue),
}
}
}
Expand All @@ -77,6 +82,7 @@ where
Self {
storage_hub_handler,
file_key_cleanup: None,
capacity_queue: Arc::new(Mutex::new(vec![])),
}
}
}
Expand Down Expand Up @@ -498,12 +504,6 @@ where
return Err(anyhow::anyhow!(err_msg));
}

let new_capacity = self.calculate_capacity(&event, current_capacity)?;

let call = storage_hub_runtime::RuntimeCall::Providers(
pallet_storage_providers::Call::change_capacity { new_capacity },
);

let earliest_change_capacity_block = self
.storage_hub_handler
.blockchain
Expand All @@ -517,29 +517,52 @@ where
anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e)
})?;

// Register this request's size in the capacity queue.
let mut capacity_queue = self.capacity_queue.lock().await;

capacity_queue.push(event.size);

drop(capacity_queue);

// Wait for the earliest block where the capacity can be changed.
self.storage_hub_handler
.blockchain
.wait_for_block(earliest_change_capacity_block)
.await?;

self.storage_hub_handler
.blockchain
.send_extrinsic(call, Tip::from(0))
.await?
.with_timeout(Duration::from_secs(
self.storage_hub_handler
.provider_config
.extrinsic_retry_timeout,
))
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;
// we read from the queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// wwe read from the queue
// we read from the queue

let mut capacity_queue = self.capacity_queue.lock().await;

info!(
target: LOG_TARGET,
"Increased storage capacity to {:?} bytes",
new_capacity
);
// If the queue is not empty, the capacity hasn't been updated yet.
if !capacity_queue.is_empty() {
let size: u64 = capacity_queue.drain(..).sum();

let new_capacity = self.calculate_capacity(size, current_capacity)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a good way to test this, especially for the max storage value because it would require a 4GB file and it is not practical.

A way this function could be tested is to pass the config as an argument and have a test in Rust to make sure the calculation is right and that it will not go higher than the maximum storage authorized.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could pass these aditional arguments in your test and reduce it to a reasonable size.

additionalArgs: [
`--max-storage-capacity=${MAX_STORAGE_CAPACITY}`,
`--jump-capacity=${CAPACITY[1024]}`
]


let call = storage_hub_runtime::RuntimeCall::Providers(
pallet_storage_providers::Call::change_capacity { new_capacity },
);

self.storage_hub_handler
.blockchain
.send_extrinsic(call, Tip::from(0))
.await?
.with_timeout(Duration::from_secs(
self.storage_hub_handler
.provider_config
.extrinsic_retry_timeout,
))
.watch_for_success(&self.storage_hub_handler.blockchain)
.await?;

drop(capacity_queue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why drop the lock here and not outside?


info!(
target: LOG_TARGET,
"Increased storage capacity to {:?} bytes",
new_capacity
);
}

let available_capacity = self
.storage_hub_handler
Expand Down Expand Up @@ -652,12 +675,12 @@ where
///
/// The `max_storage_capacity` is returned if the new capacity exceeds it.
fn calculate_capacity(
&mut self,
event: &NewStorageRequest,
&self,
size: StorageDataUnit,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of renaming this to required_additional_capacity?

current_capacity: StorageDataUnit,
) -> Result<StorageDataUnit, anyhow::Error> {
let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity;
let jumps_needed = (event.size + jump_capacity - 1) / jump_capacity;
let jumps_needed = (size + jump_capacity - 1) / jump_capacity;
let jumps = max(jumps_needed, 1);
let bytes_to_add = jumps * jump_capacity;
let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| {
Expand Down
67 changes: 67 additions & 0 deletions test/suites/integration/bsp/storage-capacity.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import assert from "node:assert";

Check failure on line 1 in test/suites/integration/bsp/storage-capacity.test.ts

View workflow job for this annotation

GitHub Actions / Run BSPNet Tests (4)

Test BSP storage size increased twice in the same increasing period (check for race condition)

[Error [ERR_TEST_FAILURE]: 20000: Error at calling runtime api: Execution failed: Execution aborted due to trap: wasm trap: wasm `unreachable` instruction executed WASM backtrace: error while executing at wasm backtrace: 0: 0x5c2a9 - storage_hub_runtime.wasm!rust_begin_unwind 1: 0xfe8a - storage_hub_runtime.wasm!core::panicking::panic_fmt::h702386a327f78312 2: 0x4d328c - storage_hub_runtime.wasm!<pallet_aura::pallet::Pallet<T> as frame_support::traits::hooks::OnInitialize<<<<T as frame_system::pallet::Config>::Block as sp_runtime::traits::HeaderProvider>::HeaderT as sp_runtime::traits::Header>::Number>>::on_initialize::h85caa0d173a3b4a6 3: 0x4056c2 - storage_hub_runtime.wasm!<(TupleElement0,TupleElement1,TupleElement2,TupleElement3,TupleElement4,TupleElement5,TupleElement6,TupleElement7,TupleElement8,TupleElement9,TupleElement10,TupleElement11,TupleElement12,TupleElement13,TupleElement14,TupleElement15,TupleElement16,TupleElement17,TupleElement18,TupleElement19,TupleElement20,TupleElement21,TupleElement22,TupleElement23) as frame_support::traits::hooks::OnInitialize<BlockNumber>>::on_initialize::h8eb3e106d7c1ad2f 4: 0x4778f2 - storage_hub_runtime.wasm!frame_executive::Executive<System,Block,Context,UnsignedValidator,AllPalletsWithSystem,COnRuntimeUpgrade>::initialize_block::hf0e0dd498d1c109c 5: 0x3b82c2 - storage_hub_runtime.wasm!Core_initialize_block] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: Error [RpcError]: 20000: Error at calling runtime api: Execution failed: Execution aborted due to trap: wasm trap: wasm `unreachable` instruction executed WASM backtrace: error while executing at wasm backtrace: 0: 0x5c2a9 - storage_hub_runtime.wasm!rust_begin_unwind 1: 0xfe8a - storage_hub_runtime.wasm!core::panicking::panic_fmt::h702386a327f78312 2: 0x4d328c - storage_hub_runtime.wasm!<pallet_aura::pallet::Pallet<T> as frame_support::traits::hooks::OnInitialize<<<<T as frame_system::pallet::Config>::Block as 
sp_runtime::traits::HeaderProvider>::HeaderT as sp_runtime::traits::Header>::Number>>::on_initialize::h85caa0d173a3b4a6 3: 0x4056c2 - storage_hub_runtime.wasm!<(TupleElement0,TupleElement1,TupleElement2,TupleElement3,TupleElement4,TupleElement5,TupleElement6,TupleElement7,TupleElement8,TupleElement9,TupleElement10,TupleElement11,TupleElement12,TupleElement13,TupleElement14,TupleElement15,TupleElement16,TupleElement17,TupleElement18,TupleElement19,TupleElement20,TupleElement21,TupleElement22,TupleElement23) as frame_support::traits::hooks::OnInitialize<BlockNumber>>::on_initialize::h8eb3e106d7c1ad2f 4: 0x4778f2 - storage_hub_runtime.wasm!frame_executive::Executive<System,Block,Context,UnsignedValidator,AllPalletsWithSystem,COnRuntimeUpgrade>::initialize_block::hf0e0dd498d1c109c 5: 0x3b82c2 - storage_hub_runtime.wasm!Core_initialize_block at checkError (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/coder/index.js:19:15) at RpcCoder.decodeResponse (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/coder/index.js:35:9) at WsProvider.__internal__onSocketMessageResult (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/ws/index.js:409:51) at WebSocket.__internal__onSocketMessage (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/ws/index.js:398:20) at [nodejs.internal.kHybridDispatch] (node:internal/event_target:816:20) at WebSocket.dispatchEvent (node:internal/event_target:751:26) at fireEvent (node:internal/deps/undici/undici:11340:14) at websocketMessageReceived (node:internal/deps/undici/undici:11362:7) at ByteParser.run (node:internal/deps/undici/undici:11984:19) at ByteParser._write (node:internal/deps/undici/undici:11877:14) }

Check failure on line 1 in test/suites/integration/bsp/storage-capacity.test.ts

View workflow job for this annotation

GitHub Actions / Run BSPNet Tests (4)

Test BSP storage size increased twice in the same increasing period (check for race condition)

[Error [ERR_TEST_FAILURE]: 20000: Error at calling runtime api: Execution failed: Execution aborted due to trap: wasm trap: wasm `unreachable` instruction executed WASM backtrace: error while executing at wasm backtrace: 0: 0x5c2a9 - storage_hub_runtime.wasm!rust_begin_unwind 1: 0xfe8a - storage_hub_runtime.wasm!core::panicking::panic_fmt::h702386a327f78312 2: 0x4d328c - storage_hub_runtime.wasm!<pallet_aura::pallet::Pallet<T> as frame_support::traits::hooks::OnInitialize<<<<T as frame_system::pallet::Config>::Block as sp_runtime::traits::HeaderProvider>::HeaderT as sp_runtime::traits::Header>::Number>>::on_initialize::h85caa0d173a3b4a6 3: 0x4056c2 - storage_hub_runtime.wasm!<(TupleElement0,TupleElement1,TupleElement2,TupleElement3,TupleElement4,TupleElement5,TupleElement6,TupleElement7,TupleElement8,TupleElement9,TupleElement10,TupleElement11,TupleElement12,TupleElement13,TupleElement14,TupleElement15,TupleElement16,TupleElement17,TupleElement18,TupleElement19,TupleElement20,TupleElement21,TupleElement22,TupleElement23) as frame_support::traits::hooks::OnInitialize<BlockNumber>>::on_initialize::h8eb3e106d7c1ad2f 4: 0x4778f2 - storage_hub_runtime.wasm!frame_executive::Executive<System,Block,Context,UnsignedValidator,AllPalletsWithSystem,COnRuntimeUpgrade>::initialize_block::hf0e0dd498d1c109c 5: 0x3b82c2 - storage_hub_runtime.wasm!Core_initialize_block] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: Error [RpcError]: 20000: Error at calling runtime api: Execution failed: Execution aborted due to trap: wasm trap: wasm `unreachable` instruction executed WASM backtrace: error while executing at wasm backtrace: 0: 0x5c2a9 - storage_hub_runtime.wasm!rust_begin_unwind 1: 0xfe8a - storage_hub_runtime.wasm!core::panicking::panic_fmt::h702386a327f78312 2: 0x4d328c - storage_hub_runtime.wasm!<pallet_aura::pallet::Pallet<T> as frame_support::traits::hooks::OnInitialize<<<<T as frame_system::pallet::Config>::Block as 
sp_runtime::traits::HeaderProvider>::HeaderT as sp_runtime::traits::Header>::Number>>::on_initialize::h85caa0d173a3b4a6 3: 0x4056c2 - storage_hub_runtime.wasm!<(TupleElement0,TupleElement1,TupleElement2,TupleElement3,TupleElement4,TupleElement5,TupleElement6,TupleElement7,TupleElement8,TupleElement9,TupleElement10,TupleElement11,TupleElement12,TupleElement13,TupleElement14,TupleElement15,TupleElement16,TupleElement17,TupleElement18,TupleElement19,TupleElement20,TupleElement21,TupleElement22,TupleElement23) as frame_support::traits::hooks::OnInitialize<BlockNumber>>::on_initialize::h8eb3e106d7c1ad2f 4: 0x4778f2 - storage_hub_runtime.wasm!frame_executive::Executive<System,Block,Context,UnsignedValidator,AllPalletsWithSystem,COnRuntimeUpgrade>::initialize_block::hf0e0dd498d1c109c 5: 0x3b82c2 - storage_hub_runtime.wasm!Core_initialize_block at checkError (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/coder/index.js:19:15) at RpcCoder.decodeResponse (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/coder/index.js:35:9) at WsProvider.__internal__onSocketMessageResult (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/ws/index.js:409:51) at WebSocket.__internal__onSocketMessage (file:///home/runner/work/storage-hub/storage-hub/node_modules/.pnpm/@PolkaDot+rpc-provider@12.4.2/node_modules/@polkadot/rpc-provider/ws/index.js:398:20) at [nodejs.internal.kHybridDispatch] (node:internal/event_target:816:20) at WebSocket.dispatchEvent (node:internal/event_target:751:26) at fireEvent (node:internal/deps/undici/undici:11340:14) at websocketMessageReceived (node:internal/deps/undici/undici:11362:7) at ByteParser.run (node:internal/deps/undici/undici:11984:19) at ByteParser._write (node:internal/deps/undici/undici:11877:14) }
import {
bspKey,
describeBspNet,
Expand Down Expand Up @@ -104,6 +104,12 @@

await userApi.sealBlock();

const updatedCapacity = BigInt(bspApi.shConsts.JUMP_CAPACITY_BSP + newCapacity);
const bspCapacityAfter = await bspApi.query.providers.backupStorageProviders(
bspApi.shConsts.DUMMY_BSP_ID
);
assert.strictEqual(bspCapacityAfter.unwrap().capacity.toBigInt(), updatedCapacity);

// Assert that the BSP was accepted as a volunteer.
await userApi.assert.eventPresent("fileSystem", "AcceptedBspVolunteer");
});
Expand Down Expand Up @@ -155,4 +161,65 @@
assert.strictEqual(eventInfo.asModule.index.toNumber(), providersPallet?.index.toNumber());
assert.strictEqual(eventInfo.asModule.error[0], newCapacityLessThanUsedStorageErrorIndex);
});

it("Test BSP storage size increased twice in the same increasing period (check for race condition)", async () => {
const capacityUsed = (
await bspApi.query.providers.backupStorageProviders(bspApi.shConsts.DUMMY_BSP_ID)
)
.unwrap()
.capacityUsed.toNumber();
await bspApi.block.skipToMinChangeTime();
const minCapacity = bspApi.consts.providers.spMinCapacity.toNumber();
const newCapacity = Math.max(minCapacity, capacityUsed + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why capacityUsed + 1 and not just capacityUsed?


// Set BSP's available capacity to 0 to force the BSP to increase its capacity before volunteering for the storage request.
const { extSuccess } = await bspApi.sealBlock(
bspApi.tx.providers.changeCapacity(newCapacity),
bspKey
);
assert.strictEqual(extSuccess, true);

// First storage request
const source1 = "res/cloud.jpg";
const location1 = "test/cloud.jpg";
const bucketName1 = "toobig-1";
await bspApi.file.newStorageRequest(source1, location1, bucketName1);

// Second storage request
const source2 = "res/adolphus.jpg";
const location2 = "test/adolphus.jpg";
const bucketName2 = "nothingmuch-2";
await bspApi.file.newStorageRequest(source2, location2, bucketName2);

//To allow for BSP to react to request
await sleep(500);

// TODO: get the last block number and the period for which you can change capacity
// userApi.consts.providers.minBlocksBetweenCapacityChanges

const nextCapacityChangeBlock = 29;

await bspApi.advanceToBlock(nextCapacityChangeBlock);

// Allow BSP enough time to send call to change capacity.
await sleep(500);

// Assert BSP has sent a call to increase its capacity.
await bspApi.assert.extrinsicPresent({
module: "providers",
method: "changeCapacity",
checkTxPool: true
});

await bspApi.sealBlock();

// Assert that the capacity has changed.
await bspApi.assert.eventPresent("providers", "CapacityChanged");

const updatedCapacity = BigInt(bspApi.shConsts.JUMP_CAPACITY_BSP + newCapacity);
const bspCapacityAfter = await bspApi.query.providers.backupStorageProviders(
bspApi.shConsts.DUMMY_BSP_ID
);
assert.strictEqual(bspCapacityAfter.unwrap().capacity.toBigInt(), updatedCapacity);
});
});
2 changes: 2 additions & 0 deletions test/util/bspNet/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,5 @@ export const REMARK_WEIGHT_REF_TIME = 127_121_340;
export const REMARK_WEIGHT_PROOF_SIZE = 142;
export const TRANSFER_WEIGHT_REF_TIME = 297_297_000;
export const TRANSFER_WEIGHT_PROOF_SIZE = 308;

export const JUMP_CAPACITY_BSP = 1073741824;
Loading