diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f6a6b88710..605085fca4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. - [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. - [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. +- [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add high-level documentation for users of the pool and contributors. +- [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task. + ### Fixed - [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected. @@ -29,6 +32,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. 
+- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge` (status code `1`) if the client requests a range of objects, such as sealed block headers or transactions, that is too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently. + +#### Breaking +- [2389](https://github.com/FuelLabs/fuel-core/pull/2389): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`. ## [Version 0.40.0] @@ -57,7 +64,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2324](https://github.com/FuelLabs/fuel-core/pull/2324): Added metrics for sync, async processor and for all GraphQL queries. - [2320](https://github.com/FuelLabs/fuel-core/pull/2320): Added new CLI flag `graphql-max-resolver-recursive-depth` to limit recursion within resolver. The default value it "1". - ## Fixed - [2320](https://github.com/FuelLabs/fuel-core/issues/2320): Prevent `/health` and `/v1/health` from being throttled by the concurrency limiter. - [2322](https://github.com/FuelLabs/fuel-core/issues/2322): Set the salt of genesis contracts to zero on execution. diff --git a/crates/client/assets/debugAdapterProtocol.json b/crates/client/assets/debugAdapterProtocol.json index 44a0c2eed9c..b435aef0f85 100644 --- a/crates/client/assets/debugAdapterProtocol.json +++ b/crates/client/assets/debugAdapterProtocol.json @@ -1440,7 +1440,7 @@ { "$ref": "#/definitions/Request" }, { "type": "object", - "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a diassembly window. 
\nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", + "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a disassembly window. \nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", "properties": { "command": { "type": "string", diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index b9048362caa..54db4ec0995 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -980,7 +980,7 @@ type Query { """ owner: Address, first: Int, after: String, last: Int, before: String ): MessageConnection! - messageProof(transactionId: TransactionId!, nonce: Nonce!, commitBlockId: BlockId, commitBlockHeight: U32): MessageProof + messageProof(transactionId: TransactionId!, nonce: Nonce!, commitBlockId: BlockId, commitBlockHeight: U32): MessageProof! messageStatus(nonce: Nonce!): MessageStatus! relayedTransactionStatus( """ diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 65789f6c9c3..1743f631547 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1141,7 +1141,7 @@ impl FuelClient { nonce: &Nonce, commit_block_id: Option<&BlockId>, commit_block_height: Option, - ) -> io::Result> { + ) -> io::Result { let transaction_id: TransactionId = (*transaction_id).into(); let nonce: schema::Nonce = (*nonce).into(); let commit_block_id: Option = @@ -1153,14 +1153,7 @@ impl FuelClient { commit_block_id, commit_block_height, }); - - let proof = self - .query(query) - .await? 
- .message_proof - .map(TryInto::try_into) - .transpose()?; - + let proof = self.query(query).await?.message_proof.try_into()?; Ok(proof) } diff --git a/crates/client/src/client/schema/message.rs b/crates/client/src/client/schema/message.rs index 7c47ead200a..7f4e39829d9 100644 --- a/crates/client/src/client/schema/message.rs +++ b/crates/client/src/client/schema/message.rs @@ -114,7 +114,7 @@ pub struct MessageProofQuery { commitBlockId: $commit_block_id, commitBlockHeight: $commit_block_height )] - pub message_proof: Option, + pub message_proof: MessageProof, } #[derive(cynic::QueryFragment, Clone, Debug)] diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index 4d9e5048bd2..95e3f421589 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -136,7 +136,7 @@ impl MessageQuery { nonce: Nonce, commit_block_id: Option, commit_block_height: Option, - ) -> async_graphql::Result> { + ) -> async_graphql::Result { let query = ctx.read_view()?; let height = match (commit_block_id, commit_block_height) { (Some(commit_block_id), None) => { @@ -157,7 +157,7 @@ impl MessageQuery { height, )?; - Ok(Some(MessageProof(proof))) + Ok(MessageProof(proof)) } #[graphql(complexity = "query_costs().storage_read + child_complexity")] diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index 9a9da066fee..a3ace733423 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -313,6 +313,7 @@ impl AlgorithmUpdaterV1 { if !height_range.is_empty() { self.da_block_update(height_range, range_cost)?; self.recalculate_projected_cost(); + self.update_da_gas_price(); } Ok(()) } diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs index 123520b4e99..efa0dc62d6f 100644 --- 
a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs @@ -270,3 +270,166 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g let expected = new_known_total_cost + guessed_part; assert_eq!(actual, expected as u128); } + +#[test] +fn update_da_record_data__da_block_lowers_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(10, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + // because the profit is 10 and the da_p_component is 2, the new 
da gas price should be lesser than the previous one. + assert_eq!(new_da_gas_price, 0); + assert_ne!(old_da_gas_price, new_da_gas_price); +} + +#[test] +fn update_da_record_data__da_block_increases_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(-10, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + // because the profit is -10 and the da_p_component is 2, the new da gas price should be greater than the previous one. 
+ assert_eq!(new_da_gas_price, 6); + assert_ne!(old_da_gas_price, new_da_gas_price); +} + +#[test] +fn update_da_record_data__da_block_will_not_change_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(0, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + assert_eq!(old_da_gas_price, new_da_gas_price); +} diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs index 4e76ee88e2f..e81ea58b2c3 100644 --- 
a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs @@ -17,6 +17,10 @@ use crate::{ ports::MetadataStorage, }; use fuel_core_storage::{ + codec::{ + postcard::Postcard, + Encode, + }, kv_store::KeyValueInspect, structured_storage::StructuredStorage, transactional::{ @@ -101,6 +105,8 @@ pub fn get_block_info( height: (*block.header().height()).into(), gas_used: used_gas, block_gas_capacity: block_gas_limit, + block_bytes: Postcard::encode(block).len() as u64, + block_fees: fee, }; Ok(info) } diff --git a/crates/services/gas_price_service/src/common/utils.rs b/crates/services/gas_price_service/src/common/utils.rs index 712e288adca..a3813e53e4f 100644 --- a/crates/services/gas_price_service/src/common/utils.rs +++ b/crates/services/gas_price_service/src/common/utils.rs @@ -36,5 +36,9 @@ pub enum BlockInfo { gas_used: u64, // Total gas capacity of the block block_gas_capacity: u64, + // The size of block in bytes + block_bytes: u64, + // The fees the block has collected + block_fees: u64, }, } diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index ee062097dfd..7ef806fdd23 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -104,6 +104,7 @@ where height, gas_used, block_gas_capacity, + .. 
} => { self.handle_normal_block(height, gas_used, block_gas_capacity) .await?; @@ -225,6 +226,8 @@ mod tests { height: block_height, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index c395b39aeba..7526975e467 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -147,6 +147,8 @@ async fn next_gas_price__affected_by_new_l2_block() { height: 1, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); let l2_block_source = FakeL2BlockSource { @@ -186,6 +188,8 @@ async fn next__new_l2_block_saves_old_metadata() { height: 1, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); let l2_block_source = FakeL2BlockSource { diff --git a/crates/services/gas_price_service/src/v1.rs b/crates/services/gas_price_service/src/v1.rs index 4770112762a..fafb7245ef8 100644 --- a/crates/services/gas_price_service/src/v1.rs +++ b/crates/services/gas_price_service/src/v1.rs @@ -1,3 +1,4 @@ pub mod algorithm; pub mod da_source_service; pub mod metadata; +pub mod service; diff --git a/crates/services/gas_price_service/src/v1/algorithm.rs b/crates/services/gas_price_service/src/v1/algorithm.rs index 92333d8cce6..c7f87aed2ad 100644 --- a/crates/services/gas_price_service/src/v1/algorithm.rs +++ b/crates/services/gas_price_service/src/v1/algorithm.rs @@ -1,4 +1,7 @@ -use crate::common::gas_price_algorithm::GasPriceAlgorithm; +use crate::common::gas_price_algorithm::{ + GasPriceAlgorithm, + SharedGasPriceAlgo, +}; use fuel_core_types::fuel_types::BlockHeight; use fuel_gas_price_algorithm::v1::AlgorithmV1; @@ -11,3 +14,5 @@ 
impl GasPriceAlgorithm for AlgorithmV1 { self.worst_case(block_height.into()) } } + +pub type SharedV1Algorithm = SharedGasPriceAlgo; diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index eb699233455..840345bff98 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -7,7 +7,7 @@ pub mod service; #[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] pub struct DaBlockCosts { - pub l2_block_range: core::ops::Range, + pub l2_block_range: core::ops::Range, pub blob_size_bytes: u32, pub blob_cost_wei: u128, } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs index 7016fe0da08..e0629cc0edd 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs @@ -21,12 +21,12 @@ trait BlockCommitterApi: Send + Sync { /// Used to get the costs for a specific seqno async fn get_costs_by_seqno( &self, - number: u64, + number: u32, ) -> DaBlockCostsResult>; /// Used to get the costs for a range of blocks (inclusive) async fn get_cost_bundles_by_range( &self, - range: core::ops::Range, + range: core::ops::Range, ) -> DaBlockCostsResult>>; } @@ -40,9 +40,9 @@ pub struct BlockCommitterDaBlockCosts { #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] pub struct RawDaBlockCosts { /// Sequence number (Monotonically increasing nonce) - pub sequence_number: u64, + pub sequence_number: u32, /// The range of blocks that the costs apply to - pub blocks_range: core::ops::Range, + pub blocks_range: core::ops::Range, /// The DA block height of the last transaction for the range of blocks pub da_block_height: DaBlockHeight, /// Rolling sum cost of 
posting blobs (wei) @@ -143,7 +143,7 @@ impl BlockCommitterApi for BlockCommitterHttpApi { async fn get_costs_by_seqno( &self, - number: u64, + number: u32, ) -> DaBlockCostsResult> { let response = self .client @@ -157,7 +157,7 @@ impl BlockCommitterApi for BlockCommitterHttpApi { async fn get_cost_bundles_by_range( &self, - range: core::ops::Range, + range: core::ops::Range, ) -> DaBlockCostsResult>> { let response = self .client @@ -192,7 +192,7 @@ mod tests { } async fn get_costs_by_seqno( &self, - seq_no: u64, + seq_no: u32, ) -> DaBlockCostsResult> { // arbitrary logic to generate a new value let mut value = self.value.clone(); @@ -200,7 +200,8 @@ mod tests { value.sequence_number = seq_no; value.blocks_range = value.blocks_range.end * seq_no..value.blocks_range.end * seq_no + 10; - value.da_block_height = value.da_block_height + (seq_no + 1).into(); + value.da_block_height = + value.da_block_height + ((seq_no + 1) as u64).into(); value.total_cost += 1; value.total_size_bytes += 1; } @@ -208,7 +209,7 @@ mod tests { } async fn get_cost_bundles_by_range( &self, - _: core::ops::Range, + _: core::ops::Range, ) -> DaBlockCostsResult>> { Ok(vec![self.value.clone()]) } @@ -286,7 +287,7 @@ mod tests { } async fn get_costs_by_seqno( &self, - seq_no: u64, + seq_no: u32, ) -> DaBlockCostsResult> { // arbitrary logic to generate a new value let mut value = self.value.clone(); @@ -301,7 +302,7 @@ mod tests { } async fn get_cost_bundles_by_range( &self, - _: core::ops::Range, + _: core::ops::Range, ) -> DaBlockCostsResult>> { Ok(vec![self.value.clone()]) } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 1f9cadf02dd..e20a506a097 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -15,6 +15,7 @@ use tokio::{ use crate::v1::da_source_service::DaBlockCosts; 
pub use anyhow::Result; +use fuel_core_services::stream::BoxFuture; #[derive(Clone)] pub struct SharedState(Sender); diff --git a/crates/services/gas_price_service/src/v1/metadata.rs b/crates/services/gas_price_service/src/v1/metadata.rs index 422cdf42507..c5a1e1c0447 100644 --- a/crates/services/gas_price_service/src/v1/metadata.rs +++ b/crates/services/gas_price_service/src/v1/metadata.rs @@ -1,7 +1,6 @@ use crate::v0::metadata::V0Metadata; use fuel_gas_price_algorithm::v1::{ AlgorithmUpdaterV1, - ClampedPercentage, L2ActivityTracker, }; use std::num::NonZeroU64; @@ -41,7 +40,7 @@ pub struct V1Metadata { impl V1Metadata { pub fn construct_from_v0_metadata( v0_metadata: V0Metadata, - config: V1AlgorithmConfig, + config: &V1AlgorithmConfig, ) -> anyhow::Result { let metadata = Self { new_scaled_exec_price: v0_metadata @@ -68,19 +67,56 @@ impl V1Metadata { } pub struct V1AlgorithmConfig { - new_exec_gas_price: u64, - min_exec_gas_price: u64, - exec_gas_price_change_percent: u16, - l2_block_fullness_threshold_percent: u8, - gas_price_factor: NonZeroU64, - min_da_gas_price: u64, - max_da_gas_price_change_percent: u16, - da_p_component: i64, - da_d_component: i64, - normal_range_size: u16, - capped_range_size: u16, - decrease_range_size: u16, - block_activity_threshold: u8, + pub new_exec_gas_price: u64, + pub min_exec_gas_price: u64, + pub exec_gas_price_change_percent: u16, + pub l2_block_fullness_threshold_percent: u8, + pub gas_price_factor: NonZeroU64, + pub min_da_gas_price: u64, + pub max_da_gas_price_change_percent: u16, + pub da_p_component: i64, + pub da_d_component: i64, + pub normal_range_size: u16, + pub capped_range_size: u16, + pub decrease_range_size: u16, + pub block_activity_threshold: u8, + pub unrecorded_blocks: Vec<(u32, u64)>, +} + +impl From<&V1AlgorithmConfig> for AlgorithmUpdaterV1 { + fn from(value: &V1AlgorithmConfig) -> Self { + let l2_activity = L2ActivityTracker::new_full( + value.normal_range_size, + value.capped_range_size, + 
value.decrease_range_size, + value.block_activity_threshold.into(), + ); + let unrecorded_blocks = value.unrecorded_blocks.clone().into_iter().collect(); + Self { + new_scaled_exec_price: value.new_exec_gas_price, + l2_block_height: 0, + new_scaled_da_gas_price: value.min_da_gas_price, + gas_price_factor: value.gas_price_factor, + total_da_rewards_excess: 0, + da_recorded_block_height: 0, + latest_known_total_da_cost_excess: 0, + projected_total_da_cost: 0, + last_profit: 0, + second_to_last_profit: 0, + latest_da_cost_per_byte: 0, + l2_activity, + min_exec_gas_price: value.min_exec_gas_price, + exec_gas_price_change_percent: value.exec_gas_price_change_percent, + l2_block_fullness_threshold_percent: value + .l2_block_fullness_threshold_percent + .into(), + min_da_gas_price: value.min_da_gas_price, + max_da_gas_price_change_percent: value.max_da_gas_price_change_percent, + da_p_component: value.da_p_component, + da_d_component: value.da_d_component, + unrecorded_blocks, + } + } } impl From for V1Metadata { @@ -104,7 +140,7 @@ impl From for V1Metadata { pub fn v1_algorithm_from_metadata( metadata: V1Metadata, - config: V1AlgorithmConfig, + config: &V1AlgorithmConfig, ) -> AlgorithmUpdaterV1 { let l2_activity = L2ActivityTracker::new_full( config.normal_range_size, @@ -112,7 +148,11 @@ pub fn v1_algorithm_from_metadata( config.decrease_range_size, config.block_activity_threshold.into(), ); - let unrecorded_blocks = metadata.unrecorded_blocks.into_iter().collect(); + let unrecorded_blocks = metadata + .unrecorded_blocks + .into_iter() + .chain(config.unrecorded_blocks.clone()) + .collect(); AlgorithmUpdaterV1 { new_scaled_exec_price: metadata.new_scaled_exec_price, l2_block_height: metadata.l2_block_height, diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs new file mode 100644 index 00000000000..5ab56160334 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -0,0 +1,516 @@ +use 
crate::{ + common::{ + gas_price_algorithm::SharedGasPriceAlgo, + l2_block_source::L2BlockSource, + updater_metadata::UpdaterMetadata, + utils::BlockInfo, + }, + ports::MetadataStorage, + v0::metadata::V0Metadata, + v1::{ + algorithm::SharedV1Algorithm, + da_source_service::{ + service::{ + DaBlockCostsSource, + DaSourceService, + SharedState as DaSharedState, + }, + DaBlockCosts, + }, + metadata::{ + v1_algorithm_from_metadata, + V1AlgorithmConfig, + V1Metadata, + }, + }, +}; +use anyhow::anyhow; +use async_trait::async_trait; +use fuel_core_services::{ + RunnableService, + RunnableTask, + StateWatcher, +}; +use fuel_gas_price_algorithm::{ + v0::AlgorithmUpdaterV0, + v1::{ + AlgorithmUpdaterV1, + AlgorithmV1, + }, +}; +use futures::FutureExt; +use std::num::NonZeroU64; +use tokio::sync::broadcast::Receiver; + +/// The service that updates the gas price algorithm. +pub struct GasPriceServiceV1 +where + DA: DaBlockCostsSource, +{ + /// The algorithm that can be used in the next block + shared_algo: SharedV1Algorithm, + /// The L2 block source + l2_block_source: L2, + /// The metadata storage + metadata_storage: Metadata, + /// The algorithm updater + algorithm_updater: AlgorithmUpdaterV1, + /// the da source adapter handle + da_source_adapter_handle: DaSourceService, + /// The da source channel + da_source_channel: Receiver, +} + +impl GasPriceServiceV1 +where + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + pub fn new( + l2_block_source: L2, + metadata_storage: Metadata, + shared_algo: SharedV1Algorithm, + algorithm_updater: AlgorithmUpdaterV1, + da_source_adapter_handle: DaSourceService, + ) -> Self { + let da_source_channel = + da_source_adapter_handle.shared_data().clone().subscribe(); + Self { + shared_algo, + l2_block_source, + metadata_storage, + algorithm_updater, + da_source_adapter_handle, + da_source_channel, + } + } + + pub fn algorithm_updater(&self) -> &AlgorithmUpdaterV1 { + &self.algorithm_updater + } + + pub fn next_block_algorithm(&self) 
-> SharedV1Algorithm { + self.shared_algo.clone() + } + + async fn update(&mut self, new_algorithm: AlgorithmV1) { + self.shared_algo.update(new_algorithm).await; + } + + fn validate_block_gas_capacity( + &self, + block_gas_capacity: u64, + ) -> anyhow::Result { + NonZeroU64::new(block_gas_capacity) + .ok_or_else(|| anyhow!("Block gas capacity must be non-zero")) + } + + async fn set_metadata(&mut self) -> anyhow::Result<()> { + let metadata: UpdaterMetadata = self.algorithm_updater.clone().into(); + self.metadata_storage + .set_metadata(&metadata) + .map_err(|err| anyhow!(err)) + } + + async fn handle_normal_block( + &mut self, + height: u32, + gas_used: u64, + block_gas_capacity: u64, + block_bytes: u64, + block_fees: u64, + ) -> anyhow::Result<()> { + let capacity = self.validate_block_gas_capacity(block_gas_capacity)?; + + self.algorithm_updater.update_l2_block_data( + height, + gas_used, + capacity, + block_bytes, + block_fees as u128, + )?; + + self.set_metadata().await?; + Ok(()) + } + + async fn handle_da_block_costs( + &mut self, + da_block_costs: DaBlockCosts, + ) -> anyhow::Result<()> { + self.algorithm_updater.update_da_record_data( + da_block_costs.l2_block_range, + da_block_costs.blob_cost_wei, + )?; + + self.set_metadata().await?; + Ok(()) + } + + async fn apply_block_info_to_gas_algorithm( + &mut self, + l2_block: BlockInfo, + ) -> anyhow::Result<()> { + match l2_block { + BlockInfo::GenesisBlock => { + self.set_metadata().await?; + } + BlockInfo::Block { + height, + gas_used, + block_gas_capacity, + block_bytes, + block_fees, + } => { + self.handle_normal_block( + height, + gas_used, + block_gas_capacity, + block_bytes, + block_fees, + ) + .await?; + } + } + + self.update(self.algorithm_updater.algorithm()).await; + Ok(()) + } + + async fn apply_da_block_costs_to_gas_algorithm( + &mut self, + da_block_costs: DaBlockCosts, + ) -> anyhow::Result<()> { + self.handle_da_block_costs(da_block_costs).await?; + 
self.update(self.algorithm_updater.algorithm()).await; + Ok(()) + } +} + +#[async_trait] +impl RunnableTask for GasPriceServiceV1 +where + L2: L2BlockSource, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let should_continue; + tokio::select! { + biased; + _ = watcher.while_started() => { + tracing::debug!("Stopping gas price service"); + should_continue = false; + } + l2_block_res = self.l2_block_source.get_l2_block() => { + tracing::info!("Received L2 block result: {:?}", l2_block_res); + let block = l2_block_res?; + + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + should_continue = true; + } + da_block_costs = self.da_source_channel.recv() => { + tracing::info!("Received DA block costs: {:?}", da_block_costs); + let da_block_costs = da_block_costs?; + + tracing::debug!("Updating DA block costs"); + self.apply_da_block_costs_to_gas_algorithm(da_block_costs).await?; + should_continue = true; + } + } + Ok(should_continue) + } + + async fn shutdown(mut self) -> anyhow::Result<()> { + // handle all the remaining l2 blocks + while let Some(Ok(block)) = self.l2_block_source.get_l2_block().now_or_never() { + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + } + + while let Ok(da_block_costs) = self.da_source_channel.try_recv() { + tracing::debug!("Updating DA block costs"); + self.apply_da_block_costs_to_gas_algorithm(da_block_costs) + .await?; + } + + // run shutdown hooks for internal services + self.da_source_adapter_handle.shutdown().await?; + + Ok(()) + } +} + +fn convert_to_v1_metadata( + updater_metadata: UpdaterMetadata, + config: &V1AlgorithmConfig, +) -> crate::common::utils::Result { + if let Ok(v1_metadata) = V1Metadata::try_from(updater_metadata.clone()) { + Ok(v1_metadata) + } else { + let v0_metadata = V0Metadata::try_from(updater_metadata).map_err(|_| { 
+ crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!( + "Could not convert metadata to V0Metadata" + )) + })?; + V1Metadata::construct_from_v0_metadata(v0_metadata, config).map_err(|err| { + crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!(err)) + }) + } +} + +pub fn initialize_algorithm( + config: &V1AlgorithmConfig, + latest_block_height: u32, + metadata_storage: &Metadata, +) -> crate::common::utils::Result<(AlgorithmUpdaterV1, SharedV1Algorithm)> +where + Metadata: MetadataStorage, +{ + let algorithm_updater; + if let Some(updater_metadata) = metadata_storage + .get_metadata(&latest_block_height.into()) + .map_err(|err| { + crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!(err)) + })? + { + let v1_metadata = convert_to_v1_metadata(updater_metadata, config)?; + algorithm_updater = v1_algorithm_from_metadata(v1_metadata, config); + } else { + algorithm_updater = AlgorithmUpdaterV1::from(config); + } + + let shared_algo = + SharedGasPriceAlgo::new_with_algorithm(algorithm_updater.algorithm()); + + Ok((algorithm_updater, shared_algo)) +} + +#[allow(clippy::arithmetic_side_effects)] +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use crate::{ + common::{ + l2_block_source::L2BlockSource, + updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Result as GasPriceResult, + }, + }, + ports::MetadataStorage, + v1::{ + da_source_service::{ + dummy_costs::DummyDaBlockCosts, + service::DaSourceService, + DaBlockCosts, + }, + metadata::V1AlgorithmConfig, + service::{ + initialize_algorithm, + GasPriceServiceV1, + }, + }, + }; + use fuel_core_services::{ + RunnableTask, + StateWatcher, + }; + use fuel_core_types::fuel_types::BlockHeight; + use std::{ + num::NonZeroU64, + sync::Arc, + time::Duration, + }; + use tokio::sync::mpsc; + + struct FakeL2BlockSource { + l2_block: mpsc::Receiver, + } + + #[async_trait::async_trait] + impl L2BlockSource for FakeL2BlockSource { + async fn get_l2_block(&mut self) -> GasPriceResult 
{ + let block = self.l2_block.recv().await.unwrap(); + Ok(block) + } + } + + struct FakeMetadata { + inner: Arc>>, + } + + impl FakeMetadata { + fn empty() -> Self { + Self { + inner: Arc::new(std::sync::Mutex::new(None)), + } + } + } + + impl MetadataStorage for FakeMetadata { + fn get_metadata( + &self, + _: &BlockHeight, + ) -> GasPriceResult> { + let metadata = self.inner.lock().unwrap().clone(); + Ok(metadata) + } + + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + *self.inner.lock().unwrap() = Some(metadata.clone()); + Ok(()) + } + } + + #[tokio::test] + async fn run__updates_gas_price_with_l2_block_source() { + // given + let block_height = 1; + let l2_block = BlockInfo::Block { + height: block_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let metadata_storage = FakeMetadata::empty(); + let l2_block_height = 0; + let config = V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 50, + exec_gas_price_change_percent: 20, + l2_block_fullness_threshold_percent: 20, + gas_price_factor: NonZeroU64::new(10).unwrap(), + min_da_gas_price: 10, + max_da_gas_price_change_percent: 20, + da_p_component: 4, + da_d_component: 2, + normal_range_size: 10, + capped_range_size: 100, + decrease_range_size: 4, + block_activity_threshold: 20, + unrecorded_blocks: vec![], + }; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); + + let notifier = Arc::new(tokio::sync::Notify::new()); + let dummy_da_source = DaSourceService::new( + DummyDaBlockCosts::new( + Err(anyhow::anyhow!("unused at the moment")), + notifier.clone(), + ), + None, + ); + + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + dummy_da_source, + ); + 
let read_algo = service.next_block_algorithm(); + let mut watcher = StateWatcher::default(); + let initial_price = read_algo.next_gas_price(); + + // when + service.run(&mut watcher).await.unwrap(); + l2_block_sender.send(l2_block).await.unwrap(); + service.shutdown().await.unwrap(); + + // then + let actual_price = read_algo.next_gas_price(); + assert_ne!(initial_price, actual_price); + } + + #[tokio::test] + async fn run__updates_gas_price_with_da_block_cost_source() { + // given + let block_height = 1; + let l2_block = BlockInfo::Block { + height: block_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let metadata_storage = FakeMetadata::empty(); + let config = V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 50, + exec_gas_price_change_percent: 20, + l2_block_fullness_threshold_percent: 20, + gas_price_factor: NonZeroU64::new(10).unwrap(), + min_da_gas_price: 100, + max_da_gas_price_change_percent: 50, + da_p_component: 4, + da_d_component: 2, + normal_range_size: 10, + capped_range_size: 100, + decrease_range_size: 4, + block_activity_threshold: 20, + unrecorded_blocks: vec![(1, 100)], + }; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, block_height, &metadata_storage).unwrap(); + + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_source = DaSourceService::new( + DummyDaBlockCosts::new( + Ok(DaBlockCosts { + l2_block_range: 1..2, + blob_cost_wei: 9000, + blob_size_bytes: 3000, + }), + notifier.clone(), + ), + Some(Duration::from_millis(1)), + ); + let mut watcher = StateWatcher::default(); + + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + da_source, + ); + let read_algo = service.next_block_algorithm(); + let initial_price = 
read_algo.next_gas_price(); + + // the RunnableTask depends on the handle passed to it for the da block cost source to already be running, + // which is the responsibility of the UninitializedTask in the `into_task` method of the RunnableService + // here we mimic that behaviour by running the da block cost service. + let mut da_source_watcher = StateWatcher::started(); + service + .da_source_adapter_handle + .run(&mut da_source_watcher) + .await + .unwrap(); + + // when + service.run(&mut watcher).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + service.shutdown().await.unwrap(); + + // then + let actual_price = read_algo.next_gas_price(); + assert_ne!(initial_price, actual_price); + } +} diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index db3fe814e4f..e2b7a97f9b1 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -180,16 +180,17 @@ impl NetworkCodec for PostcardCodec { fn get_req_res_protocols( &self, ) -> impl Iterator::Protocol> { - // TODO: Iterating over versions in reverse order should prefer + // TODO: https://github.com/FuelLabs/fuel-core/issues/2374 + // Iterating over versions in reverse order should prefer // peers to use V2 over V1 for exchanging messages. However, this is - // not guaranteed by the specs for the `request_response` protocol. + // not guaranteed by the specs for the `request_response` protocol, + // and it should be tested. 
PostcardProtocol::iter().rev() } } -#[derive(Debug, Default, Clone, EnumIter)] +#[derive(Debug, Clone, EnumIter)] pub enum PostcardProtocol { - #[default] V1, V2, } @@ -206,7 +207,6 @@ impl AsRef for PostcardProtocol { #[cfg(test)] #[allow(non_snake_case)] mod tests { - use fuel_core_types::blockchain::SealedBlockHeader; use request_response::Codec as _; @@ -310,10 +310,8 @@ mod tests { async fn codec__serialzation_roundtrip_using_v1_on_error_response_returns_predefined_error_code( ) { // Given - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Change this to a different ResponseMessageErrorCode once these have been implemented. let response = V2ResponseMessage::SealedHeaders(Err( - ResponseMessageErrorCode::ProtocolV1EmptyResponse, + ResponseMessageErrorCode::RequestedRangeTooLarge, )); let mut codec = PostcardCodec::new(1024); let mut buf = Vec::with_capacity(1024); diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 47d189ae85f..94646d50815 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -675,9 +675,7 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { V2ResponseMessage::SealedHeaders(v) => { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Change type of ResponseSender and remove the .ok() here - c.send(Ok((peer, Ok(v.ok())))).is_ok() + c.send(Ok((peer, Ok(v)))).is_ok() } _ => { warn!( @@ -690,7 +688,7 @@ impl FuelP2PService { }, ResponseSender::Transactions(c) => match response { V2ResponseMessage::Transactions(v) => { - c.send(Ok((peer, Ok(v.ok())))).is_ok() + c.send(Ok((peer, Ok(v)))).is_ok() } _ => { warn!( @@ -703,7 +701,7 @@ impl FuelP2PService { }, ResponseSender::TransactionsFromPeer(c) => match response { V2ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -715,7 +713,7 @@ impl FuelP2PService { 
}, ResponseSender::TxPoolAllTransactionsIds(c) => match response { V2ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -727,7 +725,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolFullTransactions(c) => match response { V2ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -1719,12 +1717,12 @@ mod tests { if let Ok(response) = response_message { match response { - Ok((_, Ok(Some(sealed_headers)))) => { + Ok((_, Ok(Ok(sealed_headers)))) => { let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b)); let _ = tx_test_end.send(check).await; }, - Ok((_, Ok(None))) => { - tracing::error!("Node A did not return any headers"); + Ok((_, Ok(Err(e)))) => { + tracing::error!("Node A did not return any headers: {:?}", e); let _ = tx_test_end.send(false).await; }, Ok((_, Err(e))) => { @@ -1752,12 +1750,12 @@ mod tests { if let Ok(response) = response_message { match response { - Ok((_, Ok(Some(transactions)))) => { + Ok((_, Ok(Ok(transactions)))) => { let check = transactions.len() == 1 && transactions[0].0.len() == 5; let _ = tx_test_end.send(check).await; }, - Ok((_, Ok(None))) => { - tracing::error!("Node A did not return any transactions"); + Ok((_, Ok(Err(e)))) => { + tracing::error!("Node A did not return any transactions: {:?}", e); let _ = tx_test_end.send(false).await; }, Ok((_, Err(e))) => { @@ -1782,7 +1780,7 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transaction_ids)))) = response_message { + if let Ok((_, Ok(Ok(transaction_ids)))) = response_message { let tx_ids: Vec = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); let check = transaction_ids.len() == 5 && transaction_ids.iter().zip(tx_ids.iter()).all(|(a, b)| a == b); let _ = tx_test_end.send(check).await; @@ 
-1799,7 +1797,7 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { + if let Ok((_, Ok(Ok(transactions)))) = response_message { let txs: Vec> = tx_ids.iter().enumerate().map(|(i, _)| { if i == 0 { None diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index f1c3b176f4b..5a453b784cf 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -40,6 +40,15 @@ pub enum ResponseMessageErrorCode { /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] ProtocolV1EmptyResponse = 0, + #[error("The requested range is too large")] + RequestedRangeTooLarge = 1, + #[error("Timeout while processing request")] + Timeout = 2, + #[error("Sync processor is out of capacity")] + SyncProcessorOutOfCapacity = 3, + #[error("The peer sent an unknown error code")] + #[serde(skip_serializing, other)] + Unknown, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -113,11 +122,22 @@ pub type OnResponseWithPeerSelection = #[derive(Debug)] pub enum ResponseSender { - SealedHeaders(OnResponseWithPeerSelection>>), - Transactions(OnResponseWithPeerSelection>>), - TransactionsFromPeer(OnResponse>>), - TxPoolAllTransactionsIds(OnResponse>>), - TxPoolFullTransactions(OnResponse>>>), + SealedHeaders( + OnResponseWithPeerSelection< + Result, ResponseMessageErrorCode>, + >, + ), + Transactions( + OnResponseWithPeerSelection, ResponseMessageErrorCode>>, + ), + TransactionsFromPeer(OnResponse, ResponseMessageErrorCode>>), + + TxPoolAllTransactionsIds(OnResponse, ResponseMessageErrorCode>>), + TxPoolFullTransactions( + OnResponse< + Result>, ResponseMessageErrorCode>, + >, + ), } #[derive(Debug, Error)] @@ -146,3 +166,42 @@ pub enum ResponseSendError { #[error("Failed to 
convert response to intermediate format")] ConversionToIntermediateFailed, } + +#[cfg(test)] +#[allow(non_snake_case)] +mod tests { + use super::ResponseMessageErrorCode; + + #[test] + fn response_message_error_code__unknown_error_cannot_be_serialized() { + let error = super::ResponseMessageErrorCode::Unknown; + let serialized = postcard::to_allocvec(&error); + assert!(serialized.is_err()); + } + + #[test] + fn response_message_error_code__known_error_code_is_deserialized_to_variant() { + let serialized_error_code = + postcard::to_stdvec(&ResponseMessageErrorCode::ProtocolV1EmptyResponse) + .unwrap(); + println!("Error code: {:?}", serialized_error_code); + let response_message_error_code: ResponseMessageErrorCode = + postcard::from_bytes(&serialized_error_code).unwrap(); + assert!(matches!( + response_message_error_code, + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )); + } + + #[test] + fn response_message_error_code__unknown_error_code_is_deserialized_to_unknown_variant( + ) { + let serialized_error_code = vec![42]; + let response_message_error_code: ResponseMessageErrorCode = + postcard::from_bytes(&serialized_error_code).unwrap(); + assert!(matches!( + response_message_error_code, + ResponseMessageErrorCode::Unknown + )); + } +} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 30c5dd9310a..0004a15ea63 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -121,25 +121,31 @@ pub enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: OnResponseWithPeerSelection>>, + channel: OnResponseWithPeerSelection< + Result, ResponseMessageErrorCode>, + >, }, GetTransactions { block_height_range: Range, - channel: OnResponseWithPeerSelection>>, + channel: OnResponseWithPeerSelection< + Result, ResponseMessageErrorCode>, + >, }, GetTransactionsFromPeer { block_height_range: Range, from_peer: PeerId, - channel: OnResponse>>, + channel: OnResponse, 
ResponseMessageErrorCode>>, }, TxPoolGetAllTxIds { from_peer: PeerId, - channel: OnResponse>>, + channel: OnResponse, ResponseMessageErrorCode>>, }, TxPoolGetFullTransactions { tx_ids: Vec, from_peer: PeerId, - channel: OnResponse>>>, + channel: OnResponse< + Result>, ResponseMessageErrorCode>, + >, }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), @@ -567,11 +573,9 @@ where tracing::error!( requested_length = range.len(), max_len, - "Requested range is too big" + "Requested range is too large" ); - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Return helpful error message to requester. - let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -585,22 +589,18 @@ where return; } - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Add new error code let response = db_lookup(&view, range.clone()) .ok() .flatten() - .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + .ok_or(ResponseMessageErrorCode::Timeout); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Handle error cases and return meaningful status codes if result.is_err() { - let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity); let _ = self .p2p_service .send_response_msg(request_id, response_sender(err)); @@ -674,17 +674,13 @@ where return; }; - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Return helpful error message to requester. 
let _ = response_channel .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); }); if result.is_err() { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Return better error code - let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity); let _ = self .p2p_service .send_response_msg(request_id, response_sender(res)); @@ -715,13 +711,11 @@ where tx_ids: Vec, request_id: InboundRequestId, ) -> anyhow::Result<()> { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Return helpful error message to requester. if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, V2ResponseMessage::TxPoolFullTransactions(Err( - ResponseMessageErrorCode::ProtocolV1EmptyResponse, + ResponseMessageErrorCode::RequestedRangeTooLarge, )), )?; return Ok(()); @@ -1069,7 +1063,13 @@ impl SharedState { .map_err(|e| anyhow!("{e}"))?; let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; - Ok((peer_id.to_bytes(), data)) + if let Err(ref response_error_code) = data { + warn!( + "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}" + ); + }; + + Ok((peer_id.to_bytes(), data.ok())) } pub async fn get_transactions( @@ -1096,8 +1096,17 @@ impl SharedState { .map_err(|e| anyhow!("{e}"))? 
.map_err(|e| anyhow!("{e}"))?; - let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; - Ok((peer_id.to_bytes(), data)) + let data = match response { + Err(request_response_protocol_error) => Err(anyhow!( + "Invalid response from peer {request_response_protocol_error:?}" + )), + Ok(Err(response_error_code)) => { + warn!("Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"); + Ok(None) + } + Ok(Ok(headers)) => Ok(Some(headers)), + }; + data.map(|data| (peer_id.to_bytes(), data)) } pub async fn get_transactions_from_peer( @@ -1123,7 +1132,18 @@ impl SharedState { "Bug: response from non-requested peer" ); - response.map_err(|e| anyhow!("Invalid response from peer {e:?}")) + match response { + Err(request_response_protocol_error) => Err(anyhow!( + "Invalid response from peer {request_response_protocol_error:?}" + )), + Ok(Err(response_error_code)) => { + warn!( + "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}" + ); + Ok(None) + } + Ok(Ok(txs)) => Ok(Some(txs)), + } } pub async fn get_all_transactions_ids_from_peer( @@ -1147,11 +1167,11 @@ impl SharedState { "Bug: response from non-requested peer" ); - let Some(txs) = - response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? - else { - return Ok(vec![]); - }; + let response = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + + let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default(); + if txs.len() > self.max_txs_per_request { return Err(anyhow!("Too many transactions requested: {}", txs.len())); } @@ -1180,11 +1200,10 @@ impl SharedState { "Bug: response from non-requested peer" ); - let Some(txs) = - response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? 
- else { - return Ok(vec![]); - }; + let response = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default(); + if txs.len() > self.max_txs_per_request { return Err(anyhow!("Too many transactions requested: {}", txs.len())); } diff --git a/crates/services/txpool_v2/src/collision_manager/basic.rs b/crates/services/txpool_v2/src/collision_manager/basic.rs index 0143c6687f2..bf971d34756 100644 --- a/crates/services/txpool_v2/src/collision_manager/basic.rs +++ b/crates/services/txpool_v2/src/collision_manager/basic.rs @@ -47,6 +47,9 @@ use super::{ Collisions, }; +#[cfg(test)] +use fuel_core_types::services::txpool::ArcPoolTx; + pub struct BasicCollisionManager { /// Message -> Transaction that currently use the Message messages_spenders: HashMap, @@ -75,6 +78,81 @@ impl BasicCollisionManager { && self.contracts_creators.is_empty() && self.blobs_users.is_empty() } + + #[cfg(test)] + /// Expected transactions are the transactions that must be populate all elements present in the collision manager. + /// This function will check if all elements present in the collision manager are present in the expected transactions and vice versa. + pub(crate) fn assert_integrity(&self, expected_txs: &[ArcPoolTx]) { + use std::ops::Deref; + + let mut message_spenders = HashMap::new(); + let mut coins_spenders = BTreeMap::new(); + let mut contracts_creators = HashMap::new(); + let mut blobs_users = HashMap::new(); + for tx in expected_txs { + if let PoolTransaction::Blob(checked_tx, _) = tx.deref() { + let blob_id = checked_tx.transaction().blob_id(); + blobs_users.insert(*blob_id, tx.id()); + } + for input in tx.inputs() { + match input { + Input::CoinSigned(CoinSigned { utxo_id, .. }) + | Input::CoinPredicate(CoinPredicate { utxo_id, .. 
}) => { + coins_spenders.insert(*utxo_id, tx.id()); + } + Input::MessageCoinSigned(MessageCoinSigned { nonce, .. }) + | Input::MessageCoinPredicate(MessageCoinPredicate { + nonce, .. + }) + | Input::MessageDataSigned(MessageDataSigned { nonce, .. }) + | Input::MessageDataPredicate(MessageDataPredicate { + nonce, .. + }) => { + message_spenders.insert(*nonce, tx.id()); + } + Input::Contract { .. } => {} + } + } + for output in tx.outputs() { + if let Output::ContractCreated { contract_id, .. } = output { + contracts_creators.insert(*contract_id, tx.id()); + } + } + } + for nonce in self.messages_spenders.keys() { + message_spenders.remove(nonce).unwrap_or_else(|| panic!( + "A message ({}) spender is present on the collision manager that shouldn't be there.", + nonce + )); + } + assert!( + message_spenders.is_empty(), + "Some message spenders are missing from the collision manager: {:?}", + message_spenders + ); + for utxo_id in self.coins_spenders.keys() { + coins_spenders.remove(utxo_id).unwrap_or_else(|| panic!( + "A coin ({}) spender is present on the collision manager that shouldn't be there.", + utxo_id + )); + } + assert!( + coins_spenders.is_empty(), + "Some coin senders are missing from the collision manager: {:?}", + coins_spenders + ); + for contract_id in self.contracts_creators.keys() { + contracts_creators.remove(contract_id).unwrap_or_else(|| panic!( + "A contract ({}) creator is present on the collision manager that shouldn't be there.", + contract_id + )); + } + assert!( + contracts_creators.is_empty(), + "Some contract creators are missing from the collision manager: {:?}", + contracts_creators + ); + } } impl Default for BasicCollisionManager { diff --git a/crates/services/txpool_v2/src/lib.rs b/crates/services/txpool_v2/src/lib.rs index d2530e00038..c462d43b760 100644 --- a/crates/services/txpool_v2/src/lib.rs +++ b/crates/services/txpool_v2/src/lib.rs @@ -1,10 +1,46 @@ +//! 
This crate manages the verification, storage, organisation and selection of the transactions for the network. +//! A transaction in Fuel has inputs and outputs. Inputs are outputs of previous transactions. +//! In a case where one of the inputs is an output of a transaction that has not been executed in a committed block (transaction still in the pool), +//! then the new transaction is considered dependent on that transaction. +//! +//! If a transaction has a dependency, it cannot be selected in a block until the dependent transaction has been selected. +//! A transaction can have a dependency per input and this dependency transaction can also have its own dependencies. +//! This creates a dependency tree between transactions inside the pool which can be very costly to compute for insertion/deletion etc... +//! In order to avoid too much cost, the transaction pool only allows a maximum number of transactions inside a dependency chain. +//! There are other limits on the pool that prevent its size from growing too much: maximum gas in the pool, maximum bytes in the pool, maximum number of transactions in the pool. +//! The pool also implements a TTL for the transactions: if a transaction is not selected in a block after a certain time, it is removed from the pool. +//! +//! All the transactions are ordered by their ratio of gas/tip to be selected in a block. +//! It's possible that a transaction is not profitable enough to be selected for now, and so either it will be selected later or it will be removed from the pool. +//! In order to make a transaction more likely to be selected, it's needed to submit a new colliding transaction (see below) with a higher tip/gas ratio. +//! +//! When a transaction is inserted it's possible that it uses the same inputs as one or multiple transactions already in the pool: this is what we call a collision. +//! The pool has to choose which transaction to keep and which to remove. +//! 
The pool will always try to maximize the number of transactions that can be selected in the next block and so +//! during collision resolution it will prioritize transactions without dependencies. +//! In a collision case, the transaction is considered a conflict and can be inserted under certain conditions: +//! - The transaction has dependencies: +//! - Can collide only with one other transaction. So, the user can submit +//! the same transaction with a higher tip but not merge one or more +//! transactions into one. +//! - A new transaction can be accepted if its profitability is higher than +//! the cumulative profitability of the colliding transactions, and all +//! the transactions that depend on it. +//! - A transaction doesn't have dependencies: +//! - A new transaction can be accepted if its profitability is higher +//! than the collided subtrees'. +//! +//! The pool provides a way to subscribe for updates on a transaction's status. +//! It usually streams one or two messages: +//! - If the insertion of the transaction fails, you can expect only one message with the error. +//! - If the transaction is inserted, you can expect two messages: one with the validation of the insertion and one when the transaction is selected in a block. + +// TODO: Rename the folder from `txpool_v2` to `txpool` after the migration is complete. #![deny(clippy::arithmetic_side_effects)] #![deny(clippy::cast_possible_truncation)] #![deny(unused_crate_dependencies)] #![deny(warnings)] -// TODO: Rename the folder from `txpool_v2` to `txpool` after the migration is complete. - mod collision_manager; pub mod config; pub mod error; diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index be417dce5f4..07c7774f95a 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -46,6 +46,9 @@ use crate::{ }, }; +#[cfg(test)] +use std::collections::HashSet; + /// The pool is the main component of the txpool service. 
It is responsible for storing transactions /// and allowing the selection of transactions for inclusion in a block. pub struct Pool { @@ -561,6 +564,22 @@ where } self.register_transaction_counts(); } + + #[cfg(test)] + pub fn assert_integrity(&self, mut expected_txs: HashSet) { + for tx in &self.tx_id_to_storage_id { + if !expected_txs.remove(tx.0) { + panic!( + "Transaction with id {:?} is not in the expected transactions", + tx.0 + ); + } + } + assert!( + expected_txs.is_empty(), + "Some transactions are not found in the pool" + ); + } } pub struct NotEnoughSpace { diff --git a/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs b/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs index 7236940ef14..61fc4141736 100644 --- a/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs +++ b/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs @@ -21,6 +21,12 @@ use super::{ SelectionAlgorithm, }; +#[cfg(test)] +use fuel_core_types::services::txpool::ArcPoolTx; + +#[cfg(test)] +use std::collections::HashMap; + pub trait RatioTipGasSelectionAlgorithmStorage { type StorageIndex: Debug; @@ -116,6 +122,27 @@ where self.executable_transactions_sorted_tip_gas_ratio .remove(&Reverse(key)); } + + #[cfg(test)] + pub(crate) fn assert_integrity(&self, expected_txs: &[ArcPoolTx]) { + let mut expected_txs: HashMap = expected_txs + .iter() + .map(|tx| (tx.id(), tx.clone())) + .collect(); + for key in self.executable_transactions_sorted_tip_gas_ratio.keys() { + expected_txs.remove(&key.0.tx_id).unwrap_or_else(|| { + panic!( + "Transaction with id {:?} is not in the expected transactions.", + key.0.tx_id + ) + }); + } + assert!( + expected_txs.is_empty(), + "Some transactions are missing from the selection algorithm: {:?}", + expected_txs.keys().collect::>() + ); + } } impl SelectionAlgorithm for RatioTipGasSelection diff --git a/crates/services/txpool_v2/src/storage/graph.rs b/crates/services/txpool_v2/src/storage/graph.rs 
index b3b40575331..8f690eb17fd 100644 --- a/crates/services/txpool_v2/src/storage/graph.rs +++ b/crates/services/txpool_v2/src/storage/graph.rs @@ -368,6 +368,94 @@ impl GraphStorage { fn has_dependent(&self, index: NodeIndex) -> bool { self.get_direct_dependents(index).next().is_some() } + + #[cfg(test)] + pub(crate) fn assert_integrity( + &self, + expected_txs: &[ArcPoolTx], + ) -> Vec<(NodeIndex, bool)> { + use std::ops::Deref; + + let mut txs_map: HashMap = expected_txs + .iter() + .map(|tx| (tx.id(), tx.clone())) + .collect(); + let mut tx_id_node_id = HashMap::new(); + let mut txs_info = Vec::new(); + + for node_id in self.graph.node_indices() { + let node = self + .graph + .node_weight(node_id) + .expect("A node not expected exists in storage"); + let has_dependencies = Storage::has_dependencies(self, &node_id); + let tx_id = node.transaction.id(); + let tx = txs_map + .remove(&tx_id) + .expect("A transaction not expected exists in storage"); + assert_eq!(tx.deref(), node.transaction.deref()); + tx_id_node_id.insert(tx_id, node_id); + txs_info.push((node_id, has_dependencies)); + } + assert!( + txs_map.is_empty(), + "Some transactions are missing in storage {:?}", + txs_map.keys() + ); + + let mut coins_creators = HashMap::new(); + let mut contracts_creators = HashMap::new(); + for expected_tx in expected_txs { + for (i, output) in expected_tx.outputs().iter().enumerate() { + match output { + Output::Coin { .. } => { + let utxo_id = + UtxoId::new(expected_tx.id(), i.try_into().unwrap()); + coins_creators.insert(utxo_id, expected_tx.id()); + } + Output::ContractCreated { contract_id, .. } => { + contracts_creators.insert(*contract_id, expected_tx.id()); + } + Output::Contract(_) + | Output::Change { .. } + | Output::Variable { .. 
} => {} + } + } + } + for (utxo_id, node_id) in &self.coins_creators { + let tx_id = coins_creators.remove(utxo_id).unwrap_or_else(|| panic!("A coin creator (coin: {}) is present in the storage that shouldn't be there", utxo_id)); + let expected_node_id = tx_id_node_id.get(&tx_id).unwrap_or_else(|| { + panic!("A node id is missing for a transaction (tx_id: {})", tx_id) + }); + assert_eq!( + expected_node_id, node_id, + "The node id is different from the expected one" + ); + } + assert!( + coins_creators.is_empty(), + "Some contract creators are missing in storage: {:?}", + coins_creators + ); + + for (contract_id, node_id) in &self.contracts_creators { + let tx_id = contracts_creators.remove(contract_id).unwrap_or_else(|| panic!("A contract creator (contract: {}) is present in the storage that shouldn't be there", contract_id)); + let expected_node_id = tx_id_node_id.get(&tx_id).unwrap_or_else(|| { + panic!("A node id is missing for a transaction (tx_id: {})", tx_id) + }); + assert_eq!( + expected_node_id, node_id, + "The node id is different from the expected one" + ); + } + assert!( + contracts_creators.is_empty(), + "Some contract creators are missing in storage: {:?}", + contracts_creators + ); + + txs_info + } } impl Storage for GraphStorage { diff --git a/crates/services/txpool_v2/src/tests/mod.rs b/crates/services/txpool_v2/src/tests/mod.rs index 982a9034dbb..ed8c3f0b3c2 100644 --- a/crates/services/txpool_v2/src/tests/mod.rs +++ b/crates/services/txpool_v2/src/tests/mod.rs @@ -1,3 +1,5 @@ +#![allow(non_snake_case)] + mod mocks; mod stability_test; mod tests_e2e; diff --git a/crates/services/txpool_v2/src/tests/stability_test.rs b/crates/services/txpool_v2/src/tests/stability_test.rs index ea7dccdcec3..e7d2dbdf0d2 100644 --- a/crates/services/txpool_v2/src/tests/stability_test.rs +++ b/crates/services/txpool_v2/src/tests/stability_test.rs @@ -3,7 +3,6 @@ //! correct(not in the unexpected state). //! 
It relies on the `debug_assert` which are present in the code. -#![allow(non_snake_case)] #![allow(clippy::cast_possible_truncation)] #![allow(clippy::arithmetic_side_effects)] diff --git a/crates/services/txpool_v2/src/tests/tests_pool.rs b/crates/services/txpool_v2/src/tests/tests_pool.rs index 358143cf09a..d601150f4db 100644 --- a/crates/services/txpool_v2/src/tests/tests_pool.rs +++ b/crates/services/txpool_v2/src/tests/tests_pool.rs @@ -1,5 +1,3 @@ -#![allow(non_snake_case)] - use crate::{ config::{ Config, @@ -74,10 +72,12 @@ fn insert_one_tx_succeeds() { let tx = universe.build_script_transaction(None, None, 0); // When - let result = universe.verify_and_insert(tx); + let result = universe.verify_and_insert(tx.clone()); // Then assert!(result.is_ok()); + let tx = result.unwrap().0; + universe.assert_pool_integrity(&[tx]); } #[test] @@ -98,6 +98,7 @@ fn insert__tx_with_blacklisted_utxo_id() { assert!( matches!(err, Error::Blacklisted(BlacklistedError::BlacklistedUTXO(id)) if id == utxo_id) ); + universe.assert_pool_integrity(&[]); } #[test] @@ -118,6 +119,7 @@ fn insert__tx_with_blacklisted_owner() { assert!( matches!(err, Error::Blacklisted(BlacklistedError::BlacklistedOwner(id)) if id == owner_addr) ); + universe.assert_pool_integrity(&[]); } #[test] @@ -149,6 +151,7 @@ fn insert__tx_with_blacklisted_contract() { assert!( matches!(err, Error::Blacklisted(BlacklistedError::BlacklistedContract(id)) if id == contract_id) ); + universe.assert_pool_integrity(&[]); } #[test] @@ -169,6 +172,7 @@ fn insert__tx_with_blacklisted_message() { assert!( matches!(err, Error::Blacklisted(BlacklistedError::BlacklistedMessage(id)) if id == nonce) ); + universe.assert_pool_integrity(&[]); } #[test] @@ -190,6 +194,7 @@ fn insert__tx2_succeeds_after_dependent_tx1() { // Then assert!(result1.is_ok()); assert!(result2.is_ok()); + universe.assert_pool_integrity(&[result1.unwrap().0, result2.unwrap().0]); } #[test] @@ -227,7 +232,7 @@ fn insert__tx2_collided_on_contract_id() { 
.add_input(gas_coin) .add_output(create_contract_output(contract_id)) .finalize_as_transaction(); - universe.verify_and_insert(tx).unwrap(); + let tx = universe.verify_and_insert(tx).unwrap().0; // When let result2 = universe.verify_and_insert(tx_faulty); @@ -237,6 +242,7 @@ fn insert__tx2_collided_on_contract_id() { assert!( matches!(err, Error::Collided(CollisionReason::ContractCreation(id)) if id == contract_id) ); + universe.assert_pool_integrity(&[tx]); } #[test] @@ -263,7 +269,7 @@ fn insert__tx_with_dependency_on_invalid_utxo_type() { universe.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, Some(utxo_id)); let tx_faulty = universe.build_script_transaction(Some(vec![random_predicate]), None, 0); - universe.verify_and_insert(tx).unwrap(); + let tx = universe.verify_and_insert(tx).unwrap().0; // When let result2 = universe.verify_and_insert(tx_faulty); @@ -274,6 +280,7 @@ fn insert__tx_with_dependency_on_invalid_utxo_type() { assert!( matches!(err, Error::InputValidation(InputValidationError::UtxoNotFound(id)) if id == utxo_id) ); + universe.assert_pool_integrity(&[tx]); } #[test] @@ -286,7 +293,7 @@ fn insert__already_known_tx_returns_error() { // Given let tx = universe.build_script_transaction(None, None, 0); - universe.verify_and_insert(tx.clone()).unwrap(); + let pool_tx = universe.verify_and_insert(tx.clone()).unwrap().0; // When let result2 = universe.verify_and_insert(tx.clone()); @@ -296,6 +303,7 @@ fn insert__already_known_tx_returns_error() { assert!( matches!(err, Error::InputValidation(InputValidationError::DuplicateTxId(id)) if id == tx.id(&ChainId::default())) ); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -316,6 +324,7 @@ fn insert__unknown_utxo_returns_error() { assert!( matches!(err, Error::InputValidation(InputValidationError::UtxoNotFound(id)) if id == utxo_id) ); + universe.assert_pool_integrity(&[]); } #[test] @@ -335,7 +344,8 @@ fn insert__higher_priced_tx_removes_lower_priced_tx() { let result = 
universe.verify_and_insert(tx2).unwrap(); // Then - assert_eq!(result[0].id(), tx_id); + assert_eq!(result.1[0].id(), tx_id); + universe.assert_pool_integrity(&[result.0]); } #[test] @@ -351,8 +361,8 @@ fn insert__colliding_dependent_and_underpriced_returns_error() { // Given let tx2 = universe.build_script_transaction(Some(vec![input.clone()]), None, 20); let tx3 = universe.build_script_transaction(Some(vec![input]), None, 10); - universe.verify_and_insert(tx1).unwrap(); - universe.verify_and_insert(tx2).unwrap(); + let tx1 = universe.verify_and_insert(tx1).unwrap().0; + let tx2 = universe.verify_and_insert(tx2).unwrap().0; // When let result3 = universe.verify_and_insert(tx3); @@ -360,6 +370,7 @@ fn insert__colliding_dependent_and_underpriced_returns_error() { // Then let err = result3.unwrap_err(); assert!(matches!(err, Error::Collided(CollisionReason::Utxo(id)) if id == utxo_id)); + universe.assert_pool_integrity(&[tx1, tx2]); } #[test] @@ -402,6 +413,7 @@ fn insert_dependent_contract_creation() { // Then assert!(result1.is_ok()); assert!(result2.is_ok()); + universe.assert_pool_integrity(&[result1.unwrap().0, result2.unwrap().0]); } #[test] @@ -432,10 +444,11 @@ fn insert_more_priced_tx3_removes_tx1_and_dependent_tx2() { let result3 = universe.verify_and_insert(tx3); // Then - let removed_txs = result3.unwrap(); + let (pool_tx, removed_txs) = result3.unwrap(); assert_eq!(removed_txs.len(), 2); assert_eq!(removed_txs[0].id(), tx1_id); assert_eq!(removed_txs[1].id(), tx2_id); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -464,13 +477,14 @@ fn insert_more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { // Then assert!(result2.is_ok()); - let removed_txs = result2.unwrap(); + let removed_txs = result2.unwrap().1; assert_eq!(removed_txs.len(), 1); assert_eq!(removed_txs[0].id(), tx1_id); assert!(result3.is_ok()); - let removed_txs = result3.unwrap(); + let (pool_tx, removed_txs) = result3.unwrap(); assert_eq!(removed_txs.len(), 1); 
assert_eq!(removed_txs[0].id(), tx2_id); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -488,7 +502,7 @@ fn insert__tx_limit_hit() { // Given let tx1 = universe.build_script_transaction(None, None, 10); let tx2 = universe.build_script_transaction(None, None, 0); - universe.verify_and_insert(tx1).unwrap(); + let pool_tx = universe.verify_and_insert(tx1).unwrap().0; // When let result2 = universe.verify_and_insert(tx2); @@ -496,6 +510,7 @@ fn insert__tx_limit_hit() { // Then let err = result2.unwrap_err(); assert!(matches!(err, Error::NotInsertedLimitHit)); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -522,7 +537,7 @@ fn insert__tx_gas_limit() { ..Default::default() }); universe.build_pool(); - universe.verify_and_insert(tx1).unwrap(); + let pool_tx = universe.verify_and_insert(tx1).unwrap().0; // When let result2 = universe.verify_and_insert(tx2); @@ -530,6 +545,7 @@ fn insert__tx_gas_limit() { // Then let err = result2.unwrap_err(); assert!(matches!(err, Error::NotInsertedLimitHit)); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -556,7 +572,7 @@ fn insert__tx_bytes_limit() { ..Default::default() }); universe.build_pool(); - universe.verify_and_insert(tx1).unwrap(); + let pool_tx = universe.verify_and_insert(tx1).unwrap().0; // When let result2 = universe.verify_and_insert(tx2); @@ -564,6 +580,7 @@ fn insert__tx_bytes_limit() { // Then let err = result2.unwrap_err(); assert!(matches!(err, Error::NotInsertedLimitHit)); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -584,8 +601,8 @@ fn insert__dependency_chain_length_hit() { let input = unset_input.into_input(UtxoId::new(tx2.id(&Default::default()), 0)); let tx3 = universe.build_script_transaction(Some(vec![input]), None, 0); - universe.verify_and_insert(tx1).unwrap(); - universe.verify_and_insert(tx2).unwrap(); + let tx1 = universe.verify_and_insert(tx1).unwrap().0; + let tx2 = universe.verify_and_insert(tx2).unwrap().0; // When let result3 = 
universe.verify_and_insert(tx3); @@ -596,6 +613,7 @@ fn insert__dependency_chain_length_hit() { err, Error::Dependency(DependencyError::NotInsertedChainDependencyTooBig) )); + universe.assert_pool_integrity(&[tx1, tx2]); } #[test] @@ -632,6 +650,7 @@ fn get_sorted_out_tx1_2_3() { assert_eq!(txs[0].id(), tx3_id, "First should be tx3"); assert_eq!(txs[1].id(), tx1_id, "Second should be tx1"); assert_eq!(txs[2].id(), tx2_id, "Third should be tx2"); + universe.assert_pool_integrity(&[]); } #[test] @@ -688,6 +707,7 @@ fn get_sorted_out_tx_same_tips() { assert_eq!(txs[0].id(), tx3_id, "First should be tx3"); assert_eq!(txs[1].id(), tx2_id, "Second should be tx2"); assert_eq!(txs[2].id(), tx1_id, "Third should be tx1"); + universe.assert_pool_integrity(&[]); } #[test] @@ -744,6 +764,7 @@ fn get_sorted_out_tx_profitable_ratios() { assert_eq!(txs[0].id(), tx3_id, "First should be tx3"); assert_eq!(txs[1].id(), tx2_id, "Second should be tx2"); assert_eq!(txs[2].id(), tx1_id, "Third should be tx1"); + universe.assert_pool_integrity(&[]); } #[test] @@ -786,6 +807,7 @@ fn get_sorted_out_tx_by_creation_instant() { assert_eq!(txs[1].id(), tx2_id, "Second should be tx2"); assert_eq!(txs[2].id(), tx3_id, "Third should be tx3"); assert_eq!(txs[3].id(), tx4_id, "Fourth should be tx4"); + universe.assert_pool_integrity(&[]); } #[test] @@ -826,6 +848,7 @@ fn insert__tx_below_min_gas_price() { // Then assert!(matches!(err, Error::InsufficientMaxFee { .. 
})); + universe.assert_pool_integrity(&[]); } #[test] @@ -839,9 +862,10 @@ fn insert_tx_when_input_message_id_exists_in_db() { let tx = universe.build_script_transaction(Some(vec![input]), None, 0); // When - universe.verify_and_insert(tx) + let pool_tx = universe.verify_and_insert(tx) // Then - .unwrap(); + .unwrap().0; + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -861,6 +885,7 @@ fn insert__tx_when_input_message_id_do_not_exists_in_db() { err, Error::InputValidation(InputValidationError::NotInsertedInputMessageUnknown(msg_id)) if msg_id == *message.id() )); + universe.assert_pool_integrity(&[]); } #[test] @@ -887,13 +912,14 @@ fn insert__tx_tip_lower_than_another_tx_with_same_message_id() { ); // When - universe.verify_and_insert(tx_high).unwrap(); + let pool_tx = universe.verify_and_insert(tx_high).unwrap().0; let err = universe.verify_and_insert(tx_low).unwrap_err(); // Then assert!( matches!(err, Error::Collided(CollisionReason::Message(msg_id)) if msg_id == *message.id()) ); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -927,9 +953,10 @@ fn insert_tx_tip_higher_than_another_tx_with_same_message_id() { // Then assert!(result1.is_ok()); assert!(result2.is_ok()); - let removed_txs = result2.unwrap(); + let (pool_tx, removed_txs) = result2.unwrap(); assert_eq!(removed_txs.len(), 1); assert_eq!(removed_txs[0].id(), tx_high_id); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -965,6 +992,7 @@ fn insert_again_message_after_squeeze_with_even_lower_tip() { assert!(result1.is_ok()); assert!(result2.is_ok()); assert!(result3.is_ok()); + universe.assert_pool_integrity(&[result2.unwrap().0, result3.unwrap().0]); } #[test] @@ -993,6 +1021,7 @@ fn insert__tx_with_predicates_incorrect_owner() { ValidityError::InputPredicateOwner { index: 0 } )) )); + universe.assert_pool_integrity(&[]); } #[test] @@ -1035,6 +1064,7 @@ fn insert__tx_with_predicate_without_enough_gas() { PredicateVerificationFailed::OutOfGas )) )); + 
universe.assert_pool_integrity(&[]); } #[test] @@ -1068,6 +1098,7 @@ fn insert__tx_with_predicate_that_returns_false() { PredicateVerificationFailed::Panic(PanicReason::PredicateReturnedNonOne) )) )); + universe.assert_pool_integrity(&[]); } #[test] @@ -1089,9 +1120,10 @@ fn insert_tx_with_blob() { .finalize_as_transaction(); // When - universe.verify_and_insert(tx) + let pool_tx = universe.verify_and_insert(tx) // Then - .unwrap(); + .unwrap().0; + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -1113,7 +1145,7 @@ fn insert__tx_with_blob_already_inserted_at_higher_tip() { .add_fee_input() .finalize_as_transaction(); - universe.verify_and_insert(tx).unwrap(); + let pool_tx = universe.verify_and_insert(tx).unwrap().0; let same_blob_tx = TransactionBuilder::blob(BlobBody { id: blob_id, @@ -1128,6 +1160,7 @@ fn insert__tx_with_blob_already_inserted_at_higher_tip() { // Then assert!(matches!(err, Error::Collided(CollisionReason::Blob(b)) if b == blob_id)); + universe.assert_pool_integrity(&[pool_tx]); } #[test] @@ -1166,6 +1199,7 @@ fn insert_tx_with_blob_already_insert_at_lower_tip() { // Then assert!(result.is_ok()); + universe.assert_pool_integrity(&[result.unwrap().0]); } #[test] @@ -1196,6 +1230,7 @@ fn insert__tx_blob_already_in_db() { err, Error::InputValidation(InputValidationError::NotInsertedBlobIdAlreadyTaken(b)) if b == blob_id )); + universe.assert_pool_integrity(&[]); } #[test] @@ -1217,8 +1252,8 @@ fn insert__if_tx3_depends_and_collides_with_tx2() { // Given // tx3 {inputs: {coinA, coinB}, outputs:{}, tip: 20} let input_b = unset_input.into_input(UtxoId::new(tx2.id(&Default::default()), 0)); - universe.verify_and_insert(tx1).unwrap(); - universe.verify_and_insert(tx2).unwrap(); + let tx1 = universe.verify_and_insert(tx1).unwrap().0; + let tx2 = universe.verify_and_insert(tx2).unwrap().0; let tx3 = universe.build_script_transaction(Some(vec![input_a, input_b]), None, 20); @@ -1230,6 +1265,7 @@ fn insert__if_tx3_depends_and_collides_with_tx2() { 
err, Error::Dependency(DependencyError::DependentTransactionIsADiamondDeath) )); + universe.assert_pool_integrity(&[tx1, tx2]); } #[test] @@ -1266,4 +1302,5 @@ fn insert__tx_upgrade_with_invalid_wasm() { result, Error::WasmValidity(WasmValidityError::NotEnabled) )); + universe.assert_pool_integrity(&[]); } diff --git a/crates/services/txpool_v2/src/tests/universe.rs b/crates/services/txpool_v2/src/tests/universe.rs index 0703aaebe48..2813421d8da 100644 --- a/crates/services/txpool_v2/src/tests/universe.rs +++ b/crates/services/txpool_v2/src/tests/universe.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::{ + collections::HashSet, + sync::Arc, +}; use fuel_core_types::{ entities::{ @@ -197,10 +200,11 @@ impl TestPoolUniverse { tx_builder.finalize().into() } + // Returns the added transaction and the list of transactions that were removed from the pool pub fn verify_and_insert( &mut self, tx: Transaction, - ) -> Result, Error> { + ) -> Result<(ArcPoolTx, Vec), Error> { if let Some(pool) = &self.pool { let mut mock_consensus_params_provider = MockConsensusParametersProvider::default(); @@ -222,7 +226,8 @@ impl TestPoolUniverse { Default::default(), true, )?; - pool.write().insert(Arc::new(tx), &self.mock_db) + let tx = Arc::new(tx); + Ok((tx.clone(), pool.write().insert(tx, &self.mock_db)?)) } else { panic!("Pool needs to be built first"); } @@ -293,6 +298,28 @@ impl TestPoolUniverse { } } + pub fn assert_pool_integrity(&self, expected_txs: &[ArcPoolTx]) { + let pool = self.pool.as_ref().unwrap(); + let pool = pool.read(); + let storage_ids_dependencies = pool.storage.assert_integrity(expected_txs); + let txs_without_dependencies = expected_txs + .iter() + .zip(storage_ids_dependencies) + .filter_map(|(tx, (_, has_dependencies))| { + if !has_dependencies { + Some(tx.clone()) + } else { + None + } + }) + .collect::>(); + pool.selection_algorithm + .assert_integrity(&txs_without_dependencies); + pool.collision_manager.assert_integrity(expected_txs); + let txs: HashSet = 
expected_txs.iter().map(|tx| tx.id()).collect(); + pool.assert_integrity(txs); + } + pub fn get_pool(&self) -> Shared { self.pool.clone().unwrap() } diff --git a/tests/tests/messages.rs b/tests/tests/messages.rs index bee948105ea..26eceac59b9 100644 --- a/tests/tests/messages.rs +++ b/tests/tests/messages.rs @@ -485,7 +485,6 @@ async fn can_get_message_proof() { let result = client .message_proof(&transaction_id, nonce, None, Some(last_height)) .await - .unwrap() .unwrap(); // 1. Generate the message id (message fields) diff --git a/tests/tests/regenesis.rs b/tests/tests/regenesis.rs index 3ba03d27366..6e92f3dad40 100644 --- a/tests/tests/regenesis.rs +++ b/tests/tests/regenesis.rs @@ -401,8 +401,7 @@ async fn test_regenesis_message_proofs_are_preserved() -> anyhow::Result<()> { .client .message_proof(&tx_id, nonce, None, Some((message_block_height + 1).into())) .await - .expect("Unable to get message proof") - .expect("Message proof not found"); + .expect("Unable to get message proof"); let prev_root = proof.commit_block_header.prev_root; let block_proof_index = proof.block_proof.proof_index; let block_proof_set: Vec<_> = proof @@ -460,8 +459,7 @@ async fn test_regenesis_message_proofs_are_preserved() -> anyhow::Result<()> { .client .message_proof(&tx_id, nonce, None, Some(block_height.into())) .await - .expect("Unable to get message proof") - .expect("Message proof not found"); + .expect("Unable to get message proof"); let prev_root = proof.commit_block_header.prev_root; let block_proof_set: Vec<_> = proof .block_proof