Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent requests to a single peer. #200

Merged
merged 46 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
29b5267
Initial support for concurrent requests
njgheorghita Jul 3, 2023
a46961e
Fail all concurrent requests for a session if any one req fails
njgheorghita Jul 6, 2023
7c1c8f8
move hashmapdelay to nonce mapping
njgheorghita Jul 10, 2023
d334d20
Use entry to access/mutate ActiveRequests
njgheorghita Jul 19, 2023
3330a6b
Return none rather than panicking for request mapping mismatches
njgheorghita Jul 27, 2023
fe34db7
Remove expected response upon failed session
njgheorghita Aug 3, 2023
e9e8b54
Replay active requests if active session re-established
njgheorghita Aug 3, 2023
c92a0cb
Relocate remove_expected_response
njgheorghita Aug 4, 2023
3955588
Remove empty lists from active requests mapping
njgheorghita Aug 4, 2023
5ecbfb1
Send pending requests rather than replay active requests on outgoing …
njgheorghita Aug 16, 2023
ee1332c
Filter replaying newly established session handshake req
njgheorghita Aug 16, 2023
abf83c8
Add test for replay_active_requests
ackintosh Aug 22, 2023
9c2851f
Fix duplicated request on replay_active_requests()
ackintosh Aug 26, 2023
0df2d70
Fix missing active request
ackintosh Aug 26, 2023
e761d9e
Fix wrong expected_response after replaying
ackintosh Aug 26, 2023
b2051c1
Fix replaying test
ackintosh Aug 26, 2023
ee3ae51
cargo fmt
ackintosh Aug 26, 2023
cd71703
Add comment
ackintosh Aug 26, 2023
54a344a
Add test for ActiveRequests::remove_requests_except()
ackintosh Aug 26, 2023
d69576d
Refactor ActiveRequests::remove_requests_except()
ackintosh Aug 26, 2023
cb52be9
Fix linting errors
njgheorghita Sep 2, 2023
399288c
Fix test: avoid ports used by other tests
ackintosh Sep 4, 2023
6ef70f2
Add a test for sending pending requests
ackintosh Sep 9, 2023
1bc3cea
Fix the timeout issue
ackintosh Sep 9, 2023
1dca750
Add a log entry
ackintosh Sep 10, 2023
b81c4dc
Tweak log format to be readable
ackintosh Sep 10, 2023
7b169b9
Move send_pending_requests into new_session
ackintosh Sep 11, 2023
b525edd
Update mermaid diagram
ackintosh Sep 12, 2023
a94fd26
Add test
ackintosh Sep 23, 2023
2db8534
Change key type of active_nodes_responses to handle concurrent responses
ackintosh Sep 23, 2023
18ba66f
cargo fmt
ackintosh Sep 23, 2023
c053ac2
Fix redundant clone
ackintosh Sep 23, 2023
b153b14
Tweak udp4 port numbers to avoid `Address already in use` error
ackintosh Sep 24, 2023
4fdb780
Add docstring comments to new handler methods
njgheorghita Sep 27, 2023
d791fe3
Fix replay
ackintosh Oct 8, 2023
13bd6fa
Add doc comment
ackintosh Oct 11, 2023
5c84d22
Add test for update_packet()
ackintosh Oct 11, 2023
f9e75c3
Add doc comment & cargo fmt
ackintosh Oct 14, 2023
76b88d9
Fix clippy errors
ackintosh Oct 15, 2023
742d460
Merge latest master
AgeManning Nov 29, 2023
f2d345c
fmt
AgeManning Nov 29, 2023
35cfcb3
Fix broken clippy lint
njgheorghita Nov 30, 2023
30a8f2c
Merge branch 'master' into experimental
divagant-martian Dec 3, 2023
3e325d6
smmal code cleanup
divagant-martian Dec 3, 2023
9c9cdd4
Replace naked unwrap with expect
njgheorghita Dec 5, 2023
84f303a
Log error message instead of panicking, if unable to re-encrypt activ…
njgheorghita Dec 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ lazy_static! {
RwLock::new(crate::PermitBanList::default());
}

mod test;
pub(crate) mod test;

/// Events that can be produced by the `Discv5` event stream.
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/discv5/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn build_nodes_from_keypairs_dual_stack(
}

