Skip to content

Commit

Permalink
Add support for concurrent requests to a single peer. (#200)
Browse files Browse the repository at this point in the history
Co-authored-by: ackintosh <sora.akatsuki@gmail.com>
Co-authored-by: Diva M <divma@protonmail.com>
Co-authored-by: Age Manning <Age@AgeManning.com>
  • Loading branch information
4 people authored Dec 10, 2023
1 parent aa12e38 commit 546e19c
Show file tree
Hide file tree
Showing 7 changed files with 1,022 additions and 147 deletions.
2 changes: 1 addition & 1 deletion src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ lazy_static! {
RwLock::new(crate::PermitBanList::default());
}

mod test;
pub(crate) mod test;

/// Events that can be produced by the `Discv5` event stream.
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/discv5/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn build_nodes_from_keypairs_dual_stack(
}

/// Generate `n` deterministic keypairs from a given seed.
fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
pub(crate) fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
let mut keypairs = Vec::new();
for i in 0..n {
let sk = {
Expand Down
173 changes: 132 additions & 41 deletions src/handler/active_requests.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,142 @@
use super::*;
use delay_map::HashMapDelay;
use more_asserts::debug_unreachable;
use std::collections::hash_map::Entry;

pub(super) struct ActiveRequests {
/// A list of raw messages we are awaiting a response from the remote.
active_requests_mapping: HashMapDelay<NodeAddress, RequestCall>,
active_requests_mapping: HashMap<NodeAddress, Vec<RequestCall>>,
// WHOAREYOU messages do not include the source node id. We therefore maintain another
// mapping of active_requests via message_nonce. This allows us to match WHOAREYOU
// requests with active requests sent.
/// A mapping of all pending active raw requests message nonces to their NodeAddress.
active_requests_nonce_mapping: HashMap<MessageNonce, NodeAddress>,
/// A mapping of all active raw requests message nonces to their NodeAddress.
active_requests_nonce_mapping: HashMapDelay<MessageNonce, NodeAddress>,
}

impl ActiveRequests {
pub fn new(request_timeout: Duration) -> Self {
ActiveRequests {
active_requests_mapping: HashMapDelay::new(request_timeout),
active_requests_nonce_mapping: HashMap::new(),
active_requests_mapping: HashMap::new(),
active_requests_nonce_mapping: HashMapDelay::new(request_timeout),
}
}

/// Insert a new request into the active requests mapping.
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
let nonce = *request_call.packet().message_nonce();
self.active_requests_mapping
.insert(node_address.clone(), request_call);
.entry(node_address.clone())
.or_default()
.push(request_call);
self.active_requests_nonce_mapping
.insert(nonce, node_address);
}

pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> {
/// Update the underlying packet for the request via message nonce.
pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) {
let node_address =
if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) {
node_address
} else {
debug_unreachable!("expected to find nonce in active_requests_nonce_mapping");
error!("expected to find nonce in active_requests_nonce_mapping");
return;
};

self.active_requests_nonce_mapping
.insert(new_packet.header.message_nonce, node_address.clone());

match self.active_requests_mapping.entry(node_address) {
Entry::Occupied(mut requests) => {
let maybe_request_call = requests
.get_mut()
.iter_mut()
.find(|req| req.packet().message_nonce() == &old_nonce);

if let Some(request_call) = maybe_request_call {
request_call.update_packet(new_packet);
} else {
debug_unreachable!("expected to find request call in active_requests_mapping");
error!("expected to find request call in active_requests_mapping");
}
}
Entry::Vacant(_) => {
debug_unreachable!("expected to find node address in active_requests_mapping");
error!("expected to find node address in active_requests_mapping");
}
}
}

pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec<RequestCall>> {
self.active_requests_mapping.get(node_address)
}

/// Remove a single request identified by its nonce.
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> {
match self.active_requests_nonce_mapping.remove(nonce) {
Some(node_address) => match self.active_requests_mapping.remove(&node_address) {
Some(request_call) => Some((node_address, request_call)),
None => {
debug_unreachable!("A matching request call doesn't exist");
error!("A matching request call doesn't exist");
None
let node_address = self.active_requests_nonce_mapping.remove(nonce)?;
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => {
debug_unreachable!("expected to find node address in active_requests_mapping");
error!("expected to find node address in active_requests_mapping");
None
}
Entry::Occupied(mut requests) => {
let result = requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == nonce)
.map(|index| (node_address, requests.get_mut().remove(index)));
if requests.get().is_empty() {
requests.remove();
}
},
None => None,
result
}
}
}

pub fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> {
match self.active_requests_mapping.remove(node_address) {
Some(request_call) => {
// Remove the associated nonce mapping.
match self
.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce())
{
Some(_) => Some(request_call),
None => {
debug_unreachable!("A matching nonce mapping doesn't exist");
error!("A matching nonce mapping doesn't exist");
None
}
/// Remove all requests associated with a node.
pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option<Vec<RequestCall>> {
let requests = self.active_requests_mapping.remove(node_address)?;
// Account for node addresses in `active_requests_nonce_mapping` with an empty list
if requests.is_empty() {
debug_unreachable!("expected to find requests in active_requests_mapping");
return None;
}
for req in &requests {
if self
.active_requests_nonce_mapping
.remove(req.packet().message_nonce())
.is_none()
{
debug_unreachable!("expected to find req with nonce");
error!("expected to find req with nonce");
}
}
Some(requests)
}

/// Remove a single request identified by its id.
pub fn remove_request(
&mut self,
node_address: &NodeAddress,
id: &RequestId,
) -> Option<RequestCall> {
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => None,
Entry::Occupied(mut requests) => {
let index = requests.get().iter().position(|req| {
let req_id: RequestId = req.id().into();
&req_id == id
})?;
let request_call = requests.get_mut().remove(index);
if requests.get().is_empty() {
requests.remove();
}
// Remove the associated nonce mapping.
self.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce());
Some(request_call)
}
None => None,
}
}

Expand All @@ -80,10 +154,12 @@ impl ActiveRequests {
}
}

for (address, request) in self.active_requests_mapping.iter() {
let nonce = request.packet().message_nonce();
if !self.active_requests_nonce_mapping.contains_key(nonce) {
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
for (address, requests) in self.active_requests_mapping.iter() {
for req in requests {
let nonce = req.packet().message_nonce();
if !self.active_requests_nonce_mapping.contains_key(nonce) {
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
}
}
}
}
Expand All @@ -92,12 +168,27 @@ impl ActiveRequests {
impl Stream for ActiveRequests {
type Item = Result<(NodeAddress, RequestCall), String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.active_requests_mapping.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((node_address, request_call)))) => {
// Remove the associated nonce mapping.
self.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce());
Poll::Ready(Some(Ok((node_address, request_call))))
match self.active_requests_nonce_mapping.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((nonce, node_address)))) => {
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => Poll::Ready(None),
Entry::Occupied(mut requests) => {
match requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == &nonce)
{
Some(index) => {
let result = (node_address, requests.get_mut().remove(index));
if requests.get().is_empty() {
requests.remove();
}
Poll::Ready(Some(Ok(result)))
}
None => Poll::Ready(None),
}
}
}
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Expand Down
Loading

0 comments on commit 546e19c

Please sign in to comment.