Skip to content

Commit

Permalink
feat: Add ActixMessageWrapper to rate limit PeerActor (#5198)
Browse files Browse the repository at this point in the history
We want to track of sizes of messages sent from `PeerActor` to `PeerManagerActor`.
We will stop reading from given `PeerActor` once we reach the limit.

To achieve this a new type `ActixMessageWrapper` was introduced. It wraps around messages sent to peer, and contains metadata needed for stat counting.

This is a draft:
- [x] Add a wrapper for sending messages from`PeerActor` to `PeerManagerActor`
- [x] Modify `PeerActor` to use 
- [x] Add a wrapper for reply's to the above messages (wating for `DeepSizeOf`)
- [ ] Wait for `DeepSizeOf` (another PR)
- [ ] Add a wrapper for messages to Peer - for example for routing (another PR)
- [ ] Add a wrapper for messages to EdgeVerifierActor - for example for routing (another PR)
- [ ] Add a wrapper for messages to RoutingTableActor - for example for routing (another PR)
- [ ] Add a wrapper for messages to ClientActor - for example for routing (another PR)
- [ ] Extensions to `performance_stats` to track per thread limits (another PR)
- [ ] Extension to track bandwidth (another PR)

This PR is based on, not yet merged, #4927

Resolves #5155

Blocked by Aeledfyr/deepsize#18
  • Loading branch information
pmnoxx authored Nov 23, 2021
1 parent b7f8847 commit 43c6ae1
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 62 deletions.
63 changes: 63 additions & 0 deletions chain/network/src/common/message_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use actix::dev::MessageResponse;
use actix::Message;
use near_rate_limiter::{ThrottleController, ThrottleToken};

// Wrapper around Actix messages, used to track size of all messages sent to PeerManager.
// TODO(#5155) Finish implementation of this.

#[allow(unused)]
/// TODO - Once we start using this `ActixMessageWrapper` we will need to make following changes
/// to get this struct to work
/// - Add needed decorators. Probably `Debug`, `Message` from Actix, etc.
/// - Add two rate limiters (local per peer, global one)
/// - Any other metadata we need debugging, etc.
pub struct ActixMessageWrapper<T> {
msg: T,
throttle_token: ThrottleToken,
}

impl<T> ActixMessageWrapper<T> {
pub fn new_without_size(msg: T, throttle_controller: ThrottleController) -> Self {
Self { msg, throttle_token: ThrottleToken::new(throttle_controller, 0) }
}

#[allow(unused)]
pub fn into_inner(mut self) -> T {
return self.msg;
}

#[allow(unused)]
pub fn take(mut self) -> (T, ThrottleToken) {
return (self.msg, self.throttle_token);
}
}

impl<T: Message> Message for ActixMessageWrapper<T> {
type Result = ActixMessageResponse<T::Result>;
}

#[derive(MessageResponse)]
pub struct ActixMessageResponse<T> {
msg: T,
/// Ignore the warning, this code is used. We decrease counters `throttle_controller` when
/// this attribute gets dropped.
#[allow(unused)]
throttle_token: ThrottleToken,
}

impl<T> ActixMessageResponse<T> {
#[allow(unused)]
pub fn new(msg: T, throttle_token: ThrottleToken) -> Self {
Self { msg, throttle_token }
}

#[allow(unused)]
pub fn into_inner(mut self) -> T {
return self.msg;
}

#[allow(unused)]
pub fn take(mut self) -> (T, ThrottleToken) {
return (self.msg, self.throttle_token);
}
}
1 change: 1 addition & 0 deletions chain/network/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod message_wrapper;
1 change: 1 addition & 0 deletions chain/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use crate::stats::metrics;
// TODO(#5307)
pub use near_network_primitives::types::PeerInfo;

pub(crate) mod common;
mod peer;
mod peer_manager;
pub mod routing;
Expand Down
69 changes: 45 additions & 24 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::common::message_wrapper::ActixMessageWrapper;
use crate::peer::codec::{self, Codec};
use crate::peer::tracker::Tracker;
use crate::routing::edge::{Edge, EdgeInfo};
Expand Down Expand Up @@ -723,8 +724,11 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
}
HandshakeFailureReason::InvalidTarget => {
debug!(target: "network", "Peer found was not what expected. Updating peer info with {:?}", peer_info);
self.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest(
PeerRequest::UpdatePeerInfo(peer_info),
self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::PeerRequest(PeerRequest::UpdatePeerInfo(
peer_info,
)),
self.throttle_controller.clone(),
));
}
}
Expand Down Expand Up @@ -801,18 +805,18 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
};
self.chain_info = handshake.chain_info.clone();
self.peer_manager_addr
.send(PeerManagerMessageRequest::Consolidate(Consolidate {
.send(ActixMessageWrapper::new_without_size(PeerManagerMessageRequest::Consolidate(Consolidate {
actor: ctx.address(),
peer_info: peer_info.clone(),
peer_type: self.peer_type,
chain_info: handshake.chain_info.clone(),
this_edge_info: self.edge_info.clone(),
other_edge_info: handshake.edge_info.clone(),
peer_protocol_version: self.protocol_version,
}))
}), self.throttle_controller.clone()))
.into_actor(self)
.then(move |res, act, ctx| {
match res.map(|f|f.as_consolidate_response()) {
match res.map(|f|f.into_inner().as_consolidate_response()) {
Ok(ConsolidateResponse::Accept(edge_info)) => {
act.peer_info = Some(peer_info).into();
act.peer_status = PeerStatus::Ready;
Expand Down Expand Up @@ -853,13 +857,16 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
}

self.peer_manager_addr
.send(PeerManagerMessageRequest::PeerRequest(PeerRequest::UpdateEdge((
self.peer_id().unwrap(),
edge.next(),
))))
.send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::PeerRequest(PeerRequest::UpdateEdge((
self.peer_id().unwrap(),
edge.next(),
))),
self.throttle_controller.clone(),
))
.into_actor(self)
.then(|res, act, ctx| {
match res.map(|f| f.as_peer_response()) {
match res.map(|f| f.into_inner().as_peer_response()) {
Ok(PeerResponse::UpdatedEdge(edge_info)) => {
act.edge_info = Some(edge_info);
act.send_handshake(ctx);
Expand All @@ -879,8 +886,11 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
debug!(target: "network", "Duplicate handshake from {}", self.peer_info);
}
(_, PeerStatus::Ready, PeerMessage::PeersRequest) => {
self.peer_manager_addr.send(PeerManagerMessageRequest::PeersRequest(PeersRequest {})).into_actor(self).then(|res, act, _ctx| {
if let Ok(peers) = res.map(|f|f.as_peers_request_result()) {
self.peer_manager_addr.send(ActixMessageWrapper::new_without_size(PeerManagerMessageRequest::PeersRequest(PeersRequest {}),
self.throttle_controller.clone(),

)).into_actor(self).then(|res, act, _ctx| {
if let Ok(peers) = res.map(|f|f.into_inner().as_peers_request_result()) {
if !peers.peers.is_empty() {
debug!(target: "network", "Peers request from {}: sending {} peers.", act.peer_info, peers.peers.len());
act.send_message(&PeerMessage::PeersResponse(peers.peers));
Expand All @@ -891,8 +901,10 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
}
(_, PeerStatus::Ready, PeerMessage::PeersResponse(peers)) => {
debug!(target: "network", "Received peers from {}: {} peers.", self.peer_info, peers.len());
self.peer_manager_addr
.do_send(PeerManagerMessageRequest::PeersResponse(PeersResponse { peers }));
self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::PeersResponse(PeersResponse { peers }),
self.throttle_controller.clone(),
));
}
(_, PeerStatus::Ready, PeerMessage::RequestUpdateNonce(edge_info)) => self
.peer_manager_addr
Expand Down Expand Up @@ -930,8 +942,12 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
})
.spawn(ctx),
(_, PeerStatus::Ready, PeerMessage::RoutingTableSync(sync_data)) => {
self.peer_manager_addr.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Sync { peer_id: self.peer_id().unwrap(), sync_data },
self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Sync {
peer_id: self.peer_id().unwrap(),
sync_data,
}),
self.throttle_controller.clone(),
));
}
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
Expand All @@ -941,11 +957,12 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
// self.rate_limiter.clone, NetworkRequests::IbfMessage {
// ...

self.peer_manager_addr.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::IbfMessage {
self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::IbfMessage {
peer_id: self.peer_id().unwrap(),
ibf_msg: ibf_message,
},
}),
self.throttle_controller.clone(),
));
}
(_, PeerStatus::Ready, PeerMessage::Routed(routed_message)) => {
Expand All @@ -956,13 +973,17 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
self.ban_peer(ctx, ReasonForBan::InvalidSignature);
} else {
self.peer_manager_addr
.send(PeerManagerMessageRequest::RoutedMessageFrom(RoutedMessageFrom {
msg: routed_message.clone(),
from: self.peer_id().unwrap(),
}))
.send(ActixMessageWrapper::new_without_size(
PeerManagerMessageRequest::RoutedMessageFrom(RoutedMessageFrom {
msg: routed_message.clone(),
from: self.peer_id().unwrap(),
}),
self.throttle_controller.clone(),
))
.into_actor(self)
.then(move |res, act, ctx| {
if res.map(|f| f.as_routed_message_from()).unwrap_or(false) {
if res.map(|f| f.into_inner().as_routed_message_from()).unwrap_or(false)
{
act.receive_message(ctx, PeerMessage::Routed(routed_message));
}
actix::fut::ready(())
Expand Down
71 changes: 45 additions & 26 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
use crate::common::message_wrapper::{ActixMessageResponse, ActixMessageWrapper};
use crate::peer::codec::Codec;
use crate::peer::peer_actor::PeerActor;
use crate::peer_manager::peer_store::{PeerStore, TrustLevel};
#[cfg(all(
feature = "test_features",
feature = "protocol_feature_routing_exchange_algorithm"
))]
use crate::routing::edge::SimpleEdge;
use crate::routing::edge::{Edge, EdgeInfo, EdgeType};
use crate::routing::edge_verifier_actor::{EdgeVerifierActor, EdgeVerifierHelper};
use crate::routing::routing::{
PeerRequestResult, RoutingTableView, DELETE_PEERS_AFTER_TIME, MAX_NUM_PEERS,
};
use crate::routing::routing_table_actor::Prune;
use crate::stats::metrics;
use crate::stats::metrics::NetworkMetrics;
#[cfg(feature = "test_features")]
use crate::types::SetAdvOptions;
use crate::types::{
Consolidate, ConsolidateResponse, EdgeList, GetPeerId, GetPeerIdResult, NetworkInfo,
PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, PeerRequest, PeerResponse,
PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister,
};
use crate::types::{FullPeerInfo, NetworkClientMessages, NetworkRequests, NetworkResponses};
use crate::{RoutingTableActor, RoutingTableMessages, RoutingTableMessagesResponse};
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
use crate::types::{RoutingSyncV2, RoutingVersion2};
use crate::{PeerInfo, RoutingTableActor, RoutingTableMessages, RoutingTableMessagesResponse};
use actix::{
Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, ContextFutureSpawner, Handler,
Recipient, Running, StreamHandler, SyncArbiter, WrapFuture,
Expand All @@ -28,6 +49,7 @@ use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::time::Clock;
use near_primitives::types::{AccountId, ProtocolVersion};
use near_primitives::utils::from_timestamp;
use near_rate_limiter::{ThrottleController, ThrottleToken, ThrottledFrameRead};
use near_store::Store;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::thread_rng;
Expand All @@ -45,29 +67,6 @@ use tokio::sync::Semaphore;
use tokio_util::sync::PollSemaphore;
use tracing::{debug, error, info, trace, warn};

#[cfg(all(
feature = "test_features",
feature = "protocol_feature_routing_exchange_algorithm"
))]
use crate::routing::edge::SimpleEdge;
use crate::routing::edge::{Edge, EdgeInfo, EdgeType};
use crate::routing::edge_verifier_actor::{EdgeVerifierActor, EdgeVerifierHelper};
use crate::routing::routing::{
PeerRequestResult, RoutingTableView, DELETE_PEERS_AFTER_TIME, MAX_NUM_PEERS,
};
use crate::routing::routing_table_actor::Prune;
#[cfg(feature = "test_features")]
use crate::types::SetAdvOptions;
use crate::types::{
Consolidate, ConsolidateResponse, EdgeList, GetPeerId, GetPeerIdResult, NetworkInfo,
PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, PeerRequest, PeerResponse,
PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister,
};
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
use crate::types::{RoutingSyncV2, RoutingVersion2};
use crate::PeerInfo;
use near_rate_limiter::{ThrottleController, ThrottledFrameRead};

/// How often to request peers from active peers.
const REQUEST_PEERS_INTERVAL: Duration = Duration::from_millis(60_000);
/// How much time to wait (in milliseconds) after we send update nonce request before disconnecting.
Expand Down Expand Up @@ -101,9 +100,11 @@ const WAIT_FOR_SYNC_DELAY: Duration = Duration::from_millis(1_000);
const UPDATE_ROUTING_TABLE_INTERVAL: Duration = Duration::from_millis(1_000);

/// Max number of messages we received from peer, and they are in progress, before we start throttling.
const MAX_MESSAGES_COUNT: usize = 20;
/// Disabled for now (TODO PUT UNDER FEATURE FLAG)
const MAX_MESSAGES_COUNT: usize = usize::MAX;
/// Max total size of all messages that are in progress, before we start throttling.
const MAX_MESSAGES_TOTAL_SIZE: usize = 500_000_000;
/// Disabled for now (TODO PUT UNDER FEATURE FLAG)
const MAX_MESSAGES_TOTAL_SIZE: usize = usize::MAX;

macro_rules! unwrap_or_error(($obj: expr, $error: expr) => (match $obj {
Ok(result) => result,
Expand Down Expand Up @@ -2208,6 +2209,24 @@ impl PeerManagerActor {
}
}

impl Handler<ActixMessageWrapper<PeerManagerMessageRequest>> for PeerManagerActor {
type Result = ActixMessageResponse<PeerManagerMessageResponse>;

fn handle(
&mut self,
msg: ActixMessageWrapper<PeerManagerMessageRequest>,
ctx: &mut Self::Context,
) -> Self::Result {
// Unpack throttle controller
let (msg, throttle_token) = msg.take();

let result = self.handle(msg, ctx);

// TODO(#5155) Add support for DeepSizeOf to result
ActixMessageResponse::new(result, ThrottleToken::new(throttle_token.into_inner(), 0))
}
}

impl Handler<PeerManagerMessageRequest> for PeerManagerActor {
type Result = PeerManagerMessageResponse;

Expand Down
Loading

0 comments on commit 43c6ae1

Please sign in to comment.