Skip to content

Commit 276b190

Browse files
authored
websocket: Fix connection stability on decrypt messages (#393)
This PR greatly improves the WebSocket connection stability by relying on the interval buffers of tungstenite instead of buffering at a higher level. The fix passes through the messages to the tungstenite socket directly. This is a long-lasting issue (reproducible on all older versions silently with IO errors) that manifested as a decryption error after the state fixes: - #325 - #327 Issue context: - node is under stress due to handling multiple substreams - the issue affected only long running WebSocket substreams and manifested as an IO error from crypto/noise decoding - tungstenite `WebSocketStream` already has a 128KiB buffer for writing - litep2p has a **redundant** 8 KiB buffer for writing - litep2p buffered internally multiple packets, tunstenite accepted the batch. I expect this creates a wrongly framed packet that fails to decode at the crypto/noise level ## Investigation We have noted several errors that manifested as crypto/nosie decoding failures on our Kusama validators: - paritytech/polkadot-sdk#8525 ```rust litep2p::crypto::noise: failed to decrypt message error=Decrypt ``` Upon further investigation, the errors affected only WebSocket connections. The issue could be reproduced by running a local node in Kusama with more than 500 peers in and out. As well as running subp2p-explorer with adjusted protocols: ```yaml 2025-05-15T14:58:08.095961Z ERROR {peer_id=peer_id=12D3KooWGsDvWrbApFTCpF8h7YCKHuvJbok6HAq5ZnPgE9LGWnsv}: litep2p::crypto::noise: failed to decrypt message for bigger buffers error=Decrypt peer=PeerId("12D3KooWSa5SbCHGKpNeSs3Qak2TrM5gTkEBrPfvo6TyxhUpEHeu") 2025-05-15T14:58:08.096419Z DEBUG {peer_id=peer_id=12D3KooWGsDvWrbApFTCpF8h7YCKHuvJbok6HAq5ZnPgE9LGWnsv}: litep2p::websocket::connection: connection closed with error peer=PeerId("12D3KooWSa5SbCHGKpNeSs3Qak2TrM5gTkEBrPfvo6TyxhUpEHeu") error=Decode(Io(Custom { kind: Other, error: "failed to decrypt message bigger buffers: decrypt error 12D3KooWSa5SbCHGKpNeSs3Qak2TrM5gTkEBrPfvo6TyxhUpEHeu" })) ``` The issue also reproduced on the zombinet PR, which uses litep2p: - paritytech/polkadot-sdk#8461 ```yaml 2025-05-14 09:37:30.805 INFO tokio-runtime-worker sync: Warp sync is complete, continuing with state sync. 2025-05-14 09:37:33.189 ERROR tokio-runtime-worker litep2p::crypto::noise: failed to decrypt message error=Decrypt 2025-05-14 09:37:33.283 ERROR tokio-runtime-worker litep2p::crypto::noise: failed to decrypt message error=Decrypt 2025-05-14 09:37:34.764 ERROR tokio-runtime-worker litep2p::crypto::noise: failed to decrypt message error=Decrypt 2025-05-14 09:37:35.656 INFO tokio-runtime-worker substrate: ⚙️ State sync, Downloading state, 22%, 2.21 Mib (0 peers), best: #0 (0xc5e7…d059), finalized #0 (0xc5e7…d059), ⬇ 707.8kiB/s ⬆ 0.5kiB/s 2025-05-14 09:37:40.657 INFO tokio-runtime-worker substrate: ⚙️ State sync, Downloading state, 22%, 2.21 Mib (3 peers), best: #0 (0xc5e7…d059), finalized #0 (0xc5e7…d059), ⬇ 1.0kiB/s ⬆ 1.0kiB/s ``` ## Testing Done ### Performance Tested the performance with litep2p-perf using the following branch: - https://github.com/lexnv/litep2p-perf/compare/lexnv/websocket-tests?expand=1 | Status | Data Size | Time (s) | Bandwidth (Mbit/s) | |------------|-----------|----------|-------------------| | **Before** | | | | | Uploaded | 256.00 MiB| 15.1152 | 135.49 | | Downloaded | 256.00 MiB| 13.2296 | 154.80 | | **After** | | | | | Uploaded | 256.00 MiB| 15.7178 | 130.30 | | Downloaded | 256.00 MiB| 13.2435 | 154.64 | From the performance table, we are within 3% of the original buggy implementation. I would lean towards a normal variation in our results. Therefore, the performance remains unimpacted. ### Repro Case Have added a custom user protocol as part of our testing to filter out these errors. - The protocol opens 16 outbound substreams on the connection established event. Therefore, it will handle 16 outbound substreams and 16 inbound substreams - The outbound substreams will push a configurable number of packets, each of size 128 bytes, to the remote peer. While the inbound substreams will read the same number of packets from the remote peer. Before this PR, the TCP was unaffected and the websocket reproduces the decrypt failure. After this PR, the test passes. Closes: paritytech/polkadot-sdk#8525 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent 630aa37 commit 276b190

File tree

5 files changed

+375
-116
lines changed

5 files changed

+375
-116
lines changed

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#![allow(clippy::single_match)]
2222
#![allow(clippy::result_large_err)]
23+
#![allow(clippy::large_enum_variant)]
2324
#![allow(clippy::redundant_pattern_matching)]
2425
#![allow(clippy::type_complexity)]
2526
#![allow(clippy::result_unit_err)]
@@ -80,7 +81,7 @@ pub mod yamux;
8081

8182
mod bandwidth;
8283
mod multistream_select;
83-
mod utils;
84+
pub mod utils;
8485

8586
#[cfg(test)]
8687
mod mock;

src/multistream_select/negotiated.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,6 @@ impl From<NegotiationError> for io::Error {
370370
if let NegotiationError::ProtocolError(e) = err {
371371
return e.into();
372372
}
373-
io::Error::new(io::ErrorKind::Other, err)
373+
io::Error::other(err)
374374
}
375375
}

