Delayed RPC Send Using Tokens #5923
base: unstable
Changes from 1 commit
@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing.

use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::transport::PortUse;
use libp2p::swarm::{

@@ -13,8 +12,7 @@ use libp2p::swarm::{
};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use rate_limiter::RPCRateLimiter as RateLimiter;
use slog::{crit, debug, error, o, trace};
use slog::{debug, error, o, trace};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

@@ -33,8 +31,8 @@ use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::active_requests_limiter::ActiveRequestsLimiter;
use crate::rpc::delayed_responses::DelayedResponses;
use crate::rpc::rate_limiter::{RateLimitedErr, RateLimiterItem};
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,

@@ -45,12 +43,12 @@ pub use protocol::{max_rpc_size, Protocol, RPCError};
mod active_requests_limiter;
pub(crate) mod codec;
pub mod config;
mod delayed_responses;
mod handler;
pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
mod response_limiter;
mod self_limiter;

static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(1);

@@ -156,10 +154,8 @@ pub struct NetworkParams {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<Id: ReqId, E: EthSpec> {
/// Rate limiter for our responses. This is shared with RPCHandlers.
response_limiter: Option<RateLimiter>,
/// Responses queued for sending. These responses are stored when the response limiter rejects them.
delayed_responses: DelayedResponses<E>,
/// Rate limiter for our responses.
response_limiter: Option<ResponseLimiter<E>>,
/// Rate limiter for our own requests.
outbound_request_limiter: Option<SelfRateLimiter<Id, E>>,
/// Limiter for inbound requests, which restricts more than two requests from running
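
For readability, here is roughly what the new field's type appears to expose, judging only from the call sites elsewhere in this diff (`ResponseLimiter::new`, `allows`, and `poll_ready`). This is a sketch under assumptions, not the PR's actual definition; the private fields and the `QueuedResponse` item type are placeholder names.

    // Sketch only: inferred from how `ResponseLimiter` is used in this diff.
    // Field names and `QueuedResponse` are assumptions, not the PR's real code.
    pub struct ResponseLimiter<E: EthSpec> {
        limiter: RateLimiter,                    // per-protocol token buckets, as before
        delayed: VecDeque<QueuedResponse<E>>,    // hypothetical queue of rate-limited responses
        log: slog::Logger,
    }

    impl<E: EthSpec> ResponseLimiter<E> {
        /// Built from the inbound rate limiter configuration (see the constructor hunk below).
        pub fn new(config: InboundRateLimiterConfig, log: slog::Logger) -> Self {
            todo!("construct the inner rate limiter and an empty delay queue")
        }

        /// Returns true if the response may be sent immediately. Otherwise the limiter
        /// takes ownership of the response and queues it internally.
        pub fn allows(
            &mut self,
            peer_id: PeerId,
            protocol: Protocol,
            connection_id: ConnectionId,
            substream_id: SubstreamId,
            response: RpcResponse<E>,
        ) -> bool {
            todo!("check the token bucket; queue the response when rate limited")
        }

        /// Returns queued responses whose wait time has elapsed and that the limiter now permits.
        pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Vec<QueuedResponse<E>>> {
            todo!("drain the delay queue while tokens are available")
        }
    }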

@@ -193,8 +189,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {

let response_limiter = inbound_rate_limiter_config.clone().map(|config| {
debug!(log, "Using response rate limiting params"; "config" => ?config);
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
ResponseLimiter::new(config, log.clone())
});

let outbound_request_limiter = outbound_rate_limiter_config.map(|config| {

@@ -203,7 +198,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {

RPC {
response_limiter,
delayed_responses: DelayedResponses::new(),
outbound_request_limiter,
active_inbound_requests_limiter: ActiveRequestsLimiter::new(),
active_inbound_requests: HashMap::new(),

@@ -241,38 +235,14 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
}

if let Some(response_limiter) = self.response_limiter.as_mut() {
// First check that there are not already other responses waiting to be sent.
let protocol = request.r#type.protocol();
if self.delayed_responses.exists(peer_id, protocol) {
self.delayed_responses.add(
peer_id,
protocol,
connection_id,
request.substream_id,
event,
);
return;
}

match Self::try_response_limiter(
response_limiter,
&peer_id,
protocol,
if !response_limiter.allows(
peer_id,
request.r#type.protocol(),
connection_id,
request.substream_id,
event.clone(),
&self.log,
) {
Ok(()) => {}
Err(wait_time) => {
self.delayed_responses.push_back(
peer_id,
protocol,
connection_id,
request.substream_id,
event,
wait_time,
);
return;
}
return;
[Review comments on this line: "same here, we can return an […]"; "In this case, where the […]"; "yup, thanks for explaining Akihito"; "can we then leave a comment explain that Akihito, as we have on […]"]
}
}
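
Because removed and added lines are interleaved in the hunk above, the new version of this block is easier to read in isolation. As far as this diff shows, it reduces to the following (the comments are mine, added for clarity):

    if let Some(response_limiter) = self.response_limiter.as_mut() {
        if !response_limiter.allows(
            peer_id,
            request.r#type.protocol(),
            connection_id,
            request.substream_id,
            event.clone(),
        ) {
            // Rate limited: the limiter has queued the response internally and will
            // release it later from `poll_ready`, so the caller simply returns here.
            return;
        }
    }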

@@ -289,36 +259,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
});
}

/// Checks if the response limiter allows the response. If the response should be delayed, the
/// duration to wait is returned.
fn try_response_limiter(
limiter: &mut RateLimiter,
peer_id: &PeerId,
protocol: Protocol,
response: RpcResponse<E>,
log: &slog::Logger,
) -> Result<(), Duration> {
match limiter.allows(peer_id, &(response, protocol)) {
Ok(()) => Ok(()),
Err(e) => match e {
RateLimitedErr::TooLarge => {
// This should never happen with default parameters. Let's just send the response.
// Log a crit since this is a config issue.
crit!(
log,
"Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters.";
"protocol" => %protocol
);
Ok(())
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(log, "Response rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
Err(wait_time)
}
},
}
}

/// Submits an RPC request.
///
/// The peer must be connected for this to succeed.
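
With this helper deleted, the `TooLarge`/`TooSoon` handling it contained presumably moves inside `ResponseLimiter`. A hedged sketch of what the equivalent check could look like there, reusing the assumed fields from the earlier sketch (illustrative only, not the PR's actual implementation):

    // Hypothetical body for `ResponseLimiter::allows` (sketch only; field names assumed):
    pub fn allows(
        &mut self,
        peer_id: PeerId,
        protocol: Protocol,
        connection_id: ConnectionId,
        substream_id: SubstreamId,
        response: RpcResponse<E>,
    ) -> bool {
        match self.limiter.allows(&peer_id, &(response.clone(), protocol)) {
            Ok(()) => true,
            Err(RateLimitedErr::TooLarge) => {
                // Can never fit the configured quota: log loudly and send anyway,
                // mirroring the deleted helper above.
                crit!(self.log, "Response rate limiting error for a batch that will never fit. Sending response anyway."; "protocol" => %protocol);
                true
            }
            Err(RateLimitedErr::TooSoon(wait_time)) => {
                // Out of tokens for now: remember everything needed to send the response later.
                debug!(self.log, "Response rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
                self.delayed.push_back(QueuedResponse {
                    peer_id,
                    connection_id,
                    substream_id,
                    protocol,
                    response,
                    send_after: Instant::now() + wait_time,
                });
                false
            }
        }
    }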

@@ -616,56 +556,21 @@ where

fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(response_limiter) = self.response_limiter.as_mut() {
// let the rate limiter prune.
let _ = response_limiter.poll_unpin(cx);

let mut response_to_requeue = None;
let mut should_remove = false;
let mut key_to_remove = None;
if let Some(q) = self.delayed_responses.poll_next_response(cx) {
if let Some(response) = q.front() {
key_to_remove = Some((response.peer_id, response.protocol));
}
// Take delayed responses from the queue and send them, as long as the limiter allows it.
while let Some(response) = q.pop_front() {
match Self::try_response_limiter(
response_limiter,
&response.peer_id,
response.protocol,
response.response.clone(),
&self.log,
) {
Ok(()) => {
self.active_inbound_requests_limiter.remove_request(
response.peer_id,
&response.connection_id,
&response.substream_id,
);

self.events.push(ToSwarm::NotifyHandler {
peer_id: response.peer_id,
handler: NotifyHandler::One(response.connection_id),
event: RPCSend::Response(response.substream_id, response.response),
});
}
Err(wait_time) => {
response_to_requeue = Some((response, wait_time));
break;
}
}
}
should_remove = q.is_empty();
}
if should_remove {
// Remove the queue since now it's empty.
if let Some((peer_id, protocol)) = key_to_remove {
self.delayed_responses.remove(peer_id, protocol);
if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
for response in responses {
self.active_inbound_requests_limiter.remove_request(
response.peer_id,
&response.connection_id,
&response.substream_id,
);

self.events.push(ToSwarm::NotifyHandler {
peer_id: response.peer_id,
handler: NotifyHandler::One(response.connection_id),
event: RPCSend::Response(response.substream_id, response.response),
});
}
}
if let Some((response, wait_time)) = response_to_requeue {
// The response was taken from the queue, but the limiter didn't allow it.
self.delayed_responses.push_front(response, wait_time);
}
}

if let Some(self_limiter) = self.outbound_request_limiter.as_mut() {
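
The removed loop above gives a good picture of what `poll_ready` has to do on the limiter side. A hedged sketch of such a method, again using the assumed fields from the earlier sketches and not the PR's real code:

    // Hypothetical `ResponseLimiter::poll_ready` (sketch only). A real implementation
    // also needs to register a waker (e.g. a timer or DelayQueue) so the task is
    // polled again once the earliest `send_after` elapses; this sketch leans on the
    // inner limiter's own pruning interval, as the removed code did via `poll_unpin`.
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Vec<QueuedResponse<E>>> {
        // Let the inner rate limiter prune old buckets and refill tokens.
        let _ = self.limiter.poll_unpin(cx);

        let mut ready = Vec::new();
        // Release queued responses in order, as long as the limiter now allows them;
        // stop at the first one still rate limited so per-protocol ordering is kept.
        while let Some(front) = self.delayed.front() {
            if front.send_after > Instant::now()
                || self
                    .limiter
                    .allows(&front.peer_id, &(front.response.clone(), front.protocol))
                    .is_err()
            {
                break;
            }
            ready.push(self.delayed.pop_front().expect("front exists"));
        }

        if ready.is_empty() {
            Poll::Pending
        } else {
            Poll::Ready(ready)
        }
    }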
Review comment: "I would keep the previous naming, to also still follow the naming of the configurations. But mostly what we are rate limiting are external requests right? Not our responses."