/// Generate `n` deterministic keypairs from a given seed.
fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
pub(crate) fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
let mut keypairs = Vec::new();
for i in 0..n {
let sk = {
Expand Down
173 changes: 132 additions & 41 deletions src/handler/active_requests.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,142 @@
use super::*;
use delay_map::HashMapDelay;
use more_asserts::debug_unreachable;
use std::collections::hash_map::Entry;

pub(super) struct ActiveRequests {
/// A list of raw messages we are awaiting a response from the remote.
active_requests_mapping: HashMapDelay<NodeAddress, RequestCall>,
active_requests_mapping: HashMap<NodeAddress, Vec<RequestCall>>,
// WHOAREYOU messages do not include the source node id. We therefore maintain another
// mapping of active_requests via message_nonce. This allows us to match WHOAREYOU
// requests with active requests sent.
/// A mapping of all pending active raw requests message nonces to their NodeAddress.
active_requests_nonce_mapping: HashMap<MessageNonce, NodeAddress>,
/// A mapping of all active raw requests message nonces to their NodeAddress.
active_requests_nonce_mapping: HashMapDelay<MessageNonce, NodeAddress>,
}

impl ActiveRequests {
pub fn new(request_timeout: Duration) -> Self {
ActiveRequests {
active_requests_mapping: HashMapDelay::new(request_timeout),
active_requests_nonce_mapping: HashMap::new(),
active_requests_mapping: HashMap::new(),
active_requests_nonce_mapping: HashMapDelay::new(request_timeout),
}
}

/// Insert a new request into the active requests mapping.
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
let nonce = *request_call.packet().message_nonce();
self.active_requests_mapping
.insert(node_address.clone(), request_call);
.entry(node_address.clone())
.or_default()
.push(request_call);
self.active_requests_nonce_mapping
.insert(nonce, node_address);
}

pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> {
/// Update the underlying packet for the request via message nonce.
pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) {
let node_address =
if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) {
node_address
} else {
debug_unreachable!("expected to find nonce in active_requests_nonce_mapping");
error!("expected to find nonce in active_requests_nonce_mapping");
return;
};

self.active_requests_nonce_mapping
.insert(new_packet.header.message_nonce, node_address.clone());

match self.active_requests_mapping.entry(node_address) {
Entry::Occupied(mut requests) => {
let maybe_request_call = requests
.get_mut()
.iter_mut()
.find(|req| req.packet().message_nonce() == &old_nonce);

if let Some(request_call) = maybe_request_call {
request_call.update_packet(new_packet);
} else {
debug_unreachable!("expected to find request call in active_requests_mapping");
error!("expected to find request call in active_requests_mapping");
}
}
Entry::Vacant(_) => {
debug_unreachable!("expected to find node address in active_requests_mapping");
error!("expected to find node address in active_requests_mapping");
}
}
}

pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec<RequestCall>> {
self.active_requests_mapping.get(node_address)
}

/// Remove a single request identified by its nonce.
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> {
match self.active_requests_nonce_mapping.remove(nonce) {
Some(node_address) => match self.active_requests_mapping.remove(&node_address) {
Some(request_call) => Some((node_address, request_call)),
None => {
debug_unreachable!("A matching request call doesn't exist");
error!("A matching request call doesn't exist");
None
let node_address = self.active_requests_nonce_mapping.remove(nonce)?;
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => {
debug_unreachable!("expected to find node address in active_requests_mapping");
error!("expected to find node address in active_requests_mapping");
None
}
Entry::Occupied(mut requests) => {
let result = requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == nonce)
.map(|index| (node_address, requests.get_mut().remove(index)));
if requests.get().is_empty() {
requests.remove();
}
},
None => None,
result
}
}
}

pub fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> {
match self.active_requests_mapping.remove(node_address) {
Some(request_call) => {
// Remove the associated nonce mapping.
match self
.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce())
{
Some(_) => Some(request_call),
None => {
debug_unreachable!("A matching nonce mapping doesn't exist");
error!("A matching nonce mapping doesn't exist");
None
}
/// Remove all requests associated with a node.
pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option<Vec<RequestCall>> {
let requests = self.active_requests_mapping.remove(node_address)?;
// Account for node addresses in `active_requests_nonce_mapping` with an empty list
if requests.is_empty() {
debug_unreachable!("expected to find requests in active_requests_mapping");
return None;
njgheorghita marked this conversation as resolved.
Show resolved Hide resolved
}
for req in &requests {
if self
.active_requests_nonce_mapping
.remove(req.packet().message_nonce())
.is_none()
{
debug_unreachable!("expected to find req with nonce");
error!("expected to find req with nonce");
njgheorghita marked this conversation as resolved.
Show resolved Hide resolved
}
}
Some(requests)
}

/// Remove a single request identified by its id.
pub fn remove_request(
&mut self,
node_address: &NodeAddress,
id: &RequestId,
) -> Option<RequestCall> {
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => None,
Entry::Occupied(mut requests) => {
let index = requests.get().iter().position(|req| {
let req_id: RequestId = req.id().into();
&req_id == id
})?;
let request_call = requests.get_mut().remove(index);
if requests.get().is_empty() {
requests.remove();
}
// Remove the associated nonce mapping.
self.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce());
Some(request_call)
Comment on lines +137 to +138
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to check the len of requests. Otherwise you will end up with an entry with an empty list

}
None => None,
}
}

Expand All @@ -80,10 +154,12 @@ impl ActiveRequests {
}
}

for (address, request) in self.active_requests_mapping.iter() {
let nonce = request.packet().message_nonce();
if !self.active_requests_nonce_mapping.contains_key(nonce) {
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
for (address, requests) in self.active_requests_mapping.iter() {
for req in requests {
let nonce = req.packet().message_nonce();
if !self.active_requests_nonce_mapping.contains_key(nonce) {
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
}
}
}
}
Expand All @@ -92,12 +168,27 @@ impl ActiveRequests {
impl Stream for ActiveRequests {
type Item = Result<(NodeAddress, RequestCall), String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.active_requests_mapping.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((node_address, request_call)))) => {
// Remove the associated nonce mapping.
self.active_requests_nonce_mapping
.remove(request_call.packet().message_nonce());
Poll::Ready(Some(Ok((node_address, request_call))))
match self.active_requests_nonce_mapping.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((nonce, node_address)))) => {
match self.active_requests_mapping.entry(node_address.clone()) {
Entry::Vacant(_) => Poll::Ready(None),
Entry::Occupied(mut requests) => {
match requests
.get()
.iter()
.position(|req| req.packet().message_nonce() == &nonce)
{
Some(index) => {
let result = (node_address, requests.get_mut().remove(index));
if requests.get().is_empty() {
requests.remove();
}
Poll::Ready(Some(Ok(result)))
}
None => Poll::Ready(None),
}
}
}
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Expand Down
Loading