Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start creating connections (edges) with large nonces #7966

Merged
merged 23 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/client/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ fn new_peer_info_view(chain: &Chain, connected_peer_info: &ConnectedPeerInfo) ->
.elapsed()
.whole_milliseconds() as u64,
is_outbound_peer: connected_peer_info.peer_type == PeerType::Outbound,
nonce: connected_peer_info.nonce,
}
}

Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ pub fn setup_mock_all_validators(
last_time_received_message: near_network::time::Instant::now(),
connection_established_time: near_network::time::Instant::now(),
peer_type: PeerType::Outbound,
nonce: 3,
})
.collect();
let peers2 = peers
Expand Down
5 changes: 4 additions & 1 deletion chain/jsonrpc/res/network_info.html
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,15 @@
let row = $('.js-tbody-peers').append($('<tr>')
.append($('<td>').append(add_debug_port_link(peer.addr)))
.append($('<td>').append(validator.join(",")))
.append($('<td>').append(peer.peer_id.substr(9, 5) + "..."))
.append($('<td>').append(peer.peer_id.substr(8, 5) + "..."))
.append($('<td>').append(convertTime(peer.last_time_received_message_millis)).addClass(last_ping_class))
.append($('<td>').append(JSON.stringify(peer.height)).addClass(peer_class))
.append($('<td>').append(displayHash(peer)))
.append($('<td>').append(JSON.stringify(peer.tracked_shards)))
.append($('<td>').append(JSON.stringify(peer.archival)))
.append($('<td>').append(((peer.is_outbound_peer) ? 'OUT' : 'IN')))
// If this is a new style nonce - show the approx time since it was created.
.append($('<td>').append(peer.nonce + " <br> " + ((peer.nonce > 1660000000) ? convertTime(Date.now() - peer.nonce * 1000) : "old style nonce")))
.append($('<td>').append(convertTime(peer.connection_established_time_millis)))
.append($('<td>').append(computeTraffic(peer.received_bytes_per_sec, peer.sent_bytes_per_sec)))
.append($('<td>').append(routedValidator.join(",")))
Expand Down Expand Up @@ -412,6 +414,7 @@ <h2>
<th>Tracked Shards</th>
<th>Archival</th>
<th>Connection type</th>
<th>Nonce</th>
<th>First connection</th>
<th>Traffic (last minute)</th>
<th>Route to validators</th>
Expand Down
10 changes: 10 additions & 0 deletions chain/network/src/network_protocol/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ impl Edge {
}
}

/// Create a fresh nonce (based on the current time).
pub fn create_fresh_nonce(clock: &time::Clock) -> u64 {
let mut nonce = clock.now_utc().unix_timestamp() as u64;
// Even nonce means that the edge should be removed, so if the timestamp is even, add one to get the odd value.
if nonce % 2 == 0 {
nonce += 1;
}
nonce
}

