Mute denied nodes #322

Merged · 14 commits · Mar 29, 2021
Changes from 6 commits
18 changes: 13 additions & 5 deletions backend/src/aggregator.rs
@@ -1,8 +1,9 @@
use std::collections::{HashMap, HashSet};
use actix::prelude::*;
+use actix_web_actors::ws::{CloseReason, CloseCode};
use lazy_static::lazy_static;

-use crate::node::connector::Initialize;
+use crate::node::connector::{Initialize, Mute};
use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer};
@@ -30,7 +31,7 @@ pub struct ChainEntry {
}

lazy_static! {
-/// Labels of chains we consider "first party". These chains are allowed any
+/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static ref FIRST_PARTY_NETWORKS: HashSet<&'static str> = {
let mut set = HashSet::new();
@@ -42,7 +43,7 @@ lazy_static! {
};
}
/// Max number of nodes allowed to connect to the telemetry server.
-const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
+const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 2;

impl Aggregator {
pub fn new(denylist: HashSet<String>) -> Self {
@@ -131,6 +132,8 @@ pub struct AddNode {
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
+/// Recipient for the mute message
+pub mute: Recipient<Mute>,
}

/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
@@ -196,10 +199,13 @@ impl Handler<AddNode> for Aggregator {

fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
if self.denylist.contains(&*msg.node.chain) {
-log::debug!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
+log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
+let AddNode { mute, .. } = msg;
+let reason = CloseReason{ code: CloseCode::Abnormal, description: Some("Denied".into()) };
+let _ = mute.do_send(Mute { reason });
return;
}
-let AddNode { node, conn_id, rec } = msg;
+let AddNode { node, conn_id, rec, mute } = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);

let cid = self.lazy_chain(&node.chain, ctx);
@@ -212,6 +218,8 @@ impl Handler<AddNode> for Aggregator {
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
+let reason = CloseReason{ code: CloseCode::Again, description: Some("Overquota".into()) };
+let _ = mute.do_send(Mute { reason });
}
}
}
2 changes: 1 addition & 1 deletion backend/src/chain.rs
@@ -284,7 +284,7 @@ impl Chain {
if node.update_block(*block) {
if block.height > self.best.height {
self.best = *block;
-log::info!(
+log::debug!(
"[{}] [nodes={}/feeds={}] new best block={}/{:?}",
self.label.0,
nodes_len,
31 changes: 25 additions & 6 deletions backend/src/node/connector.rs
@@ -5,7 +5,7 @@ use std::mem;

use bytes::{Bytes, BytesMut};
use actix::prelude::*;
-use actix_web_actors::ws;
+use actix_web_actors::ws::{self, CloseReason};
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
@@ -24,7 +24,7 @@ const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
pub struct NodeConnector {
/// Multiplexing connections by id
multiplex: BTreeMap<ConnId, ConnMultiplex>,
-/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
+/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
hb: Instant,
/// Aggregator actor address
aggregator: Addr<Aggregator>,
@@ -109,8 +109,6 @@ impl NodeConnector {
ConnMultiplex::Waiting { backlog } => {
if let Payload::SystemConnected(connected) = msg.payload() {
let mut node = connected.node.clone();
-let rec = ctx.address().recipient();
-
// FIXME: Use genesis hash instead of names to avoid this mess
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
@@ -123,7 +121,10 @@
_ => ()
}

-self.aggregator.do_send(AddNode { node, conn_id, rec });
+let rec = ctx.address().recipient();
+let mute = ctx.address().recipient();
+
+self.aggregator.do_send(AddNode { node, conn_id, rec, mute });
} else {
if backlog.len() >= 10 {
backlog.remove(0);
@@ -157,6 +158,23 @@ impl NodeConnector {
}
}

+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct Mute {
+pub reason: CloseReason,
+}
+
+impl Handler<Mute> for NodeConnector {
+type Result = ();
+fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) {
+let Mute { reason } = msg;
+log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description);
+
+ctx.close(Some(reason));
+ctx.stop();
Contributor (PR author):

When I test this locally, the nodes keep trying to reconnect every five seconds or so, so there's plenty of churn here. Is there a way to disconnect the socket and trigger an error on the node side? I think (but am not sure) that nodes keep retrying on error, though not forever, so if we could do that it might be a solution to the bandwidth consumption.

Contributor:

I was thinking about adding a Muted state to ConnMultiplex (might need a rename there) and then skipping message parsing in StreamHandler::handle while still keeping the connection open. AFAIK there is no way to keep the connection open in actix-web without reading messages into buffers, but skipping the deserialization should still help a lot (though not if we are limited on bandwidth).
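
A minimal sketch of that idea, under some assumptions: the real ConnMultiplex in this PR has concrete variants (only Waiting { backlog } is visible in this diff), so the generic parameter and the Muted variant below are purely illustrative.

```rust
/// Hypothetical sketch of the suggested `Muted` state. `M` stands in for
/// the backlog's message type so the sketch compiles on its own.
enum ConnMultiplex<M> {
    /// Waiting for the initial `SystemConnected` message.
    Waiting { backlog: Vec<M> },
    /// Connection stays open, but every incoming frame is dropped unparsed.
    Muted,
}

impl<M> ConnMultiplex<M> {
    /// Early-out for the top of `StreamHandler::handle`: skip all serde
    /// work for muted connections while leaving the socket open.
    fn is_muted(&self) -> bool {
        matches!(self, ConnMultiplex::Muted)
    }
}
```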

Contributor (PR author):

So you're saying that instead of stopping the NodeConnector actor when muting a node, we'd change the state to ConnMultiplex::Muted? We'd still need the conn_id/msg.id() to be able to look it up in self.multiplex, but maybe you're thinking of using something non-serde to look up that key and avoid deserializing the full NodeMessage?

Tell me though: is the gain you're thinking of that it's cheaper to keep the node connected but not deserialize any of its messages than to stop the actor and create a new one every n seconds when the node reconnects?
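
For illustration, one way to look up the key without deserializing the full NodeMessage might be to parse only the id field. This is a hypothetical sketch: IdOnly, peek_conn_id, and the JSON wire format with a numeric id field are all assumptions, not part of this PR.

```rust
use serde::Deserialize;

/// Hypothetical helper: deserialize only the multiplex id from an
/// incoming frame, ignoring the rest of the payload.
#[derive(Deserialize)]
struct IdOnly {
    id: usize,
}

fn peek_conn_id(raw: &[u8]) -> Option<usize> {
    // serde_json still scans the whole frame, but it skips building values
    // for unknown fields, which avoids most of the allocation cost of
    // deserializing the full message.
    serde_json::from_slice::<IdOnly>(raw).ok().map(|m| m.id)
}
```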

Contributor (PR author):

I tried the following hack: I added a muted: bool to the NodeConnector actor, and instead of muting a node with ctx.close(None); ctx.stop(); I did self.muted = true and then changed StreamHandler::handle to check this boolean as the very first thing.
That would keep one NodeConnector actor around for every node but not process any of their messages. It feels pretty dirty, and I'm not convinced it's better to keep the node connected rather than recreating the actors, but maybe you're right that this is cheaper.

It feels to me like the right way to do this is for telemetry to close the connection with a proper reason and then change substrate to behave properly:

  • When a chain is denied, telemetry closes the connection with a CloseCode::Policy (1008) (or possibly CloseCode::Abnormal (1006)); substrate does not try to reconnect.
  • When a chain is over quota, telemetry closes the connection with a CloseCode::Again (1013) and the node is free to try to reconnect later (maybe with exponential backoff if we want to get fancy).

That seems like the only way we have to reduce both processing resources and bandwidth. Wdyt?
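
For illustration, a node-side client honoring those close codes might look something like this. This is a hypothetical sketch, not Substrate code: should_reconnect and the backoff constants are made up.

```rust
use std::time::Duration;

/// Hypothetical node-side policy for the close codes proposed above:
/// 1008 (Policy) means the chain was denied, so give up; 1013 (Again)
/// means over quota, so retry later with capped exponential backoff.
fn should_reconnect(close_code: u16, attempt: u32) -> Option<Duration> {
    match close_code {
        1008 => None, // denied: do not reconnect
        1013 => {
            // over quota: back off exponentially, capped at ~5 minutes
            let secs = 5u64.saturating_mul(1u64 << attempt.min(6));
            Some(Duration::from_secs(secs))
        }
        // any other close: plain retry after the usual delay
        _ => Some(Duration::from_secs(5)),
    }
}
```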

+}
+}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize {
@@ -203,7 +221,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
Ok(ws::Message::Pong(_)) => return,
Ok(ws::Message::Text(text)) => text.into_bytes(),
Ok(ws::Message::Binary(data)) => data,
-Ok(ws::Message::Close(_)) => {
+Ok(ws::Message::Close(reason)) => {
+ctx.close(reason);
ctx.stop();
return;
}