Skip to content

Commit

Permalink
Use entry to access/mutate ActiveRequests
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Jul 20, 2023
1 parent f1d4d80 commit 7ae82df
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 136 deletions.
174 changes: 72 additions & 102 deletions src/handler/active_requests.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
use super::*;
use delay_map::HashMapDelay;
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActiveRequestsError {
InvalidState,
}

impl fmt::Display for ActiveRequestsError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ActiveRequestsError::InvalidState => {
write!(f, "Invalid state: active requests mappings are not in sync")
}
}
}
}

impl std::error::Error for ActiveRequestsError {}
use more_asserts::debug_unreachable;
use std::collections::hash_map::Entry;

pub(super) struct ActiveRequests {
/// A list of raw messages we are awaiting a response from the remote.
Expand All @@ -40,94 +24,86 @@ impl ActiveRequests {
// Insert a new request into the active requests mapping.
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
let nonce = *request_call.packet().message_nonce();
let mut request_calls = self
.active_requests_mapping
.remove(&node_address)
.unwrap_or_default();
request_calls.push(request_call);
self.active_requests_mapping
.insert(node_address.clone(), request_calls);
.entry(node_address.clone())
.or_insert_with(Vec::new)
.push(request_call);
self.active_requests_nonce_mapping
.insert(nonce, node_address);
}

// Remove a single request identified by its nonce.
pub fn remove_by_nonce(
&mut self,
nonce: &MessageNonce,
) -> Result<(NodeAddress, RequestCall), ActiveRequestsError> {
let node_address = self
.active_requests_nonce_mapping
.remove(nonce)
.ok_or_else(|| ActiveRequestsError::InvalidState)?;
let mut requests = self
.active_requests_mapping
.remove(&node_address)
.ok_or_else(|| ActiveRequestsError::InvalidState)?;
let index = match requests
.iter()
.position(|req| req.packet().message_nonce() == nonce)
{
Some(index) => index,
None => {
// if nonce req is missing, reinsert remaining requests into mapping
if !requests.is_empty() {
self.active_requests_mapping
.insert(node_address.clone(), requests);
}
return Err(ActiveRequestsError::InvalidState);
}
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> {
let node_address = match self.active_requests_nonce_mapping.remove(nonce) {
Some(val) => val,
None => return None,
};
let req = requests.remove(index);
if !requests.is_empty() {
self.active_requests_mapping
.insert(node_address.clone(), requests);
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => {
debug_unreachable!("expected to find node address in active_requests_mapping");
error!("expected to find node address in active_requests_mapping");
None
}
Entry::Occupied(mut requests) => {
let index = requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == nonce)
.expect("to find request call by nonce");
Some((node_address, requests.get_mut().remove(index)))
}
}
Ok((node_address, req))
}

// Remove all requests associated with a node.
pub fn remove_requests(
&mut self,
node_address: &NodeAddress,
) -> Result<Vec<RequestCall>, ActiveRequestsError> {
let requests = self
.active_requests_mapping
.remove(&node_address)
.ok_or_else(|| ActiveRequestsError::InvalidState)?;
pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option<Vec<RequestCall>> {
let requests = match self.active_requests_mapping.remove(node_address) {
Some(val) => val,
None => return None,
};
// Account for node addresses in `active_requests_nonce_mapping` with an empty list
if requests.is_empty() {
return None;
}
for req in &requests {
self.active_requests_nonce_mapping
.remove(req.packet().message_nonce());
if self
.active_requests_nonce_mapping
.remove(req.packet().message_nonce())
.is_none()
{
debug_unreachable!("expected to find req with nonce");
error!("expected to find req with nonce");
}
}
Ok(requests)
Some(requests)
}

// Remove a single request identified by its id.
pub fn remove_request(
&mut self,
node_address: &NodeAddress,
id: &RequestId,
) -> Result<RequestCall, ActiveRequestsError> {
let reqs = self
.active_requests_mapping
.get(node_address)
.ok_or_else(|| ActiveRequestsError::InvalidState)?;
let index = reqs
.iter()
.position(|req| {
let req_id: RequestId = req.id().into();
&req_id == id
})
.ok_or_else(|| ActiveRequestsError::InvalidState)?;
let nonce = reqs
.get(index)
.ok_or_else(|| ActiveRequestsError::InvalidState)?
.packet()
.message_nonce()
.clone();
// Remove the associated nonce mapping.
let (_, request_call) = self.remove_by_nonce(&nonce)?;
Ok(request_call)
) -> Option<RequestCall> {
match self.active_requests_mapping.entry(node_address.to_owned()) {
Entry::Vacant(_) => None,
Entry::Occupied(mut requests) => {
let index = requests.get().iter().position(|req| {
let req_id: RequestId = req.id().into();
&req_id == id
});
let index = match index {
Some(index) => index,
// Node address existence in active requests mapping does not guarantee request
// id existence.
None => return None,
};
let request_call = requests.get_mut().remove(index);
// Remove the associated nonce mapping.
self.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce());
Some(request_call)
}
}
}

/// Checks that `active_requests_mapping` and `active_requests_nonce_mapping` are in sync.
Expand Down Expand Up @@ -160,23 +136,17 @@ impl Stream for ActiveRequests {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.active_requests_nonce_mapping.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((nonce, node_address)))) => {
// remove the associated mapping
let mut reqs = self
.active_requests_mapping
.remove(&node_address)
.ok_or_else(|| ActiveRequestsError::InvalidState)
.unwrap();
let index = reqs
.iter()
.position(|req| req.packet().message_nonce() == &nonce)
.ok_or_else(|| ActiveRequestsError::InvalidState)
.unwrap();
let req = reqs.remove(index);
if reqs.len() > 0 {
self.active_requests_mapping
.insert(node_address.clone(), reqs);
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => panic!("invalid ActiveRequests state"),
Entry::Occupied(mut requests) => {
let index = requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == &nonce)
.expect("to find request call by nonce");
Poll::Ready(Some(Ok((node_address, requests.get_mut().remove(index)))))
}
}
Poll::Ready(Some(Ok((node_address, req))))
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Expand Down
Loading

0 comments on commit 7ae82df

Please sign in to comment.