diff --git a/CHANGELOG.md b/CHANGELOG.md index fede191b13b..645237e21c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. ## [Version 0.40.0] diff --git a/Cargo.lock b/Cargo.lock index 7574ea76ac7..d4d274e7f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3558,6 +3558,8 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.8", + "strum 0.25.0", + "strum_macros 0.25.3", "thiserror", "tokio", "tracing", diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index fbc9f32afb4..3a5f6e78009 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -28,7 +28,7 @@ use fuel_core_p2p::{ p2p_service::FuelP2PEvent, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, service::to_message_acceptance, }; @@ -178,7 +178,7 @@ impl Bootstrap { if request_message == RequestMessage::TxPoolAllTransactionsIds { let _ = bootstrap.send_response_msg( request_id, - ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])), + V2ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])), ); } } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 5455e83c5f3..09662d26311 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -48,6 +48,8 @@ rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true } sha2 = "0.10" +strum = { workspace = true } +strum_macros = { workspace = true } thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2b689eb3949..8fd6808e348 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -13,7 +13,7 @@ use crate::{ peer_report, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, }; use fuel_core_types::fuel_types::BlockHeight; @@ -112,15 +112,16 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = - core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); + let req_res_protocol = codec + .get_req_res_protocols() + .map(|protocol| (protocol, ProtocolSupport::Full)); let req_res_config = request_response::Config::default() .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec, + codec.clone(), req_res_protocol, req_res_config, ); @@ -165,9 +166,9 @@ impl FuelBehaviour { pub fn send_response_msg( &mut self, - channel: ResponseChannel, - message: ResponseMessage, - ) -> Result<(), ResponseMessage> { + channel: ResponseChannel, + message: V2ResponseMessage, + ) -> Result<(), V2ResponseMessage> { self.request_response.send_response(channel, message) } diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index c22aacd5671..505cf40c9bf 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -8,7 +8,7 @@ use crate::{ }, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, }; use libp2p::request_response; @@ -28,18 +28,22 @@ pub trait GossipsubCodec { ) -> Result; } +// TODO: https://github.com/FuelLabs/fuel-core/issues/2368 +// Remove this trait /// Main Codec trait /// Needs to be implemented and provided to FuelBehaviour pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + request_response::Codec + > + request_response::Codec + Clone + Send + 'static { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour - fn get_req_res_protocol(&self) -> ::Protocol; + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 94f23cd6fd2..db3fe814e4f 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -10,8 +10,10 @@ use crate::{ }, request_response::messages::{ RequestMessage, - ResponseMessage, - REQUEST_RESPONSE_PROTOCOL_ID, + V1ResponseMessage, + V2ResponseMessage, + V1_REQUEST_RESPONSE_PROTOCOL_ID, + V2_REQUEST_RESPONSE_PROTOCOL_ID, }, }; use async_trait::async_trait; @@ -26,6 +28,8 @@ use serde::{ Serialize, }; use std::io; +use strum::IntoEnumIterator; +use strum_macros::EnumIter; /// Helper method for decoding data /// Reusable across `RequestResponseCodec` and `GossipsubCodec` @@ -69,13 +73,13 @@ impl PostcardCodec { /// run into a timeout waiting for the response. #[async_trait] impl request_response::Codec for PostcardCodec { - type Protocol = MessageExchangePostcardProtocol; + type Protocol = PostcardProtocol; type Request = RequestMessage; - type Response = ResponseMessage; + type Response = V2ResponseMessage; async fn read_request( &mut self, - _: &Self::Protocol, + _protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -91,7 +95,7 @@ impl request_response::Codec for PostcardCodec { async fn read_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -103,7 +107,13 @@ impl request_response::Codec for PostcardCodec { .read_to_end(&mut response) .await?; - deserialize(&response) + match protocol { + PostcardProtocol::V1 => { + let v1_response = deserialize::(&response)?; + Ok(v1_response.into()) + } + PostcardProtocol::V2 => deserialize::(&response), + } } async fn write_request( @@ -122,14 +132,20 @@ impl request_response::Codec for PostcardCodec { async fn write_response( &mut self, - _protocol: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, res: Self::Response, ) -> io::Result<()> where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = serialize(&res)?; + let encoded_data = match protocol { + PostcardProtocol::V1 => { + let v1_response: V1ResponseMessage = res.into(); + serialize(&v1_response)? + } + PostcardProtocol::V2 => serialize(&res)?, + }; socket.write_all(&encoded_data).await?; Ok(()) } @@ -161,24 +177,44 @@ impl GossipsubCodec for PostcardCodec { } impl NetworkCodec for PostcardCodec { - fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocol {} + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + // TODO: Iterating over versions in reverse order should prefer + // peers to use V2 over V1 for exchanging messages. However, this is + // not guaranteed by the specs for the `request_response` protocol. + PostcardProtocol::iter().rev() } } -#[derive(Default, Debug, Clone)] -pub struct MessageExchangePostcardProtocol; +#[derive(Debug, Default, Clone, EnumIter)] +pub enum PostcardProtocol { + #[default] + V1, + V2, +} -impl AsRef for MessageExchangePostcardProtocol { +impl AsRef for PostcardProtocol { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID + match self { + PostcardProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, + PostcardProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, + } } } #[cfg(test)] +#[allow(non_snake_case)] mod tests { + + use fuel_core_types::blockchain::SealedBlockHeader; + use request_response::Codec as _; + use super::*; - use crate::request_response::messages::MAX_REQUEST_SIZE; + use crate::request_response::messages::{ + ResponseMessageErrorCode, + MAX_REQUEST_SIZE, + }; #[test] fn test_request_size_fits() { @@ -186,4 +222,169 @@ mod tests { let m = RequestMessage::Transactions(arbitrary_range); assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE); } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v2_on_successful_response_returns_original_value( + ) { + // Given + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V2, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Ok(sealed_headers)) if sealed_headers == sealed_block_headers + )); + } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v1_on_successful_response_returns_original_value( + ) { + // Given + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + // Then + assert!( + matches!(deserialized, V2ResponseMessage::SealedHeaders(Ok(sealed_headers)) if sealed_headers == sealed_block_headers) + ); + } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v2_on_error_response_returns_original_value( + ) { + // Given + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V2, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } + + #[tokio::test] + async fn codec__serialzation_roundtrip_using_v1_on_error_response_returns_predefined_error_code( + ) { + // Given + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Change this to a different ResponseMessageErrorCode once these have been implemented. + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } + + #[tokio::test] + async fn codec__write_response_is_backwards_compatible_with_v1() { + // Given + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized_as_v1 = + // We cannot access the codec trait from an old node here, + // so we deserialize directly using the `V1ResponseMessage` type. + deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + + // Then + assert!(matches!( + deserialized_as_v1, + V1ResponseMessage::SealedHeaders(None) + )); + } + + #[tokio::test] + async fn codec__read_response_is_backwards_compatible_with_v1() { + // Given + let response = V1ResponseMessage::SealedHeaders(None); + let mut codec = PostcardCodec::new(1024); + + // When + let buf = serialize(&response) + .expect("Serialization as V1ResponseMessage should succeed"); + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 024cc006785..76f6bdbf615 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -30,9 +30,9 @@ use crate::{ RequestError, RequestMessage, ResponseError, - ResponseMessage, ResponseSendError, ResponseSender, + V2ResponseMessage, }, TryPeerId, }; @@ -119,7 +119,7 @@ pub struct FuelP2PService { /// Whenever we're done processing the request, it's removed from this table, /// and the channel is used to send the result to libp2p, which will forward it /// to the peer that requested it. - inbound_requests_table: HashMap>, + inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages network_codec: PostcardCodec, @@ -428,7 +428,7 @@ impl FuelP2PService { pub fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> Result<(), ResponseSendError> { let Some(channel) = self.inbound_requests_table.remove(&request_id) else { debug!("ResponseChannel for {:?} does not exist!", request_id); @@ -646,7 +646,7 @@ impl FuelP2PService { fn handle_request_response_event( &mut self, - event: request_response::Event, + event: request_response::Event, ) -> Option { match event { request_response::Event::Message { peer, message } => match message { @@ -674,8 +674,10 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { - ResponseMessage::SealedHeaders(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::SealedHeaders(v) => { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Change type of ResponseSender and remove the .ok() here + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -686,8 +688,8 @@ impl FuelP2PService { } }, ResponseSender::Transactions(c) => match response { - ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::Transactions(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -698,8 +700,8 @@ impl FuelP2PService { } }, ResponseSender::TxPoolAllTransactionsIds(c) => match response { - ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::TxPoolAllTransactionsIds(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -710,8 +712,8 @@ impl FuelP2PService { } }, ResponseSender::TxPoolFullTransactions(c) => match response { - ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::TxPoolFullTransactions(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -847,8 +849,8 @@ mod tests { request_response::messages::{ RequestMessage, ResponseError, - ResponseMessage, ResponseSender, + V2ResponseMessage, }, service::to_message_acceptance, }; @@ -1778,16 +1780,16 @@ mod tests { RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::SealedHeaders(Ok(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::Transactions(Ok(transactions))); } RequestMessage::TxPoolAllTransactionsIds => { let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::TxPoolAllTransactionsIds(Ok(tx_ids))); } RequestMessage::TxPoolFullTransactions(tx_ids) => { let txs = tx_ids.iter().enumerate().map(|(i, _)| { @@ -1797,7 +1799,7 @@ mod tests { Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) } }).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::TxPoolFullTransactions(Ok(txs))); } } } @@ -1905,7 +1907,7 @@ mod tests { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event { let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::SealedHeaders(Ok(sealed_headers))); } tracing::info!("Node B Event: {:?}", node_b_event); diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 83f3f7a3a50..2a0e03ba2cd 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,7 +18,8 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; /// Max Size in Bytes of the Request Message #[cfg(test)] @@ -32,14 +33,76 @@ pub enum RequestMessage { TxPoolFullTransactions(Vec), } +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessageErrorCode { + /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` + #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] + ProtocolV1EmptyResponse = 0, +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ResponseMessage { +pub enum V1ResponseMessage { SealedHeaders(Option>), Transactions(Option>), TxPoolAllTransactionsIds(Option>), TxPoolFullTransactions(Option>>), } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum V2ResponseMessage { + SealedHeaders(Result, ResponseMessageErrorCode>), + Transactions(Result, ResponseMessageErrorCode>), + TxPoolAllTransactionsIds(Result, ResponseMessageErrorCode>), + TxPoolFullTransactions( + Result>, ResponseMessageErrorCode>, + ), +} + +impl From for V2ResponseMessage { + fn from(v1_response: V1ResponseMessage) -> Self { + match v1_response { + V1ResponseMessage::SealedHeaders(sealed_headers) => { + V2ResponseMessage::SealedHeaders( + sealed_headers + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + V1ResponseMessage::Transactions(vec) => V2ResponseMessage::Transactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ), + V1ResponseMessage::TxPoolAllTransactionsIds(vec) => { + V2ResponseMessage::TxPoolAllTransactionsIds( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + V1ResponseMessage::TxPoolFullTransactions(vec) => { + V2ResponseMessage::TxPoolFullTransactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + } + } +} + +impl From for V1ResponseMessage { + fn from(response: V2ResponseMessage) -> Self { + match response { + V2ResponseMessage::SealedHeaders(sealed_headers) => { + V1ResponseMessage::SealedHeaders(sealed_headers.ok()) + } + V2ResponseMessage::Transactions(transactions) => { + V1ResponseMessage::Transactions(transactions.ok()) + } + V2ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => { + V1ResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok()) + } + V2ResponseMessage::TxPoolFullTransactions(tx_pool) => { + V1ResponseMessage::TxPoolFullTransactions(tx_pool.ok()) + } + } + } +} + pub type OnResponse = oneshot::Sender<(PeerId, Result)>; #[derive(Debug)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..c85e1e3a6c8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -22,8 +22,9 @@ use crate::{ request_response::messages::{ OnResponse, RequestMessage, - ResponseMessage, + ResponseMessageErrorCode, ResponseSender, + V2ResponseMessage, }, }; use anyhow::anyhow; @@ -136,19 +137,20 @@ pub enum TaskRequest { reporting_service: &'static str, }, DatabaseTransactionsLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, DatabaseHeaderLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolAllTransactionsIds { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolFullTransactions { - response: Option>>, + response: + Result>, ResponseMessageErrorCode>, request_id: InboundRequestId, }, } @@ -223,7 +225,7 @@ pub trait TaskP2PService: Send { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> anyhow::Result<()>; fn report_message( @@ -297,7 +299,7 @@ impl TaskP2PService for FuelP2PService { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> anyhow::Result<()> { self.send_response_msg(request_id, message)?; Ok(()) @@ -532,8 +534,11 @@ where where DbLookUpFn: Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> V2ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, R: Send + 'static, { let instant = Instant::now(); @@ -549,8 +554,9 @@ where max_len, "Requested range is too big" ); - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - let response = None; + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. + let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -564,17 +570,25 @@ where return; } - let response = db_lookup(&view, range.clone()).ok().flatten(); + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Add new error code + let response = db_lookup(&view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Handle error cases and return meaningful status codes if result.is_err() { + let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(err)); } Ok(()) @@ -588,7 +602,7 @@ where self.handle_db_request( range, request_id, - ResponseMessage::Transactions, + V2ResponseMessage::Transactions, |view, range| view.get_transactions(range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, @@ -606,7 +620,7 @@ where self.handle_db_request( range, request_id, - ResponseMessage::SealedHeaders, + V2ResponseMessage::SealedHeaders, |view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, @@ -624,8 +638,11 @@ where task_request: TaskRequestFn, ) -> anyhow::Result<()> where - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> V2ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, F: Future> + Send + 'static, { let instant = Instant::now(); @@ -642,16 +659,20 @@ where return; }; - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. let _ = response_channel - .try_send(task_request(Some(response), request_id)) + .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); }); if result.is_err() { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return better error code + let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(res)); } Ok(()) @@ -666,7 +687,7 @@ where self.handle_txpool_request( request_id, async move { tx_pool.get_tx_ids(max_txs).await }, - ResponseMessage::TxPoolAllTransactionsIds, + V2ResponseMessage::TxPoolAllTransactionsIds, |response, request_id| TaskRequest::TxPoolAllTransactionsIds { response, request_id, @@ -679,11 +700,14 @@ where tx_ids: Vec, request_id: InboundRequestId, ) -> anyhow::Result<()> { - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, - ResponseMessage::TxPoolFullTransactions(None), + V2ResponseMessage::TxPoolFullTransactions(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )), )?; return Ok(()); } @@ -691,7 +715,7 @@ where self.handle_txpool_request( request_id, async move { tx_pool.get_full_txs(tx_ids).await }, - ResponseMessage::TxPoolFullTransactions, + V2ResponseMessage::TxPoolFullTransactions, |response, request_id| TaskRequest::TxPoolFullTransactions { response, request_id, @@ -886,16 +910,16 @@ where let _ = channel.send(peers); } Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response)); } Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response)); } Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolAllTransactionsIds(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response)); } Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolFullTransactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response)); } None => { tracing::error!("The P2P `Task` should be holder of the `Sender`"); @@ -1414,7 +1438,7 @@ pub mod tests { fn send_response_msg( &mut self, _request_id: InboundRequestId, - _message: ResponseMessage, + _message: V2ResponseMessage, ) -> anyhow::Result<()> { todo!() }