Skip to content

Commit

Permalink
Add a way to fetch transactions in P2P without specifying a peer (#2376)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
This is a requirement for
#2361

## Description

This PR adds a way to fetch transactions with p2p but without giving a
specific peer and let p2p choose the one they prefer.
This will be used in #2361

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
AurelienFT and xgreenx authored Oct 31, 2024
1 parent ec41f56 commit 32b29a3
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future.
- [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit.
- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer.

## [Version 0.40.0]

Expand Down
144 changes: 103 additions & 41 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,17 +677,31 @@ impl FuelP2PService {
V2ResponseMessage::SealedHeaders(v) => {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1311
// Change type of ResponseSender and remove the .ok() here
c.send((peer, Ok(v.ok()))).is_ok()
c.send(Ok((peer, Ok(v.ok())))).is_ok()
}
_ => {
warn!(
"Invalid response type received for request {:?}",
request_id
);
c.send((peer, Err(ResponseError::TypeMismatch))).is_ok()
c.send(Ok((peer, Err(ResponseError::TypeMismatch))))
.is_ok()
}
},
ResponseSender::Transactions(c) => match response {
V2ResponseMessage::Transactions(v) => {
c.send(Ok((peer, Ok(v.ok())))).is_ok()
}
_ => {
warn!(
"Invalid response type received for request {:?}",
request_id
);
c.send(Ok((peer, Err(ResponseError::TypeMismatch))))
.is_ok()
}
},
ResponseSender::TransactionsFromPeer(c) => match response {
V2ResponseMessage::Transactions(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
}
Expand Down Expand Up @@ -750,9 +764,12 @@ impl FuelP2PService {
if let Some(channel) = self.outbound_requests_table.remove(&request_id) {
match channel {
ResponseSender::SealedHeaders(c) => {
let _ = c.send((peer, Err(ResponseError::P2P(error))));
let _ = c.send(Ok((peer, Err(ResponseError::P2P(error)))));
}
ResponseSender::Transactions(c) => {
let _ = c.send(Ok((peer, Err(ResponseError::P2P(error)))));
}
ResponseSender::TransactionsFromPeer(c) => {
let _ = c.send((peer, Err(ResponseError::P2P(error))));
}
ResponseSender::TxPoolAllTransactionsIds(c) => {
Expand Down Expand Up @@ -1700,9 +1717,25 @@ mod tests {

let expected = arbitrary_headers_for_range(range.clone());

if let Ok((_, Ok(sealed_headers))) = response_message {
let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
if let Ok(response) = response_message {
match response {
Ok((_, Ok(Some(sealed_headers)))) => {
let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
},
Ok((_, Ok(None))) => {
tracing::error!("Node A did not return any headers");
let _ = tx_test_end.send(false).await;
},
Ok((_, Err(e))) => {
tracing::error!("Error in P2P communication: {:?}", e);
let _ = tx_test_end.send(false).await;
},
Err(e) => {
tracing::error!("Error in P2P before sending message: {:?}", e);
let _ = tx_test_end.send(false).await;
},
}
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
Expand All @@ -1717,9 +1750,25 @@ mod tests {
tokio::spawn(async move {
let response_message = rx_orchestrator.await;

if let Ok((_, Ok(Some(transactions)))) = response_message {
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
if let Ok(response) = response_message {
match response {
Ok((_, Ok(Some(transactions)))) => {
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
},
Ok((_, Ok(None))) => {
tracing::error!("Node A did not return any transactions");
let _ = tx_test_end.send(false).await;
},
Ok((_, Err(e))) => {
tracing::error!("Error in P2P communication: {:?}", e);
let _ = tx_test_end.send(false).await;
},
Err(e) => {
tracing::error!("Error in P2P before sending message: {:?}", e);
let _ = tx_test_end.send(false).await;
},
}
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
Expand Down Expand Up @@ -1878,23 +1927,28 @@ mod tests {
tokio::spawn(async move {
let response_message = rx_orchestrator.await;

match response_message {
Ok((_, Ok(_))) => {
let _ = tx_test_end.send(false).await;
panic!("Request succeeded unexpectedly");
},
Ok((_, Err(ResponseError::TypeMismatch))) => {
// Got Invalid Response Type as expected, so end test
let _ = tx_test_end.send(true).await;
},
Ok((_, Err(err))) => {
let _ = tx_test_end.send(false).await;
panic!("Unexpected error: {:?}", err);
},
Err(_) => {
let _ = tx_test_end.send(false).await;
panic!("Channel closed unexpectedly");
},
if let Ok(response) = response_message {
match response {
Ok((_, Ok(_))) => {
let _ = tx_test_end.send(false).await;
panic!("Request succeeded unexpectedly");
},
Ok((_, Err(ResponseError::TypeMismatch))) => {
// Got Invalid Response Type as expected, so end test
let _ = tx_test_end.send(true).await;
},
Ok((_, Err(err))) => {
let _ = tx_test_end.send(false).await;
panic!("Unexpected error in P2P communication: {:?}", err);
},
Err(e) => {
let _ = tx_test_end.send(false).await;
panic!("Error in P2P before sending message: {:?}", e);
},
}
} else {
let _ = tx_test_end.send(false).await;
panic!("Orchestrator failed to receive a message: {:?}", response_message);
}
});
}
Expand Down Expand Up @@ -1964,21 +2018,29 @@ mod tests {

tokio::spawn(async move {
// 3. Simulating NetworkOrchestrator receiving a Timeout Error Message!
match rx_orchestrator.await {
Ok((_, Ok(_))) => {
let _ = tx_test_end.send(false).await;
panic!("Request succeeded unexpectedly")},
Ok((_, Err(ResponseError::P2P(_)))) => {
// Got timeout as expected, so end test
let _ = tx_test_end.send(true).await;
},
Ok((_, Err(err))) => {
let _ = tx_test_end.send(false).await;
panic!("Unexpected error: {:?}", err);
},
Err(e) => {
let _ = tx_test_end.send(false).await;
panic!("Channel closed unexpectedly: {:?}", e)},
let response_message = rx_orchestrator.await;
if let Ok(response) = response_message {
match response {
Ok((_, Ok(_))) => {
let _ = tx_test_end.send(false).await;
panic!("Request succeeded unexpectedly");
},
Ok((_, Err(ResponseError::P2P(_)))) => {
// Got Invalid Response Type as expected, so end test
let _ = tx_test_end.send(true).await;
},
Ok((_, Err(err))) => {
let _ = tx_test_end.send(false).await;
panic!("Unexpected error in P2P communication: {:?}", err);
},
Err(e) => {
let _ = tx_test_end.send(false).await;
panic!("Error in P2P before sending message: {:?}", e);
},
}
} else {
let _ = tx_test_end.send(false).await;
panic!("Orchestrator failed to receive a message: {:?}", response_message);
}
});
}
Expand Down
11 changes: 9 additions & 2 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::ops::Range;
use thiserror::Error;
use tokio::sync::oneshot;

use crate::service::TaskError;

pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1";
pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2";

Expand Down Expand Up @@ -104,11 +106,16 @@ impl From<V2ResponseMessage> for V1ResponseMessage {
}

pub type OnResponse<T> = oneshot::Sender<(PeerId, Result<T, ResponseError>)>;
// This type is more complex because it's used in tasks that need to select a peer to send the request and this
// can cause errors where the peer is not defined.
pub type OnResponseWithPeerSelection<T> =
oneshot::Sender<Result<(PeerId, Result<T, ResponseError>), TaskError>>;

#[derive(Debug)]
pub enum ResponseSender {
SealedHeaders(OnResponse<Option<Vec<SealedBlockHeader>>>),
Transactions(OnResponse<Option<Vec<Transactions>>>),
SealedHeaders(OnResponseWithPeerSelection<Option<Vec<SealedBlockHeader>>>),
Transactions(OnResponseWithPeerSelection<Option<Vec<Transactions>>>),
TransactionsFromPeer(OnResponse<Option<Vec<Transactions>>>),
TxPoolAllTransactionsIds(OnResponse<Option<Vec<TxId>>>),
TxPoolFullTransactions(OnResponse<Option<Vec<Option<NetworkableTransactionPool>>>>),
}
Expand Down
Loading

0 comments on commit 32b29a3

Please sign in to comment.