Skip to content

Commit d5b7767

Browse files
committed
refactor(node/router): improve disconnect error propagation in the router
1 parent d267d3e commit d5b7767

File tree

22 files changed

+453
-327
lines changed

22 files changed

+453
-327
lines changed

Cargo.lock

Lines changed: 67 additions & 78 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/bft/events/src/disconnect.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub enum DisconnectReason {
2929
ProtocolViolation,
3030
/// The peer's client is outdated, judging by its version.
3131
OutdatedClientVersion,
32+
/// The two validators are the same node.
33+
SelfConnect,
3234
}
3335

3436
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -64,6 +66,7 @@ impl FromBytes for Disconnect {
6466
Ok(1) => DisconnectReason::NoReasonGiven,
6567
Ok(2) => DisconnectReason::ProtocolViolation,
6668
Ok(3) => DisconnectReason::OutdatedClientVersion,
69+
Ok(4) => DisconnectReason::SelfConnect,
6770
_ => return Err(io::Error::other("Invalid 'Disconnect' event")),
6871
};
6972

@@ -86,6 +89,7 @@ mod tests {
8689
DisconnectReason::NoReasonGiven,
8790
DisconnectReason::InvalidChallengeResponse,
8891
DisconnectReason::OutdatedClientVersion,
92+
DisconnectReason::SelfConnect,
8993
];
9094

9195
for reason in all_reasons.iter() {

node/bft/src/gateway.rs

Lines changed: 83 additions & 69 deletions
Large diffs are not rendered by default.

node/router/Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ features = [ "codec" ]
104104
[dependencies.tokio-stream]
105105
version = "=0.1"
106106

107+
[dependencies.thiserror]
108+
workspace = true
109+
107110
[dependencies.tracing]
108111
workspace = true
109112

@@ -136,7 +139,3 @@ features = [ "test" ]
136139
[dev-dependencies.snarkvm]
137140
workspace = true
138141
features = [ "test-helpers" ]
139-
140-
[dev-dependencies.tracing-subscriber]
141-
workspace = true
142-
features = [ "env-filter", "fmt" ]

node/router/messages/src/helpers/disconnect.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use snarkvm::prelude::{FromBytes, ToBytes, error};
1818
use std::io;
1919

2020
/// The reason behind the node disconnecting from a peer.
21+
// TODO(kaimast): implement Display
2122
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
2223
pub enum DisconnectReason {
2324
/// The fork length limit was exceeded.
@@ -48,8 +49,16 @@ pub enum DisconnectReason {
4849
TooManyPeers,
4950
/// The peer is a sync node that's behind our node, and it needs to sync itself first.
5051
YouNeedToSyncFirst,
51-
/// The peer's listening port is closed.
52+
/// The peer's listening port is closed.
5253
YourPortIsClosed(u16),
54+
/// The two peers are the same node.
55+
SelfConnect,
56+
/// No untrusted external peers are allowed.
57+
NoExternalPeersAllowed,
58+
/// Already connecting to the same node (through another TCP channel).
59+
AlreadyConnecting,
60+
/// Already connected to the same node (through another TCP channel).
61+
AlreadyConnected,
5362
}
5463

5564
impl ToBytes for DisconnectReason {
@@ -73,6 +82,10 @@ impl ToBytes for DisconnectReason {
7382
14u8.write_le(&mut writer)?;
7483
port.write_le(writer)
7584
}
85+
Self::SelfConnect => 15u8.write_le(writer),
86+
Self::NoExternalPeersAllowed => 16u8.write_le(writer),
87+
Self::AlreadyConnecting => 17u8.write_le(writer),
88+
Self::AlreadyConnected => 18u8.write_le(writer),
7689
}
7790
}
7891
}
@@ -98,6 +111,10 @@ impl FromBytes for DisconnectReason {
98111
let port = u16::read_le(reader)?;
99112
Ok(Self::YourPortIsClosed(port))
100113
}
114+
15 => Ok(Self::SelfConnect),
115+
16 => Ok(Self::NoExternalPeersAllowed),
116+
17 => Ok(Self::AlreadyConnecting),
117+
18 => Ok(Self::AlreadyConnected),
101118
_ => Err(error("Invalid disconnect reason")),
102119
}
103120
}

node/router/src/handshake.rs

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,19 @@
1414
// limitations under the License.
1515

1616
use crate::{
17+
AddPeerError,
1718
NodeType,
1819
PeerPoolHandling,
1920
Router,
2021
messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
2122
};
22-
use snarkos_node_tcp::{ConnectionSide, P2P, Tcp};
23+
use snarkos_node_tcp::{ConnectError, ConnectionSide, P2P, Tcp};
2324
use snarkvm::{
2425
ledger::narwhal::Data,
25-
prelude::{Address, Field, Network, block::Header, error},
26+
prelude::{Address, Field, Network, block::Header},
2627
};
2728