src/transport/websocket/stream.rs

Lines changed: 26 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//! Stream implementation for `tokio_tungstenite::WebSocketStream` that implements
2222
//! `AsyncRead + AsyncWrite`
2323
24-
use bytes::{Buf, Bytes, BytesMut};
24+
use bytes::{Buf, Bytes};
2525
use futures::{SinkExt, StreamExt};
2626
use tokio::io::{AsyncRead, AsyncWrite};
2727
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
@@ -31,121 +31,66 @@ use std::{
3131
task::{Context, Poll},
3232
};
3333

34-
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
35-
36-
/// Send state.
37-
#[derive(Debug)]
38-
enum State {
39-
/// State is poisoned.
40-
Poisoned,
41-
42-
/// Sink is accepting input.
43-
ReadyToSend,
44-
45-
/// Flush is pending for the sink.
46-
FlushPending,
47-
}
34+
const LOG_TARGET: &str = "litep2p::transport::websocket::stream";
4835

4936
/// Buffered stream which implements `AsyncRead + AsyncWrite`
5037
#[derive(Debug)]
5138
pub(super) struct BufferedStream<S: AsyncRead + AsyncWrite + Unpin> {
52-
/// Write buffer.
53-
write_buffer: BytesMut,
54-
5539
/// Read buffer.
5640
///
5741
/// The buffer is taken directly from the WebSocket stream.
5842
read_buffer: Bytes,
5943

6044
/// Underlying WebSocket stream.
6145
stream: WebSocketStream<S>,
62-
63-
/// Read state.
64-
state: State,
6546
}
6647

6748
impl<S: AsyncRead + AsyncWrite + Unpin> BufferedStream<S> {
6849
/// Create new [`BufferedStream`].
6950
pub(super) fn new(stream: WebSocketStream<S>) -> Self {
7051
Self {
71-
write_buffer: BytesMut::with_capacity(DEFAULT_BUF_SIZE),
7252
read_buffer: Bytes::new(),
7353
stream,
74-
state: State::ReadyToSend,
7554
}
7655
}
7756
}
7857

