-
Notifications
You must be signed in to change notification settings - Fork 961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(webocket): Avoid panic when polling quicksink after errors #5482
fix(webocket): Avoid panic when polling quicksink after errors #5482
Conversation
after errors Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Could you update the changelog entry? |
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Our substrate node is running smoothly so far in Kusama. @jxs @dariusc93 Let me know if this looks good to you, thanks 🙏 |
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Triage Report
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, and thanks for looking into this!
Instead of creating another structure to deal with the panic
s of SinkImpl
wdyt of making SinkImpl
not panic and instead return Err
?
I.e:
diff --git a/Cargo.lock b/Cargo.lock
index 6c64ea3fe..56befd765 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3507,6 +3507,7 @@ dependencies = [
"rcgen",
"rw-stream-sink",
"soketto",
+ "thiserror",
"tracing",
"url",
"webpki-roots 0.25.2",
diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml
index b022d95ca..c567ab860 100644
--- a/transports/websocket/Cargo.toml
+++ b/transports/websocket/Cargo.toml
@@ -21,6 +21,7 @@ pin-project-lite = "0.2.14"
rw-stream-sink = { workspace = true }
soketto = "0.8.0"
tracing = { workspace = true }
+thiserror = "1.0.61"
url = "2.5"
webpki-roots = "0.25"
diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs
index f6f99d185..69a01fdbd 100644
--- a/transports/websocket/src/framed.rs
+++ b/transports/websocket/src/framed.rs
@@ -571,7 +571,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
- sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
+ sender: Pin<Box<dyn Sink<OutgoingData, Error = quicksink::Error<connection::Error>> + Send>>,
_marker: std::marker::PhantomData<T>,
}
diff --git a/transports/websocket/src/quicksink.rs b/transports/websocket/src/quicksink.rs
index cb2c98b07..73c4c5684 100644
--- a/transports/websocket/src/quicksink.rs
+++ b/transports/websocket/src/quicksink.rs
@@ -29,15 +29,6 @@
// }
// Ok::<_, io::Error>(stdout)
// });
-// ```
-//
-// # Panics
-//
-// - If any of the [`Sink`] methods produce an error, the sink transitions
-// to a failure state and none of its methods must be called afterwards or
-// else a panic will occur.
-// - If [`Sink::poll_close`] has been called, no other sink method must be
-// called afterwards or else a panic will be caused.
use futures::{ready, sink::Sink};
use pin_project_lite::pin_project;
@@ -102,6 +93,15 @@ enum State {
Failed,
}
+/// Errors the `Sink` may return.
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum Error<E> {
+ #[error("Error while sending over the sink, {0}")]
+ Send(E),
+ #[error("The Sink has closed")]
+ Closed,
+}
+
pin_project! {
/// `SinkImpl` implements the `Sink` trait.
#[derive(Debug)]
@@ -119,7 +119,7 @@ where
F: FnMut(S, Action<A>) -> T,
T: Future<Output = Result<S, E>>,
{
- type Error = E;
+ type Error = Error<E>;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
@@ -135,7 +135,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- Poll::Ready(Err(e))
+ Poll::Ready(Err(Error::Send(e)))
}
}
}
@@ -143,20 +143,19 @@ where
Ok(_) => {
this.future.set(None);
*this.state = State::Closed;
- panic!("SinkImpl::poll_ready called on a closing sink.")
+ Poll::Ready(Err(Error::Closed))
}
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- Poll::Ready(Err(e))
+ Poll::Ready(Err(Error::Send(e)))
}
},
State::Empty => {
assert!(this.param.is_some());
Poll::Ready(Ok(()))
}
- State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
- State::Failed => panic!("SinkImpl::poll_ready called after error."),
+ State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)),
}
}
@@ -193,7 +192,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
@@ -207,7 +206,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
}
}
@@ -221,11 +220,11 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
- State::Failed => panic!("SinkImpl::poll_flush called after error."),
+ State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
@@ -253,7 +252,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
@@ -266,7 +265,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
}
}
@@ -280,11 +279,11 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
- return Poll::Ready(Err(e));
+ return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
- State::Failed => panic!("SinkImpl::poll_closed called after error."),
+ State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
it's what other impls seem to do as well
…polling" This reverts commit 003bb0b. Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
That sounds like a plan! Thanks for the review 🙏 While at it, have also added a test to check quicksink doesn't panic anymore. Would it be possible to chery-pick |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
sorry, missed this. You folks don't plan to upgrade to |
No worries :D We plan to upgrade to 0.53 for sure! In the meanwhile, we would appreciate if you can help us with a point release to fix the panics in substrate, thanks 🙏 |
Hey @jxs, let us know if a point release sounds reasonable 🙏 I've been trying to update libp2p locally in substrate and there are a few things that changed in the api, which would make the adoption a bit slower:
|
yeah no worries, I'll try to cut a release out today.
Yeah I did the same ehe and noticed that, along the |
@jxs are you planning to publish |
This release includes: libp2p/rust-libp2p#5482 Which fixes substrate node crashing with libp2p trace: ``` 0: sp_panic_handler::set::{{closure}} 1: std::panicking::rust_panic_with_hook 2: std::panicking::begin_panic::{{closure}} 3: std::sys_common::backtrace::__rust_end_short_backtrace 4: std::panicking::begin_panic 5: <quicksink::SinkImpl<S,F,T,A,E> as futures_sink::Sink<A>>::poll_ready 6: <rw_stream_sink::RwStreamSink<S> as futures_io::if_std::AsyncWrite>::poll_write 7: <libp2p_noise::io::framed::NoiseFramed<T,S> as futures_sink::Sink<&alloc::vec::Vec<u8>>>::poll_ready 8: <libp2p_noise::io::Output<T> as futures_io::if_std::AsyncWrite>::poll_write 9: <yamux::frame::io::Io<T> as futures_sink::Sink<yamux::frame::Frame<()>>>::poll_ready 10: yamux::connection::Connection<T>::poll_next_inbound 11: <libp2p_yamux::Muxer<C> as libp2p_core::muxing::StreamMuxer>::poll 12: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 13: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 14: libp2p_swarm::connection::pool::task::new_for_established_connection::{{closure}} 15: <sc_service::task_manager::prometheus_future::PrometheusFuture<T> as core::future::future::Future>::poll 16: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 17: <tracing_futures::Instrumented<T> as core::future::future::Future>::poll 18: std::panicking::try 19: tokio::runtime::task::harness::Harness<T,S>::poll 20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task 21: tokio::runtime::scheduler::multi_thread::worker::Context::run 22: tokio::runtime::context::set_scheduler 23: tokio::runtime::context::runtime::enter_runtime 24: tokio::runtime::scheduler::multi_thread::worker::run 25: tokio::runtime::task::core::Core<T,S>::poll 26: tokio::runtime::task::harness::Harness<T,S>::poll 27: std::sys_common::backtrace::__rust_begin_short_backtrace 28: core::ops::function::FnOnce::call_once{{vtable.shim}} 29: std::sys::pal::unix::thread::Thread::new::thread_start 30: <unknown> 31: <unknown> Thread 'tokio-runtime-worker' panicked at 'SinkImpl::poll_ready called after error.', /home/ubuntu/.cargo/registry/src/index.crates.io-6f17d22bba15001f/quicksink-0.1.2/src/lib.rs:158 ``` Closes: #4934 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This release includes: libp2p/rust-libp2p#5482 Which fixes substrate node crashing with libp2p trace: ``` 0: sp_panic_handler::set::{{closure}} 1: std::panicking::rust_panic_with_hook 2: std::panicking::begin_panic::{{closure}} 3: std::sys_common::backtrace::__rust_end_short_backtrace 4: std::panicking::begin_panic 5: <quicksink::SinkImpl<S,F,T,A,E> as futures_sink::Sink<A>>::poll_ready 6: <rw_stream_sink::RwStreamSink<S> as futures_io::if_std::AsyncWrite>::poll_write 7: <libp2p_noise::io::framed::NoiseFramed<T,S> as futures_sink::Sink<&alloc::vec::Vec<u8>>>::poll_ready 8: <libp2p_noise::io::Output<T> as futures_io::if_std::AsyncWrite>::poll_write 9: <yamux::frame::io::Io<T> as futures_sink::Sink<yamux::frame::Frame<()>>>::poll_ready 10: yamux::connection::Connection<T>::poll_next_inbound 11: <libp2p_yamux::Muxer<C> as libp2p_core::muxing::StreamMuxer>::poll 12: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 13: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 14: libp2p_swarm::connection::pool::task::new_for_established_connection::{{closure}} 15: <sc_service::task_manager::prometheus_future::PrometheusFuture<T> as core::future::future::Future>::poll 16: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 17: <tracing_futures::Instrumented<T> as core::future::future::Future>::poll 18: std::panicking::try 19: tokio::runtime::task::harness::Harness<T,S>::poll 20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task 21: tokio::runtime::scheduler::multi_thread::worker::Context::run 22: tokio::runtime::context::set_scheduler 23: tokio::runtime::context::runtime::enter_runtime 24: tokio::runtime::scheduler::multi_thread::worker::run 25: tokio::runtime::task::core::Core<T,S>::poll 26: tokio::runtime::task::harness::Harness<T,S>::poll 27: std::sys_common::backtrace::__rust_begin_short_backtrace 28: core::ops::function::FnOnce::call_once{{vtable.shim}} 29: std::sys::pal::unix::thread::Thread::new::thread_start 30: <unknown> 31: <unknown> Thread 'tokio-runtime-worker' panicked at 'SinkImpl::poll_ready called after error.', /home/ubuntu/.cargo/registry/src/index.crates.io-6f17d22bba15001f/quicksink-0.1.2/src/lib.rs:158 ``` Closes: paritytech#4934 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This release includes: libp2p/rust-libp2p#5482 Which fixes substrate node crashing with libp2p trace: ``` 0: sp_panic_handler::set::{{closure}} 1: std::panicking::rust_panic_with_hook 2: std::panicking::begin_panic::{{closure}} 3: std::sys_common::backtrace::__rust_end_short_backtrace 4: std::panicking::begin_panic 5: <quicksink::SinkImpl<S,F,T,A,E> as futures_sink::Sink<A>>::poll_ready 6: <rw_stream_sink::RwStreamSink<S> as futures_io::if_std::AsyncWrite>::poll_write 7: <libp2p_noise::io::framed::NoiseFramed<T,S> as futures_sink::Sink<&alloc::vec::Vec<u8>>>::poll_ready 8: <libp2p_noise::io::Output<T> as futures_io::if_std::AsyncWrite>::poll_write 9: <yamux::frame::io::Io<T> as futures_sink::Sink<yamux::frame::Frame<()>>>::poll_ready 10: yamux::connection::Connection<T>::poll_next_inbound 11: <libp2p_yamux::Muxer<C> as libp2p_core::muxing::StreamMuxer>::poll 12: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 13: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 14: libp2p_swarm::connection::pool::task::new_for_established_connection::{{closure}} 15: <sc_service::task_manager::prometheus_future::PrometheusFuture<T> as core::future::future::Future>::poll 16: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 17: <tracing_futures::Instrumented<T> as core::future::future::Future>::poll 18: std::panicking::try 19: tokio::runtime::task::harness::Harness<T,S>::poll 20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task 21: tokio::runtime::scheduler::multi_thread::worker::Context::run 22: tokio::runtime::context::set_scheduler 23: tokio::runtime::context::runtime::enter_runtime 24: tokio::runtime::scheduler::multi_thread::worker::run 25: tokio::runtime::task::core::Core<T,S>::poll 26: tokio::runtime::task::harness::Harness<T,S>::poll 27: std::sys_common::backtrace::__rust_begin_short_backtrace 28: core::ops::function::FnOnce::call_once{{vtable.shim}} 29: std::sys::pal::unix::thread::Thread::new::thread_start 30: <unknown> 31: <unknown> Thread 'tokio-runtime-worker' panicked at 'SinkImpl::poll_ready called after error.', /home/ubuntu/.cargo/registry/src/index.crates.io-6f17d22bba15001f/quicksink-0.1.2/src/lib.rs:158 ``` Closes: #4934 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
…#5040) (#5072) This release includes: libp2p/rust-libp2p#5482 Which fixes substrate node crashing with libp2p trace: ``` 0: sp_panic_handler::set::{{closure}} 1: std::panicking::rust_panic_with_hook 2: std::panicking::begin_panic::{{closure}} 3: std::sys_common::backtrace::__rust_end_short_backtrace 4: std::panicking::begin_panic 5: <quicksink::SinkImpl<S,F,T,A,E> as futures_sink::Sink<A>>::poll_ready 6: <rw_stream_sink::RwStreamSink<S> as futures_io::if_std::AsyncWrite>::poll_write 7: <libp2p_noise::io::framed::NoiseFramed<T,S> as futures_sink::Sink<&alloc::vec::Vec<u8>>>::poll_ready 8: <libp2p_noise::io::Output<T> as futures_io::if_std::AsyncWrite>::poll_write 9: <yamux::frame::io::Io<T> as futures_sink::Sink<yamux::frame::Frame<()>>>::poll_ready 10: yamux::connection::Connection<T>::poll_next_inbound 11: <libp2p_yamux::Muxer<C> as libp2p_core::muxing::StreamMuxer>::poll 12: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 13: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 14: libp2p_swarm::connection::pool::task::new_for_established_connection::{{closure}} 15: <sc_service::task_manager::prometheus_future::PrometheusFuture<T> as core::future::future::Future>::poll 16: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 17: <tracing_futures::Instrumented<T> as core::future::future::Future>::poll 18: std::panicking::try 19: tokio::runtime::task::harness::Harness<T,S>::poll 20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task 21: tokio::runtime::scheduler::multi_thread::worker::Context::run 22: tokio::runtime::context::set_scheduler 23: tokio::runtime::context::runtime::enter_runtime 24: tokio::runtime::scheduler::multi_thread::worker::run 25: tokio::runtime::task::core::Core<T,S>::poll 26: tokio::runtime::task::harness::Harness<T,S>::poll 27: std::sys_common::backtrace::__rust_begin_short_backtrace 28: core::ops::function::FnOnce::call_once{{vtable.shim}} 29: std::sys::pal::unix::thread::Thread::new::thread_start 30: <unknown> 31: <unknown> Thread 'tokio-runtime-worker' panicked at 'SinkImpl::poll_ready called after error.', /home/ubuntu/.cargo/registry/src/index.crates.io-6f17d22bba15001f/quicksink-0.1.2/src/lib.rs:158 ``` Closes: #4934 --------- --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Adrian Catangiu <adrian@parity.io>
This release includes: libp2p/rust-libp2p#5482 Which fixes substrate node crashing with libp2p trace: ``` 0: sp_panic_handler::set::{{closure}} 1: std::panicking::rust_panic_with_hook 2: std::panicking::begin_panic::{{closure}} 3: std::sys_common::backtrace::__rust_end_short_backtrace 4: std::panicking::begin_panic 5: <quicksink::SinkImpl<S,F,T,A,E> as futures_sink::Sink<A>>::poll_ready 6: <rw_stream_sink::RwStreamSink<S> as futures_io::if_std::AsyncWrite>::poll_write 7: <libp2p_noise::io::framed::NoiseFramed<T,S> as futures_sink::Sink<&alloc::vec::Vec<u8>>>::poll_ready 8: <libp2p_noise::io::Output<T> as futures_io::if_std::AsyncWrite>::poll_write 9: <yamux::frame::io::Io<T> as futures_sink::Sink<yamux::frame::Frame<()>>>::poll_ready 10: yamux::connection::Connection<T>::poll_next_inbound 11: <libp2p_yamux::Muxer<C> as libp2p_core::muxing::StreamMuxer>::poll 12: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 13: <libp2p_core::muxing::boxed::Wrap<T> as libp2p_core::muxing::StreamMuxer>::poll 14: libp2p_swarm::connection::pool::task::new_for_established_connection::{{closure}} 15: <sc_service::task_manager::prometheus_future::PrometheusFuture<T> as core::future::future::Future>::poll 16: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 17: <tracing_futures::Instrumented<T> as core::future::future::Future>::poll 18: std::panicking::try 19: tokio::runtime::task::harness::Harness<T,S>::poll 20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task 21: tokio::runtime::scheduler::multi_thread::worker::Context::run 22: tokio::runtime::context::set_scheduler 23: tokio::runtime::context::runtime::enter_runtime 24: tokio::runtime::scheduler::multi_thread::worker::run 25: tokio::runtime::task::core::Core<T,S>::poll 26: tokio::runtime::task::harness::Harness<T,S>::poll 27: std::sys_common::backtrace::__rust_begin_short_backtrace 28: core::ops::function::FnOnce::call_once{{vtable.shim}} 29: std::sys::pal::unix::thread::Thread::new::thread_start 30: <unknown> 31: <unknown> Thread 'tokio-runtime-worker' panicked at 'SinkImpl::poll_ready called after error.', /home/ubuntu/.cargo/registry/src/index.crates.io-6f17d22bba15001f/quicksink-0.1.2/src/lib.rs:158 ``` Closes: paritytech#4934 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This PR fixes an issue with the framed quicksink being polled multiple times, even after an error.
The quicksink implementation panics when it is polled after an error.
This PR ensures that the framed quicksink is not polled again after an error.
Instead the wrapped
ConnectionSender
sink returnsio::ErrorKind::Other
with an appropriate message.This PR was created with an easy-backport in mind for
libp2p-v0.52.4
.Substrate is currently using
v0.52.4
which hasquicksink
as a dependency (and not an inline module).The
quicksink
crate maintenance has long been abandoned.Panic
Testing Done
Substrate node is stable over the weekend and produces expected warns / err
Next Steps
Part of: #5471
CC: paritytech/polkadot-sdk#4934