Skip to content

Commit

Permalink
Start creating connections (edges) with large nonces (#7966)
Browse files Browse the repository at this point in the history
* preparing for sending large nonces

* also show nonce on debug page.

* add time since nonce was created to html

* set proper nonces on re-initializing of the connection

* fixed broken test

* added more tests

* reviews feedback part1

* comments

* logs fix

* comments and cleanup unnecessary clock advance

* added missing updates

* compile fixes

* test fixed

* fixed compilation issue

* review comment

* fixed after merge
  • Loading branch information
mm-near authored Nov 25, 2022
1 parent ca4b913 commit 9e576fe
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 44 deletions.
1 change: 1 addition & 0 deletions chain/client/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ fn new_peer_info_view(chain: &Chain, connected_peer_info: &ConnectedPeerInfo) ->
.elapsed()
.whole_milliseconds() as u64,
is_outbound_peer: connected_peer_info.peer_type == PeerType::Outbound,
nonce: connected_peer_info.nonce,
}
}

Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ pub fn setup_mock_all_validators(
last_time_received_message: near_network::time::Instant::now(),
connection_established_time: near_network::time::Instant::now(),
peer_type: PeerType::Outbound,
nonce: 3,
})
.collect();
let peers2 = peers
Expand Down
5 changes: 4 additions & 1 deletion chain/jsonrpc/res/network_info.html
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,15 @@
let row = $('.js-tbody-peers').append($('<tr>')
.append($('<td>').append(add_debug_port_link(peer.addr)))
.append($('<td>').append(validator.join(",")))
.append($('<td>').append(peer.peer_id.substr(9, 5) + "..."))
.append($('<td>').append(peer.peer_id.substr(8, 5) + "..."))
.append($('<td>').append(convertTime(peer.last_time_received_message_millis)).addClass(last_ping_class))
.append($('<td>').append(JSON.stringify(peer.height)).addClass(peer_class))
.append($('<td>').append(displayHash(peer)))
.append($('<td>').append(JSON.stringify(peer.tracked_shards)))
.append($('<td>').append(JSON.stringify(peer.archival)))
.append($('<td>').append(((peer.is_outbound_peer) ? 'OUT' : 'IN')))
// If this is a new style nonce - show the approx time since it was created.
.append($('<td>').append(peer.nonce + " <br> " + ((peer.nonce > 1660000000) ? convertTime(Date.now() - peer.nonce * 1000) : "old style nonce")))
.append($('<td>').append(convertTime(peer.connection_established_time_millis)))
.append($('<td>').append(computeTraffic(peer.received_bytes_per_sec, peer.sent_bytes_per_sec)))
.append($('<td>').append(routedValidator.join(",")))
Expand Down Expand Up @@ -412,6 +414,7 @@ <h2>
<th>Tracked Shards</th>
<th>Archival</th>
<th>Connection type</th>
<th>Nonce</th>
<th>First connection</th>
<th>Traffic (last minute)</th>
<th>Route to validators</th>
Expand Down
10 changes: 10 additions & 0 deletions chain/network/src/network_protocol/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ impl Edge {
}
}

/// Build a new nonce from the Unix timestamp currently reported by `clock`.
///
/// The returned value is always odd: an even nonce means that the edge should be
/// removed, so an even timestamp is bumped up by one.
pub fn create_fresh_nonce(clock: &time::Clock) -> u64 {
    let timestamp = clock.now_utc().unix_timestamp() as u64;
    // Setting the lowest bit adds one to an even value and leaves an odd value untouched.
    timestamp | 1
}