7958
impl<S: AsyncRead + AsyncWrite + Unpin> futures::AsyncWrite for BufferedStream<S> {
8059
fn poll_write(
8160
mut self: Pin<&mut Self>,
82-
_cx: &mut Context<'_>,
61+
cx: &mut Context<'_>,
8362
buf: &[u8],
8463
) -> Poll<std::io::Result<usize>> {
85-
self.write_buffer.extend_from_slice(buf);
86-
87-
Poll::Ready(Ok(buf.len()))
88-
}
64+
match futures::ready!(self.stream.poll_ready_unpin(cx)) {
65+
Ok(()) => {
66+
let message = Message::Binary(Bytes::copy_from_slice(buf));
8967

90-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
91-
if self.write_buffer.is_empty() {
92-
return self
93-
.stream
94-
.poll_ready_unpin(cx)
95-
.map_err(|_| std::io::ErrorKind::UnexpectedEof.into());
96-
}
97-
98-
loop {
99-
match std::mem::replace(&mut self.state, State::Poisoned) {
100-
State::ReadyToSend => {
101-
match self.stream.poll_ready_unpin(cx) {
102-
Poll::Ready(Ok(())) => {}
103-
Poll::Ready(Err(_error)) =>
104-
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
105-
Poll::Pending => {
106-
self.state = State::ReadyToSend;
107-
return Poll::Pending;
108-
}
109-
}
110-
111-
let message = std::mem::take(&mut self.write_buffer);
112-
match self.stream.start_send_unpin(Message::Binary(message.freeze())) {
113-
Ok(()) => {}
114-
Err(_error) =>
115-
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
116-
}
117-
118-
// Transition to flush pending state.
119-
self.state = State::FlushPending;
120-
continue;
68+
if let Err(err) = self.stream.start_send_unpin(message) {
69+
tracing::debug!(target: LOG_TARGET, "Error during start send: {:?}", err);
70+
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()));
12171
}
12272

123-
State::FlushPending => {
124-
match self.stream.poll_flush_unpin(cx) {
125-
Poll::Ready(Ok(())) => {}
126-
Poll::Ready(Err(_error)) =>
127-
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
128-
Poll::Pending => {
129-
self.state = State::FlushPending;
130-
return Poll::Pending;
131-
}
132-
}
133-
134-
self.state = State::ReadyToSend;
135-
self.write_buffer = BytesMut::with_capacity(DEFAULT_BUF_SIZE);
136-
return Poll::Ready(Ok(()));
137-
}
138-
State::Poisoned =>
139-
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
73+
Poll::Ready(Ok(buf.len()))
74+
}
75+
Err(err) => {
76+
tracing::debug!(target: LOG_TARGET, "Error during poll ready: {:?}", err);
77+
Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()))
14078
}
14179
}
14280
}
14381

82+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
83+
self.stream.poll_flush_unpin(cx).map_err(|err| {
84+
tracing::debug!(target: LOG_TARGET, "Error during poll flush: {:?}", err);
85+
std::io::ErrorKind::UnexpectedEof.into()
86+
})
87+
}
88+
14489
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
145-
match futures::ready!(self.stream.poll_close_unpin(cx)) {
146-
Ok(_) => Poll::Ready(Ok(())),
147-
Err(_) => Poll::Ready(Err(std::io::ErrorKind::PermissionDenied.into())),
148-
}
90+
self.stream.poll_close_unpin(cx).map_err(|err| {
91+
tracing::debug!(target: LOG_TARGET, "Error during poll close: {:?}", err);
92+
std::io::ErrorKind::PermissionDenied.into()
93+
})
14994
}
15095
}
15196

@@ -183,7 +128,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> futures::AsyncRead for BufferedStream<S>
183128
#[cfg(test)]
184129
mod tests {
185130
use super::*;
186-
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
131+
use futures::{AsyncRead, AsyncReadExt, AsyncWriteExt};
187132
use tokio::io::DuplexStream;
188133
use tokio_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
189134

@@ -203,7 +148,6 @@ mod tests {
203148

204149
let bytes_written = stream.write(data).await.unwrap();
205150
assert_eq!(bytes_written, data.len());
206-
assert_eq!(&stream.write_buffer[..], data);
207151
}
208152

209153
#[tokio::test]
@@ -253,38 +197,6 @@ mod tests {
253197
};
254198
}
255199

256-
#[tokio::test]
257-
async fn test_poisoned_state() {
258-
let (mut stream, server) = create_test_stream().await;
259-
drop(server);
260-
261-
stream.state = State::Poisoned;
262-
263-
let mut buffer = [0u8; 10];
264-
let result = stream.read(&mut buffer).await;
265-
match result {
266-
Err(error) => if error.kind() == std::io::ErrorKind::UnexpectedEof {},
267-
state => panic!("Unexpected state {state:?}"),
268-
};
269-
270-
let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
271-
let mut pin_stream = Pin::new(&mut stream);
272-
273-
// Messages are buffered internally, the socket is not touched.
274-
match pin_stream.as_mut().poll_write(&mut cx, &mut buffer) {
275-
Poll::Ready(Ok(10)) => {}
276-
state => panic!("Unexpected state {state:?}"),
277-
}
278-
// Socket is poisoned, the flush will fail.
279-
match pin_stream.poll_flush(&mut cx) {
280-
Poll::Ready(Err(error)) =>
281-
if error.kind() == std::io::ErrorKind::UnexpectedEof {
282-
return;
283-
},
284-
state => panic!("Unexpected state {state:?}"),
285-
}
286-
}
287-
288200
#[tokio::test]
289201
async fn test_read_poll_pending() {
290202
let (mut stream, mut _server) = create_test_stream().await;

tests/connection/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ use crate::common::{add_transport, Transport};
4444

4545
#[cfg(test)]
4646
mod protocol_dial_invalid_address;
47+
#[cfg(test)]
48+
mod stability;
4749

4850
#[tokio::test]
4951
async fn two_litep2ps_work_tcp() {

0 commit comments

Comments
 (0)