/// Create the remove edge change from an added edge change.
pub fn remove_edge(&self, my_peer_id: PeerId, sk: &SecretKey) -> Edge {
assert_eq!(self.edge_type(), EdgeState::Active);
Expand Down
67 changes: 58 additions & 9 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::peer::stream;
use crate::peer::tracker::Tracker;
use crate::peer_manager::connection;
use crate::peer_manager::network_state::NetworkState;
use crate::peer_manager::peer_manager_actor::Event;
use crate::peer_manager::peer_manager_actor::{Event, PRUNE_EDGES_AFTER};
use crate::private_actix::{RegisterPeerError, SendMessage};
use crate::routing::edge::verify_nonce;
use crate::stats::metrics;
Expand Down Expand Up @@ -156,9 +156,21 @@ impl PeerActor {
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
) -> anyhow::Result<actix::Addr<Self>> {
Self::spawn_with_nonce(clock, stream, force_encoding, network_state, None)
}

/// Span peer actor, and make it establish a connection with a given nonce.
/// Used mostly for tests.
pub(crate) fn spawn_with_nonce(
clock: time::Clock,
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
nonce: Option<u64>,
) -> anyhow::Result<actix::Addr<Self>> {
let stream_id = stream.id();
match Self::spawn_inner(clock, stream, force_encoding, network_state.clone()) {
match Self::spawn_inner(clock, stream, force_encoding, network_state.clone(), nonce) {
Ok(it) => Ok(it),
Err(reason) => {
network_state.config.event_sink.push(Event::ConnectionClosed(
Expand All @@ -174,6 +186,7 @@ impl PeerActor {
stream: tcp::Stream,
force_encoding: Option<Encoding>,
network_state: Arc<NetworkState>,
nonce: Option<u64>,
) -> Result<actix::Addr<Self>, ClosingReason> {
let connecting_status = match &stream.type_ {
tcp::StreamType::Inbound => ConnectingStatus::Inbound(
Expand All @@ -189,7 +202,7 @@ impl PeerActor {
.start_outbound(peer_id.clone())
.map_err(ClosingReason::OutboundNotAllowed)?,
handshake_spec: HandshakeSpec {
partial_edge_info: network_state.propose_edge(peer_id, None),
partial_edge_info: network_state.propose_edge(&clock, peer_id, nonce),
protocol_version: PROTOCOL_VERSION,
peer_id: peer_id.clone(),
},
Expand Down Expand Up @@ -431,6 +444,7 @@ impl PeerActor {
));
return;
}

// Verify if nonce is sane.
if let Err(err) = verify_nonce(&self.clock, handshake.partial_edge_info.nonce) {
debug!(target: "network", nonce=?handshake.partial_edge_info.nonce, my_node_id = ?self.my_node_id(), peer_id=?handshake.sender_peer_id, "bad nonce, disconnecting: {err}");
Expand Down Expand Up @@ -470,7 +484,7 @@ impl PeerActor {
handshake_spec.partial_edge_info.clone()
}
ConnectingStatus::Inbound { .. } => {
self.network_state.propose_edge(&handshake.sender_peer_id, Some(nonce))
self.network_state.propose_edge(&self.clock, &handshake.sender_peer_id, Some(nonce))
}
};
let edge = Edge::new(
Expand Down Expand Up @@ -499,7 +513,7 @@ impl PeerActor {
let conn = Arc::new(connection::Connection {
addr: ctx.address(),
peer_info: peer_info.clone(),
edge,
edge: AtomicCell::new(edge),
genesis_id: handshake.sender_chain_info.genesis_id.clone(),
tracked_shards: handshake.sender_chain_info.tracked_shards.clone(),
archival: handshake.sender_chain_info.archival,
Expand Down Expand Up @@ -559,6 +573,11 @@ impl PeerActor {
})
});

// This time is used to figure out when the first run of the callbacks it run.
// It is important that it is set here (rather than calling clock.now() within the future) - as it makes testing a lot easier (and more deterministic).

let start_time = self.clock.now();

// Here we stop processing any PeerActor events until PeerManager
// decides whether to accept the connection or not: ctx.wait makes
// the actor event loop poll on the future until it completes before
Expand Down Expand Up @@ -591,7 +610,7 @@ impl PeerActor {
}));
// Only broadcast the new edge from the outbound endpoint.
act.network_state.tier2.broadcast_message(Arc::new(PeerMessage::SyncRoutingTable(
RoutingTableUpdate::from_edges(vec![conn.edge.clone()]),
RoutingTableUpdate::from_edges(vec![conn.edge.load()]),
)));
}

Expand Down Expand Up @@ -626,11 +645,38 @@ impl PeerActor {
}
}
}));

// Refresh connection nonces but only if we're outbound. For inbound connection, the other party should
// take care of nonce refresh.
if act.peer_type == PeerType::Outbound {
ctx.spawn(wrap_future({
let conn = conn.clone();
let network_state = act.network_state.clone();
let clock = act.clock.clone();
async move {
// How often should we refresh a nonce from a peer.
// It should be smaller than PRUNE_EDGES_AFTER.
let mut interval = time::Interval::new(start_time + PRUNE_EDGES_AFTER / 3, PRUNE_EDGES_AFTER / 3);
loop {
interval.tick(&clock).await;
conn.send_message(Arc::new(
PeerMessage::RequestUpdateNonce(PartialEdgeInfo::new(
&network_state.config.node_id(),
&conn.peer_info.id,
Edge::create_fresh_nonce(&clock),
&network_state.config.node_key,
)
)));

}
}
}));
}
// Sync the RoutingTable.
act.sync_routing_table();
act.network_state.config.event_sink.push(Event::HandshakeCompleted(HandshakeCompletedEvent{
stream_id: act.stream_id,
edge: conn.edge.clone(),
edge: conn.edge.load(),
}));
},
Err(err) => {
Expand Down Expand Up @@ -727,8 +773,11 @@ impl PeerActor {
return;
}
// Recreate the edge with a newer nonce.
handshake_spec.partial_edge_info =
self.network_state.propose_edge(&handshake_spec.peer_id, Some(edge.next()));
handshake_spec.partial_edge_info = self.network_state.propose_edge(
&self.clock,
&handshake_spec.peer_id,
Some(std::cmp::max(edge.next(), Edge::create_fresh_nonce(&self.clock))),
);
let spec = handshake_spec.clone();
ctx.wait(actix::fut::ready(()).then(move |_, act: &mut Self, _| {
act.send_handshake(spec);
Expand Down
16 changes: 2 additions & 14 deletions chain/network/src/peer/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::testonly::actix::ActixSystem;
use crate::testonly::fake_client;
use crate::time;
use crate::types::PeerIdOrHash;
use near_crypto::Signature;
use near_o11y::WithSpanContextExt;
use near_primitives::network::PeerId;
use std::sync::Arc;
Expand Down Expand Up @@ -130,19 +129,8 @@ impl PeerHandle {
fc,
vec![],
));
// WARNING: this is a hack to make PeerActor use a specific nonce
if let (Some(nonce), tcp::StreamType::Outbound { peer_id }) =
(&cfg.nonce, &stream.type_)
{
network_state.routing_table_view.add_local_edges(&[Edge::new(
cfg.id(),
peer_id.clone(),
nonce - 1,
Signature::default(),
Signature::default(),
)]);
}
PeerActor::spawn(clock, stream, cfg.force_encoding, network_state).unwrap()
PeerActor::spawn_with_nonce(clock, stream, cfg.force_encoding, network_state, cfg.nonce)
.unwrap()
})
.await;
Self { actix, cfg: cfg_, events: recv, edge: None }
Expand Down
18 changes: 16 additions & 2 deletions chain/network/src/peer_manager/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(crate) struct Connection {
pub addr: actix::Addr<PeerActor>,

pub peer_info: PeerInfo,
pub edge: Edge,
pub edge: AtomicCell<Edge>,
/// Chain Id and hash of genesis block.
pub genesis_id: GenesisId,
/// Shards that the peer is tracking.
Expand Down Expand Up @@ -80,7 +80,7 @@ impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("peer_info", &self.peer_info)
.field("edge", &self.edge)
.field("edge", &self.edge.load())
.field("peer_type", &self.peer_type)
.field("connection_established_time", &self.connection_established_time)
.finish()
Expand Down Expand Up @@ -333,6 +333,20 @@ impl Pool {
pool.ready.remove(peer_id);
});
}
/// Update the edge in the pool (if it is newer).
pub fn update_edge(&self, new_edge: &Edge) {
self.0.update(|pool| {
let other = new_edge.other(&pool.me);
if let Some(other) = other {
if let Some(connection) = pool.ready.get_mut(other) {
let edge = connection.edge.load();
if edge.nonce() < new_edge.nonce() {
connection.edge.store(new_edge.clone());
}
}
}
})
}

/// Send message to peer that belongs to our active set
/// Return whether the message is sent or not.
Expand Down
28 changes: 23 additions & 5 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,22 @@ impl NetworkState {
self.runtime.handle.spawn(fut.in_current_span())
}

pub fn propose_edge(&self, peer1: &PeerId, with_nonce: Option<u64>) -> PartialEdgeInfo {
pub fn propose_edge(
&self,
clock: &time::Clock,
peer1: &PeerId,
with_nonce: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach you took to inject a nonce is totally valid and I agree with it, however on the other hand it has become quite intrusive, given that we need to pass around the nonce explicitly through a bunch of calls and it is used only for tests. Please consider using a lower level abstraction than PeerActor to force a custom nonce. I already planned to do that in my next PR: https://github.com/near/nearcore/pull/8076/files#diff-b691489bc9c0e1683942223dad298acaca321241108c799bf7376c641d75be1a . PTAL at chain/network/src/peer_manager/tests/nonce.rs there, to see whether it would also simplify your PR.

Copy link
Contributor Author

@mm-near mm-near Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the with_nonce was only to fix that one test (that I see that you also fixed in your PR)

) -> PartialEdgeInfo {
// When we create a new edge we increase the latest nonce by 2 in case we miss a removal
// proposal from our partner.
let nonce = with_nonce.unwrap_or_else(|| {
self.routing_table_view.get_local_edge(peer1).map_or(1, |edge| edge.next())
let nonce = Edge::create_fresh_nonce(clock);
// If we already had a connection to this peer - check that edge's nonce.
// And use either that one or the one from the current timestamp.
// We would use existing edge's nonce, if we were trying to connect to a given peer multiple times per second.
self.routing_table_view
.get_local_edge(peer1)
.map_or(nonce, |edge| std::cmp::max(edge.next(), nonce))
});
PartialEdgeInfo::new(&self.config.node_id(), peer1, nonce, &self.config.node_key)
}
Expand Down Expand Up @@ -290,7 +301,7 @@ impl NetworkState {
// Verify and broadcast the edge of the connection. Only then insert the new
// connection to TIER2 pool, so that nothing is broadcasted to conn.
// TODO(gprusak): consider actually banning the peer for consistency.
this.add_edges(&clock, vec![conn.edge.clone()])
this.add_edges(&clock, vec![conn.edge.load()])
.await
.map_err(|_: ReasonForBan| RegisterPeerError::InvalidEdge)?;
this.tier2.insert_ready(conn.clone()).map_err(RegisterPeerError::PoolError)?;
Expand Down Expand Up @@ -510,7 +521,13 @@ impl NetworkState {
})
})
.await;
this.routing_table_view.add_local_edges(&edges);
let new_local_edges = this.routing_table_view.add_local_edges(&edges);

// Local edge information is also kept in the Connection fields in peers.
// Make sure that it gets updated too.
for new_edge in new_local_edges {
this.tier2.update_edge(&new_edge);
}
Copy link
Contributor

@pompon0 pompon0 Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local_edges are the edges of the graph adjacent to this node.
edge stored in connection is the edge that the ends of the connection have agreed upon during handshake.

Given that now the edge is supposed to change dynamically and that it is delivered to a node via SyncRoutingTable message it doesn't make sense to duplicate it I guess. We can either:

  • remove edge field from the connection entirely and just rely on local_edges (I think that would be preferred)
  • revert to using ResponseUpdateNonce message, so that each PeerActor can manage the connection.edge on their own.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we could remove the edge from the connection -- but I'd do that in a separate PR (as this one is becoming too large)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, please add a TODO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also you unnecesarily take a lock on the whole pool in Pool::update_edge. Instead just update the edge in-place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and for that you might want ArcMutex/ArcSwap instead of AtomicCell.


let mut new_edges = match this
.routing_table_addr
Expand Down Expand Up @@ -603,6 +620,7 @@ impl NetworkState {
&self.config.node_key,
edge_info.signature,
);
self.tier2.update_edge(&edge);
self.add_edges(&clock, vec![edge.clone()]).await?;
Ok(edge)
}
Expand Down Expand Up @@ -660,7 +678,7 @@ impl NetworkState {
PartialEdgeInfo::new(
&node_id,
&conn.peer_info.id,
edge.next(),
std::cmp::max(Edge::create_fresh_nonce(&clock), edge.next()),
&this.config.node_key,
),
)));
Expand Down
3 changes: 2 additions & 1 deletion chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const REPORT_BANDWIDTH_THRESHOLD_COUNT: usize = 10_000;
const PRUNE_UNREACHABLE_PEERS_AFTER: time::Duration = time::Duration::hours(1);

/// Remove the edges that were created more that this duration ago.
const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);
pub const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);

/// If a peer is more than these blocks behind (comparing to our current head) - don't route any messages through it.
/// We are updating the list of unreliable peers every MONITOR_PEER_MAX_DURATION (60 seconds) - so the current
Expand Down Expand Up @@ -614,6 +614,7 @@ impl PeerManagerActor {
last_time_received_message: cp.last_time_received_message.load(),
connection_established_time: cp.connection_established_time,
peer_type: cp.peer_type,
nonce: cp.edge.load().nonce(),
})
.collect(),
num_connected_peers: tier2.ready.len(),
Expand Down
Loading