Skip to content

Commit

Permalink
Runtime diagnostics for leaked messages in unbounded channels (part 2) (
Browse files Browse the repository at this point in the history
paritytech#13020)

* Fix code review issues

* Clarify doc

* Get rid of backtrace mutex

* kick CI
  • Loading branch information
dmitry-markin authored and ltfschoen committed Feb 22, 2023
1 parent 13dbeba commit 81f0322
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 48 deletions.
64 changes: 45 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
array-bytes = "4.1"
async-trait = "0.1"
asynchronous-codec = "0.6"
backtrace = "0.3.67"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
either = "1.5.3"
Expand Down
24 changes: 10 additions & 14 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.

use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::{Backtrace, BacktraceStatus},
cell::RefCell,
fmt,
pin::Pin,
Expand All @@ -62,7 +62,7 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::capture(),
creation_backtrace: Backtrace::new_unresolved(),
metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
Expand Down Expand Up @@ -91,7 +91,8 @@ pub struct Sender {
warning_fired: bool,
/// Backtrace of a place where the channel was created.
creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`].
/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
/// [`OutChannels`] with `OutChannels::push()`.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}

Expand Down Expand Up @@ -193,17 +194,12 @@ impl OutChannels {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
match sender.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
),
_ => error!(
"The number of unprocessed events in channel `{}` reached {}.",
sender.name, sender.queue_size_warning,
),
}
sender.creation_backtrace.resolve();
error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
);
}
sender.inner.unbounded_send(event.clone()).is_ok()
});
Expand Down
1 change: 1 addition & 0 deletions client/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description = "I/O for Substrate runtimes"
readme = "README.md"

[dependencies]
backtrace = "0.3.67"
futures = "0.3.21"
futures-timer = "3.0.2"
lazy_static = "1.4.0"
Expand Down
27 changes: 12 additions & 15 deletions client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod inner {
mod inner {
// tracing implementation
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use backtrace::Backtrace;
use futures::{
channel::mpsc::{
self, SendError, TryRecvError, TrySendError, UnboundedReceiver, UnboundedSender,
Expand All @@ -47,7 +48,6 @@ mod inner {
};
use log::error;
use std::{
backtrace::{Backtrace, BacktraceStatus},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Expand Down Expand Up @@ -108,7 +108,7 @@ mod inner {
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
creation_backtrace: Arc::new(Backtrace::new_unresolved()),
};
let receiver = TracingUnboundedReceiver { inner: r, name, queue_size };
(sender, receiver)
Expand Down Expand Up @@ -149,23 +149,20 @@ mod inner {

let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == self.queue_size_warning &&
!self.warning_fired.load(Ordering::Relaxed)
self.warning_fired
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
// `warning_fired` and `queue_size` are not synchronized, so it's possible
// that the warning is fired few times before the `warning_fired` is seen
// by all threads. This seems better than introducing a mutex guarding them.
self.warning_fired.store(true, Ordering::Relaxed);
match self.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{}",
self.name, self.queue_size_warning, self.creation_backtrace,
),
_ => error!(
"The number of unprocessed messages in channel `{}` reached {}.",
self.name, self.queue_size_warning,
),
}
let mut backtrace = (*self.creation_backtrace).clone();
backtrace.resolve();
error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
self.name, self.queue_size_warning, backtrace,
);
}

s
Expand Down

0 comments on commit 81f0322

Please sign in to comment.