-
Notifications
You must be signed in to change notification settings - Fork 70
Add support for concurrent requests to a single peer. #200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
29b5267
Initial support for concurrent requests
njgheorghita a46961e
Fail all concurrent requests for a session if any one req fails
njgheorghita 7c1c8f8
move hashmapdelay to nonce mapping
njgheorghita d334d20
Use entry to access/mutate ActiveRequests
njgheorghita 3330a6b
Return none rather than panicking for request mapping mismatches
njgheorghita fe34db7
Remove expected response upon failed session
njgheorghita e9e8b54
Replay active requests if active session re-established
njgheorghita c92a0cb
Relocate remove_expected_response
njgheorghita 3955588
Remove empty lists from active requests mapping
njgheorghita 5ecbfb1
Send pending requests rather than replay active requests on outgoing …
njgheorghita ee1332c
Filter replaying newly established session handshake req
njgheorghita abf83c8
Add test for replay_active_requests
ackintosh 9c2851f
Fix duplicated request on replay_active_requests()
ackintosh 0df2d70
Fix missing active request
ackintosh e761d9e
Fix wrong expected_response after replaying
ackintosh b2051c1
Fix replaying test
ackintosh ee3ae51
cargo fmt
ackintosh cd71703
Add comment
ackintosh 54a344a
Add test for ActiveRequests::remove_requests_except()
ackintosh d69576d
Refactor ActiveRequests::remove_requests_except()
ackintosh cb52be9
Fix linting errors
njgheorghita 399288c
Fix test: avoid ports used by other tests
ackintosh 6ef70f2
Add a test for sending pending requests
ackintosh 1bc3cea
Fix the timeout issue
ackintosh 1dca750
Add a log entry
ackintosh b81c4dc
Tweak log format to be readable
ackintosh 7b169b9
Move send_pending_requests into new_session
ackintosh b525edd
Update mermaid diagram
ackintosh a94fd26
Add test
ackintosh 2db8534
Change key type of active_nodes_responses to handle concurrent responses
ackintosh 18ba66f
cargo fmt
ackintosh c053ac2
Fix redundant clone
ackintosh b153b14
Tweak udp4 port numbers to avoid `Address already in use` error
ackintosh 4fdb780
Add docstring comments to new handler methods
njgheorghita d791fe3
Fix replay
ackintosh 13bd6fa
Add doc comment
ackintosh 5c84d22
Add test for update_packet()
ackintosh f9e75c3
Add doc comment & cargo fmt
ackintosh 76b88d9
Fix clippy errors
ackintosh 742d460
Merge latest master
AgeManning f2d345c
fmt
AgeManning 35cfcb3
Fix broken clippy lint
njgheorghita 30a8f2c
Merge branch 'master' into experimental
divagant-martian 3e325d6
smmal code cleanup
divagant-martian 9c9cdd4
Replace naked unwrap with expect
njgheorghita 84f303a
Log error message instead of panicking, if unable to re-encrypt activ…
njgheorghita File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,68 +1,142 @@ | ||
use super::*; | ||
use delay_map::HashMapDelay; | ||
use more_asserts::debug_unreachable; | ||
use std::collections::hash_map::Entry; | ||
|
||
pub(super) struct ActiveRequests { | ||
/// A list of raw messages we are awaiting a response from the remote. | ||
active_requests_mapping: HashMapDelay<NodeAddress, RequestCall>, | ||
active_requests_mapping: HashMap<NodeAddress, Vec<RequestCall>>, | ||
// WHOAREYOU messages do not include the source node id. We therefore maintain another | ||
// mapping of active_requests via message_nonce. This allows us to match WHOAREYOU | ||
// requests with active requests sent. | ||
/// A mapping of all pending active raw requests message nonces to their NodeAddress. | ||
active_requests_nonce_mapping: HashMap<MessageNonce, NodeAddress>, | ||
/// A mapping of all active raw requests message nonces to their NodeAddress. | ||
active_requests_nonce_mapping: HashMapDelay<MessageNonce, NodeAddress>, | ||
} | ||
|
||
impl ActiveRequests { | ||
pub fn new(request_timeout: Duration) -> Self { | ||
ActiveRequests { | ||
active_requests_mapping: HashMapDelay::new(request_timeout), | ||
active_requests_nonce_mapping: HashMap::new(), | ||
active_requests_mapping: HashMap::new(), | ||
active_requests_nonce_mapping: HashMapDelay::new(request_timeout), | ||
} | ||
} | ||
|
||
/// Insert a new request into the active requests mapping. | ||
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) { | ||
let nonce = *request_call.packet().message_nonce(); | ||
self.active_requests_mapping | ||
.insert(node_address.clone(), request_call); | ||
.entry(node_address.clone()) | ||
.or_default() | ||
.push(request_call); | ||
self.active_requests_nonce_mapping | ||
.insert(nonce, node_address); | ||
} | ||
|
||
pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> { | ||
/// Update the underlying packet for the request via message nonce. | ||
pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) { | ||
let node_address = | ||
if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) { | ||
node_address | ||
} else { | ||
debug_unreachable!("expected to find nonce in active_requests_nonce_mapping"); | ||
error!("expected to find nonce in active_requests_nonce_mapping"); | ||
return; | ||
}; | ||
|
||
self.active_requests_nonce_mapping | ||
.insert(new_packet.header.message_nonce, node_address.clone()); | ||
|
||
match self.active_requests_mapping.entry(node_address) { | ||
Entry::Occupied(mut requests) => { | ||
let maybe_request_call = requests | ||
.get_mut() | ||
.iter_mut() | ||
.find(|req| req.packet().message_nonce() == &old_nonce); | ||
|
||
if let Some(request_call) = maybe_request_call { | ||
request_call.update_packet(new_packet); | ||
} else { | ||
debug_unreachable!("expected to find request call in active_requests_mapping"); | ||
error!("expected to find request call in active_requests_mapping"); | ||
} | ||
} | ||
Entry::Vacant(_) => { | ||
debug_unreachable!("expected to find node address in active_requests_mapping"); | ||
error!("expected to find node address in active_requests_mapping"); | ||
} | ||
} | ||
} | ||
|
||
pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec<RequestCall>> { | ||
self.active_requests_mapping.get(node_address) | ||
} | ||
|
||
/// Remove a single request identified by its nonce. | ||
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { | ||
match self.active_requests_nonce_mapping.remove(nonce) { | ||
Some(node_address) => match self.active_requests_mapping.remove(&node_address) { | ||
Some(request_call) => Some((node_address, request_call)), | ||
None => { | ||
debug_unreachable!("A matching request call doesn't exist"); | ||
error!("A matching request call doesn't exist"); | ||
None | ||
let node_address = self.active_requests_nonce_mapping.remove(nonce)?; | ||
match self.active_requests_mapping.entry(node_address.clone()) { | ||
Entry::Vacant(_) => { | ||
debug_unreachable!("expected to find node address in active_requests_mapping"); | ||
error!("expected to find node address in active_requests_mapping"); | ||
None | ||
} | ||
Entry::Occupied(mut requests) => { | ||
let result = requests | ||
.get() | ||
.iter() | ||
.position(|req| req.packet().message_nonce() == nonce) | ||
.map(|index| (node_address, requests.get_mut().remove(index))); | ||
if requests.get().is_empty() { | ||
requests.remove(); | ||
} | ||
}, | ||
None => None, | ||
result | ||
} | ||
} | ||
} | ||
|
||
pub fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> { | ||
match self.active_requests_mapping.remove(node_address) { | ||
Some(request_call) => { | ||
// Remove the associated nonce mapping. | ||
match self | ||
.active_requests_nonce_mapping | ||
.remove(request_call.packet().message_nonce()) | ||
{ | ||
Some(_) => Some(request_call), | ||
None => { | ||
debug_unreachable!("A matching nonce mapping doesn't exist"); | ||
error!("A matching nonce mapping doesn't exist"); | ||
None | ||
} | ||
/// Remove all requests associated with a node. | ||
pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option<Vec<RequestCall>> { | ||
let requests = self.active_requests_mapping.remove(node_address)?; | ||
// Account for node addresses in `active_requests_nonce_mapping` with an empty list | ||
if requests.is_empty() { | ||
debug_unreachable!("expected to find requests in active_requests_mapping"); | ||
return None; | ||
} | ||
for req in &requests { | ||
if self | ||
.active_requests_nonce_mapping | ||
.remove(req.packet().message_nonce()) | ||
.is_none() | ||
{ | ||
debug_unreachable!("expected to find req with nonce"); | ||
error!("expected to find req with nonce"); | ||
njgheorghita marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
Some(requests) | ||
} | ||
|
||
/// Remove a single request identified by its id. | ||
pub fn remove_request( | ||
&mut self, | ||
node_address: &NodeAddress, | ||
id: &RequestId, | ||
) -> Option<RequestCall> { | ||
match self.active_requests_mapping.entry(node_address.clone()) { | ||
Entry::Vacant(_) => None, | ||
Entry::Occupied(mut requests) => { | ||
let index = requests.get().iter().position(|req| { | ||
let req_id: RequestId = req.id().into(); | ||
&req_id == id | ||
})?; | ||
let request_call = requests.get_mut().remove(index); | ||
if requests.get().is_empty() { | ||
requests.remove(); | ||
} | ||
// Remove the associated nonce mapping. | ||
self.active_requests_nonce_mapping | ||
.remove(request_call.packet().message_nonce()); | ||
Some(request_call) | ||
Comment on lines
+137
to
+138
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you need to check the |
||
} | ||
None => None, | ||
} | ||
} | ||
|
||
|
@@ -80,10 +154,12 @@ impl ActiveRequests { | |
} | ||
} | ||
|
||
for (address, request) in self.active_requests_mapping.iter() { | ||
let nonce = request.packet().message_nonce(); | ||
if !self.active_requests_nonce_mapping.contains_key(nonce) { | ||
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce); | ||
for (address, requests) in self.active_requests_mapping.iter() { | ||
for req in requests { | ||
let nonce = req.packet().message_nonce(); | ||
if !self.active_requests_nonce_mapping.contains_key(nonce) { | ||
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -92,12 +168,27 @@ impl ActiveRequests { | |
impl Stream for ActiveRequests { | ||
type Item = Result<(NodeAddress, RequestCall), String>; | ||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
match self.active_requests_mapping.poll_next_unpin(cx) { | ||
Poll::Ready(Some(Ok((node_address, request_call)))) => { | ||
// Remove the associated nonce mapping. | ||
self.active_requests_nonce_mapping | ||
.remove(request_call.packet().message_nonce()); | ||
Poll::Ready(Some(Ok((node_address, request_call)))) | ||
match self.active_requests_nonce_mapping.poll_next_unpin(cx) { | ||
Poll::Ready(Some(Ok((nonce, node_address)))) => { | ||
match self.active_requests_mapping.entry(node_address.clone()) { | ||
Entry::Vacant(_) => Poll::Ready(None), | ||
Entry::Occupied(mut requests) => { | ||
match requests | ||
.get() | ||
.iter() | ||
.position(|req| req.packet().message_nonce() == &nonce) | ||
{ | ||
Some(index) => { | ||
let result = (node_address, requests.get_mut().remove(index)); | ||
if requests.get().is_empty() { | ||
requests.remove(); | ||
} | ||
Poll::Ready(Some(Ok(result))) | ||
} | ||
None => Poll::Ready(None), | ||
} | ||
} | ||
} | ||
} | ||
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), | ||
Poll::Ready(None) => Poll::Ready(None), | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.