28-
use anyhow::{Result, bail};
29+
use anyhow::{Result, anyhow};
2930
use futures::SinkExt;
3031
use rand::{Rng, rngs::OsRng};
3132
use std::{io, net::SocketAddr};
@@ -52,24 +53,19 @@ macro_rules! expect_message {
5253
}
5354
// Received a disconnect message, abort.
5455
Some(Message::Disconnect(reason)) => {
55-
return Err(error(format!("'{}' disconnected: {reason:?}", $peer_addr)))
56+
return Err(ConnectError::other(format!("'{}' disconnected with reason {reason:?}", $peer_addr)));
5657
}
5758
// Received an unexpected message, abort.
5859
Some(ty) => {
59-
return Err(error(format!(
60+
return Err(ConnectError::other(format!(
6061
"'{}' did not follow the handshake protocol: received {:?} instead of {}",
6162
$peer_addr,
6263
ty.name(),
6364
stringify!($msg_ty),
64-
)))
65+
)));
6566
}
6667
// Received nothing.
67-
None => {
68-
return Err(error(format!(
69-
"the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
70-
stringify!($msg_ty),
71-
)))
72-
}
68+
None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
7369
}
7470
};
7571
}
@@ -93,7 +89,7 @@ impl<N: Network> Router<N> {
9389
peer_side: ConnectionSide,
9490
genesis_header: Header<N>,
9591
restrictions_id: Field<N>,
96-
) -> io::Result<Option<ChallengeRequest<N>>> {
92+
) -> Result<ChallengeRequest<N>, ConnectError> {
9793
// If this is an inbound connection, we log it, but don't know the listening address yet.
9894
// Otherwise, we can immediately register the listening address.
9995
let mut listener_addr = if peer_side == ConnectionSide::Initiator {
@@ -110,7 +106,7 @@ impl<N: Network> Router<N> {
110106
// If the IP is already banned reject the connection.
111107
if self.is_ip_banned(peer_addr.ip()) {
112108
trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
113-
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
109+
return Err(ConnectError::other(anyhow!("'{}' is a banned IP address", peer_addr.ip())));
114110
}
115111

116112
let num_attempts =
@@ -120,30 +116,32 @@ impl<N: Network> Router<N> {
120116
if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
121117
self.update_ip_ban(peer_addr.ip());
122118
trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
123-
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
119+
return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
124120
}
125121
}
126122

127123
// Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time.
128124
let handshake_result = if peer_side == ConnectionSide::Responder {
129-
self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
125+
self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id)
126+
.await
127+
.map_err(|err| ConnectError::Other(err.into()))
130128
} else {
131129
self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
132130
};
133131

134132
if let Some(addr) = listener_addr {
135133
match handshake_result {
136-
Ok(Some(ref cr)) => {
134+
Ok(ref cr) => {
137135
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
138136
self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, None);
139137
peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version);
138+
debug!("Completed the handshake with '{peer_addr}'");
139+
} else {
140+
debug!("Completed the handshake with '{peer_addr}', but it is not in the peer pool");
140141
}
142+
141143
#[cfg(feature = "metrics")]
142144
self.update_metrics();
143-
debug!("Completed the handshake with '{peer_addr}'");
144-
}
145-
Ok(None) => {
146-
return Err(error("Duplicate handshake attempt with '{addr}'"));
147145
}
148146
Err(_) => {
149147
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
@@ -163,12 +161,13 @@ impl<N: Network> Router<N> {
163161
stream: &'a mut TcpStream,
164162
genesis_header: Header<N>,
165163
restrictions_id: Field<N>,
166-
) -> io::Result<Option<ChallengeRequest<N>>> {
164+
) -> Result<ChallengeRequest<N>, ConnectError> {
167165
// Introduce the peer into the peer pool.
168-
if !self.add_connecting_peer(peer_addr) {
169-
// Return early if already being connected to.
170-
return Ok(None);
171-
}
166+
// If we are connecting, the peer and listener addresses are identical.
167+
self.add_connecting_peer(peer_addr).map_err(|err| match err {
168+
AddPeerError::AlreadyConnected => ConnectError::AlreadyConnected { address: peer_addr },
169+
AddPeerError::AlreadyConnecting => ConnectError::AlreadyConnecting { address: peer_addr },
170+
})?;
172171

173172
// Construct the stream.
174173
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
@@ -205,12 +204,13 @@ impl<N: Network> Router<N> {
205204
.await
206205
{
207206
send(&mut framed, peer_addr, reason.into()).await?;
208-
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
207+
return Err(ConnectError::other(anyhow!("{reason:?}")));
209208
}
209+
210210
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
211211
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
212212
send(&mut framed, peer_addr, reason.into()).await?;
213-
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
213+
return Err(ConnectError::other(anyhow!("{reason:?}")));
214214
}
215215

216216
/* Step 3: Send the challenge response. */
@@ -219,7 +219,7 @@ impl<N: Network> Router<N> {
219219
let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
220220
// Sign the counterparty nonce.
221221
let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
222-
return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
222+
return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
223223
};
224224
// Send the challenge response.
225225
let our_response = ChallengeResponse {
@@ -230,7 +230,7 @@ impl<N: Network> Router<N> {
230230
};
231231
send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
232232

233-
Ok(Some(peer_request))
233+
Ok(peer_request)
234234
}
235235

236236
/// The connection responder side of the handshake.
@@ -241,34 +241,42 @@ impl<N: Network> Router<N> {
241241
stream: &'a mut TcpStream,
242242
genesis_header: Header<N>,
243243
restrictions_id: Field<N>,
244-
) -> io::Result<Option<ChallengeRequest<N>>> {
244+
) -> Result<ChallengeRequest<N>, ConnectError> {
245245
// Construct the stream.
246246
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
247247

248248
/* Step 1: Receive the challenge request. */
249249

250-
// Listen for the challenge request message.
250+
// Wait for the challenge request message.
251251
let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
252252

253253
// Obtain the peer's listening address.
254254
*listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
255255
let listener_addr = listener_addr.unwrap();
256256

257+
// Knowing the peer's listening address, ensure we are not already connecting/connected to it.
258+
if self.is_connecting_or_connected(listener_addr) {
259+
return Err(ConnectError::AlreadyConnected { address: listener_addr });
260+
}
261+
257262
// Knowing the peer's listening address, ensure it is allowed to connect.
258-
if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
259-
return Err(error(format!("{forbidden_message}")));
263+
if let Err(reason) = self.ensure_peer_is_allowed(listener_addr) {
264+
send(&mut framed, peer_addr, reason.into()).await?;
265+
return Err(ConnectError::Other(
266+
anyhow!("Dropping connection request from {listener_addr} ({reason:?})").into(),
267+
));
260268
}
261269

262270
// Introduce the peer into the peer pool.
263-
if !self.add_connecting_peer(listener_addr) {
264-
// Return early if already being connected to.
265-
return Ok(None);
266-
}
271+
self.add_connecting_peer(listener_addr).map_err(|err| match err {
272+
AddPeerError::AlreadyConnected => ConnectError::AlreadyConnected { address: peer_addr },
273+
AddPeerError::AlreadyConnecting => ConnectError::AlreadyConnecting { address: peer_addr },
274+
})?;
267275

268276
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
269277
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
270278
send(&mut framed, peer_addr, reason.into()).await?;
271-
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
279+
return Err(ConnectError::Other(anyhow!("Dropped '{peer_addr}' for reason: {reason:?}").into()));
272280
}
273281

274282
/* Step 2: Send the challenge response followed by own challenge request. */
@@ -280,7 +288,9 @@ impl<N: Network> Router<N> {
280288
let response_nonce: u64 = rng.r#gen();
281289
let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
282290
let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
283-
return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
291+
return Err(ConnectError::Other(
292+
anyhow!("Failed to sign the challenge request nonce from '{peer_addr}'").into(),
293+
));
284294
};
285295
// Send the challenge response.
286296
let our_response = ChallengeResponse {
@@ -299,8 +309,9 @@ impl<N: Network> Router<N> {
299309

300310
/* Step 3: Receive the challenge response. */
301311

302-
// Listen for the challenge response message.
312+
// Wait for the challenge response message.
303313
let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
314+
304315
// Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
305316
if let Some(reason) = self
306317
.verify_challenge_response(
@@ -315,21 +326,21 @@ impl<N: Network> Router<N> {
315326
.await
316327
{
317328
send(&mut framed, peer_addr, reason.into()).await?;
318-
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
329+
Err(ConnectError::Other(anyhow!("Dropped '{peer_addr}' for reason: {reason:?}").into()))
330+
} else {
331+
Ok(peer_request)
319332
}
320-
321-
Ok(Some(peer_request))
322333
}
323334

324335
/// Ensure the peer is allowed to connect.
325-
fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
336+
fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
326337
// Ensure that it's not a self-connect attempt.
327338
if self.is_local_ip(listener_addr) {
328-
bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)");
339+
return Err(DisconnectReason::SelfConnect);
329340
}
330341
// Unknown peers are untrusted, so check if `allow_external_peers` is true.
331342
if !self.allow_external_peers() && !self.is_trusted(listener_addr) {
332-
bail!("Dropping connection request from '{listener_addr}' (untrusted)");
343+
return Err(DisconnectReason::NoExternalPeersAllowed);
333344
}
334345
Ok(())
335346
}

0 commit comments

Comments
 (0)