/// Create the remove edge change from an added edge change.
pub fn remove_edge(&self, my_peer_id: PeerId, sk: &SecretKey) -> Edge {
assert_eq!(self.edge_type(), EdgeState::Active);
Expand Down
67 changes: 58 additions & 9 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::network_protocol::{
use crate::peer::stream;
use crate::peer::tracker::Tracker;
use crate::peer_manager::connection;
use crate::peer_manager::network_state::NetworkState;
use crate::peer_manager::network_state::{NetworkState, PRUNE_EDGES_AFTER};
use crate::peer_manager::peer_manager_actor::Event;
use crate::private_actix::{RegisterPeerError, SendMessage};
use crate::routing::edge::verify_nonce;
Expand Down Expand Up @@ -156,9 +156,21 @@ impl PeerActor {
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
) -> anyhow::Result<actix::Addr<Self>> {
Self::spawn_with_nonce(clock, stream, force_encoding, network_state, None)
}

/// Spawn peer actor, and make it establish a connection with a given nonce.
/// Used mostly for tests.
pub(crate) fn spawn_with_nonce(
clock: time::Clock,
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
nonce: Option<u64>,
) -> anyhow::Result<actix::Addr<Self>> {
let stream_id = stream.id();
match Self::spawn_inner(clock, stream, force_encoding, network_state.clone()) {
match Self::spawn_inner(clock, stream, force_encoding, network_state.clone(), nonce) {
Ok(it) => Ok(it),
Err(reason) => {
network_state.config.event_sink.push(Event::ConnectionClosed(
Expand All @@ -174,6 +186,7 @@ impl PeerActor {
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
nonce: Option<u64>,
) -> Result<actix::Addr<Self>, ClosingReason> {
let connecting_status = match &stream.type_ {
tcp::StreamType::Inbound => ConnectingStatus::Inbound(
Expand All @@ -189,7 +202,7 @@ impl PeerActor {
.start_outbound(peer_id.clone())
.map_err(ClosingReason::OutboundNotAllowed)?,
handshake_spec: HandshakeSpec {
partial_edge_info: network_state.propose_edge(peer_id, None),
partial_edge_info: network_state.propose_edge(&clock, peer_id, nonce),
protocol_version: PROTOCOL_VERSION,
peer_id: peer_id.clone(),
},
Expand Down Expand Up @@ -431,6 +444,7 @@ impl PeerActor {
));
return;
}

// Verify if nonce is sane.
if let Err(err) = verify_nonce(&self.clock, handshake.partial_edge_info.nonce) {
tracing::debug!(target: "network", nonce=?handshake.partial_edge_info.nonce, my_node_id = ?self.my_node_id(), peer_id=?handshake.sender_peer_id, "bad nonce, disconnecting: {err}");
Expand Down Expand Up @@ -470,7 +484,7 @@ impl PeerActor {
handshake_spec.partial_edge_info.clone()
}
ConnectingStatus::Inbound { .. } => {
self.network_state.propose_edge(&handshake.sender_peer_id, Some(nonce))
self.network_state.propose_edge(&self.clock, &handshake.sender_peer_id, Some(nonce))
}
};
let edge = Edge::new(
Expand Down Expand Up @@ -499,7 +513,7 @@ impl PeerActor {
let conn = Arc::new(connection::Connection {
addr: ctx.address(),
peer_info: peer_info.clone(),
edge,
edge: AtomicCell::new(edge),
genesis_id: handshake.sender_chain_info.genesis_id.clone(),
tracked_shards: handshake.sender_chain_info.tracked_shards.clone(),
archival: handshake.sender_chain_info.archival,
Expand Down Expand Up @@ -556,6 +570,11 @@ impl PeerActor {
})
});

// This time is used to figure out when the first run of the callbacks is run.
// It is important that it is set here (rather than calling clock.now() within the future) - as it makes testing a lot easier (and more deterministic).

let start_time = self.clock.now();

// Here we stop processing any PeerActor events until PeerManager
// decides whether to accept the connection or not: ctx.wait makes
// the actor event loop poll on the future until it completes before
Expand Down Expand Up @@ -588,7 +607,7 @@ impl PeerActor {
}));
// Only broadcast the new edge from the outbound endpoint.
act.network_state.tier2.broadcast_message(Arc::new(PeerMessage::SyncRoutingTable(
RoutingTableUpdate::from_edges(vec![conn.edge.clone()]),
RoutingTableUpdate::from_edges(vec![conn.edge.load()]),
)));
}

Expand Down Expand Up @@ -623,11 +642,38 @@ impl PeerActor {
}
}
}));

// Refresh connection nonces but only if we're outbound. For inbound connection, the other party should
// take care of nonce refresh.
if act.peer_type == PeerType::Outbound {
ctx.spawn(wrap_future({
let conn = conn.clone();
let network_state = act.network_state.clone();
let clock = act.clock.clone();
async move {
// How often should we refresh a nonce from a peer.
// It should be smaller than PRUNE_EDGES_AFTER.
let mut interval = time::Interval::new(start_time + PRUNE_EDGES_AFTER / 3, PRUNE_EDGES_AFTER / 3);
loop {
interval.tick(&clock).await;
conn.send_message(Arc::new(
PeerMessage::RequestUpdateNonce(PartialEdgeInfo::new(
&network_state.config.node_id(),
&conn.peer_info.id,
Edge::create_fresh_nonce(&clock),
&network_state.config.node_key,
)
)));

}
}
}));
}
// Sync the RoutingTable.
act.sync_routing_table();
act.network_state.config.event_sink.push(Event::HandshakeCompleted(HandshakeCompletedEvent{
stream_id: act.stream_id,
edge: conn.edge.clone(),
edge: conn.edge.load(),
}));
},
Err(err) => {
Expand Down Expand Up @@ -724,8 +770,11 @@ impl PeerActor {
return;
}
// Recreate the edge with a newer nonce.
handshake_spec.partial_edge_info =
self.network_state.propose_edge(&handshake_spec.peer_id, Some(edge.next()));
handshake_spec.partial_edge_info = self.network_state.propose_edge(
&self.clock,
&handshake_spec.peer_id,
Some(std::cmp::max(edge.next(), Edge::create_fresh_nonce(&self.clock))),
);
let spec = handshake_spec.clone();
ctx.wait(actix::fut::ready(()).then(move |_, act: &mut Self, _| {
act.send_handshake(spec);
Expand Down
18 changes: 16 additions & 2 deletions chain/network/src/peer_manager/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) struct Connection {
pub addr: actix::Addr<PeerActor>,

pub peer_info: PeerInfo,
pub edge: Edge,
pub edge: AtomicCell<Edge>,
/// Chain Id and hash of genesis block.
pub genesis_id: GenesisId,
/// Shards that the peer is tracking.
Expand Down Expand Up @@ -77,7 +77,7 @@ impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("peer_info", &self.peer_info)
.field("edge", &self.edge)
.field("edge", &self.edge.load())
.field("peer_type", &self.peer_type)
.field("connection_established_time", &self.connection_established_time)
.finish()
Expand Down Expand Up @@ -295,6 +295,20 @@ impl Pool {
pool.ready.remove(peer_id);
});
}
/// Replace the edge stored on a ready connection when `new_edge` carries a higher nonce.
///
/// Nothing happens if `new_edge` has no "other" endpoint relative to this pool's own
/// peer id, if that other peer has no ready connection, or if the stored edge's nonce
/// is already greater than or equal to the new one.
pub fn update_edge(&self, new_edge: &Edge) {
    self.0.update(|pool| {
        // Find the peer on the far side of the edge, as seen from this node.
        let peer_id = match new_edge.other(&pool.me) {
            Some(peer_id) => peer_id,
            None => return,
        };
        // Only connections in the ready set are updated.
        let conn = match pool.ready.get_mut(peer_id) {
            Some(conn) => conn,
            None => return,
        };
        let current = conn.edge.load();
        if new_edge.nonce() > current.nonce() {
            conn.edge.store(new_edge.clone());
        }
    })
}

/// Send message to peer that belongs to our active set
/// Return whether the message is sent or not.
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const IMPORTANT_MESSAGE_RESENT_COUNT: usize = 3;
const PRUNE_UNREACHABLE_PEERS_AFTER: time::Duration = time::Duration::hours(1);

/// Remove the edges that were created more than this duration ago.
const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);
pub const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);

impl WhitelistNode {
pub fn from_peer_info(pi: &PeerInfo) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -258,7 +258,7 @@ impl NetworkState {
// Verify and broadcast the edge of the connection. Only then insert the new
// connection to TIER2 pool, so that nothing is broadcasted to conn.
// TODO(gprusak): consider actually banning the peer for consistency.
this.add_edges(&clock, vec![conn.edge.clone()])
this.add_edges(&clock, vec![conn.edge.load()])
.await
.map_err(|_: ReasonForBan| RegisterPeerError::InvalidEdge)?;
this.tier2.insert_ready(conn.clone()).map_err(RegisterPeerError::PoolError)?;
Expand Down Expand Up @@ -474,7 +474,7 @@ impl NetworkState {
PartialEdgeInfo::new(
&node_id,
&conn.peer_info.id,
edge.next(),
std::cmp::max(Edge::create_fresh_nonce(&clock), edge.next()),
&this.config.node_key,
),
)));
Expand Down
25 changes: 23 additions & 2 deletions chain/network/src/peer_manager/network_state/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,24 @@ impl NetworkState {

/// Constructs a partial edge to the given peer with the nonce specified.
/// If nonce is None, nonce is selected automatically.
pub fn propose_edge(&self, peer1: &PeerId, with_nonce: Option<u64>) -> PartialEdgeInfo {
pub fn propose_edge(
&self,
clock: &time::Clock,
peer1: &PeerId,
with_nonce: Option<u64>,
) -> PartialEdgeInfo {
// When we create a new edge we increase the latest nonce by 2 in case we miss a removal
// proposal from our partner.
let nonce = with_nonce.unwrap_or_else(|| {
self.graph.load().local_edges.get(peer1).map_or(1, |edge| edge.next())
let nonce = Edge::create_fresh_nonce(clock);
// If we already had a connection to this peer - check that edge's nonce.
// And use either that one or the one from the current timestamp.
// We would use existing edge's nonce, if we were trying to connect to a given peer multiple times per second.
self.graph
.load()
.local_edges
.get(peer1)
.map_or(nonce, |edge| std::cmp::max(edge.next(), nonce))
});
PartialEdgeInfo::new(&self.config.node_id(), peer1, nonce, &self.config.node_key)
}
Expand Down Expand Up @@ -85,6 +98,14 @@ impl NetworkState {
if edges.len() == 0 {
return result;
}

// Select local edges (where we are one of the peers) - and update the peer's Connection nonces.
for e in &edges {
if let Some(_) = e.other(&self.config.node_id()) {
self.tier2.update_edge(&e);
}
}

let this = self.clone();
let clock = clock.clone();
let _ = self
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ impl PeerManagerActor {
last_time_received_message: cp.last_time_received_message.load(),
connection_established_time: cp.connection_established_time,
peer_type: cp.peer_type,
nonce: cp.edge.load().nonce(),
})
.collect(),
num_connected_peers: tier2.ready.len(),
Expand Down
Loading

0 comments on commit 9e576fe

Please sign in to comment.