Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Runtime diagnostics for leaked messages in unbounded channels (part 2) #13020

Merged
merged 4 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
array-bytes = "4.1"
async-trait = "0.1"
asynchronous-codec = "0.6"
backtrace = "0.3.67"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
either = "1.5.3"
Expand Down
21 changes: 8 additions & 13 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.

use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::{Backtrace, BacktraceStatus},
cell::RefCell,
fmt,
pin::Pin,
Expand All @@ -62,7 +62,7 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::capture(),
creation_backtrace: Backtrace::new_unresolved(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use the channel from utils here?

The metrics field is always None or?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sender is not accessible on its own, but only via OutChannels. metrics is inserted when a new Sender is added with OutChannels::push():

/// Adds a new [`Sender`] to the collection.
pub fn push(&mut self, sender: Sender) {
let mut metrics = sender.metrics.lock();
debug_assert!(metrics.is_none());
*metrics = Some(self.metrics.clone());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the comment to indicate that metrics will be initialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metrics in out_events.rs and mpsc.rs seem different, so I don't know how to reuse utils version of the channel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay ty. At some point we should probably merge both implementations. The metrics are probably not that different.

metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
Expand Down Expand Up @@ -193,17 +193,12 @@ impl OutChannels {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
match sender.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
),
_ => error!(
"The number of unprocessed events in channel `{}` reached {}.",
sender.name, sender.queue_size_warning,
),
}
sender.creation_backtrace.resolve();
error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
);
}
sender.inner.unbounded_send(event.clone()).is_ok()
});
Expand Down
1 change: 1 addition & 0 deletions client/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description = "I/O for Substrate runtimes"
readme = "README.md"

[dependencies]
backtrace = "0.3.67"
futures = "0.3.21"
futures-timer = "3.0.2"
lazy_static = "1.4.0"
Expand Down
31 changes: 14 additions & 17 deletions client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod inner {
mod inner {
// tracing implementation
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use backtrace::Backtrace;
use futures::{
channel::mpsc::{
self, SendError, TryRecvError, TrySendError, UnboundedReceiver, UnboundedSender,
Expand All @@ -47,11 +48,10 @@ mod inner {
};
use log::error;
use std::{
backtrace::{Backtrace, BacktraceStatus},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc,
Arc, Mutex,
},
};

Expand All @@ -67,7 +67,7 @@ mod inner {
queue_size: Arc<AtomicI64>,
queue_size_warning: i64,
warning_fired: Arc<AtomicBool>,
creation_backtrace: Arc<Backtrace>,
creation_backtrace: Arc<Mutex<Backtrace>>,
}

// Strangely, deriving `Clone` requires that `T` is also `Clone`.
Expand Down Expand Up @@ -108,7 +108,7 @@ mod inner {
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
creation_backtrace: Arc::new(Mutex::new(Backtrace::new_unresolved())),
};
let receiver = TracingUnboundedReceiver { inner: r, name, queue_size };
(sender, receiver)
Expand Down Expand Up @@ -149,23 +149,20 @@ mod inner {

let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == self.queue_size_warning &&
!self.warning_fired.load(Ordering::Relaxed)
self.warning_fired
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
// `warning_fired` and `queue_size` are not synchronized, so it's possible
// that the warning is fired few times before the `warning_fired` is seen
// by all threads. This seems better than introducing a mutex guarding them.
self.warning_fired.store(true, Ordering::Relaxed);
match self.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{}",
self.name, self.queue_size_warning, self.creation_backtrace,
),
_ => error!(
"The number of unprocessed messages in channel `{}` reached {}.",
self.name, self.queue_size_warning,
),
}
let mut bt = self.creation_backtrace.lock().expect("another thread panicked.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure about the mutex here. I mean I get why, but we could also just use Arc and then clone the backtrace here into some mutable value to resolve it.

let mut backtrace = (*self.creation_backtrace).clone();
backtrace.resolve();

But I don't know if not using a mutex is wort the clone 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It's a rare one-time event anyway, not a big deal.

bt.resolve();
error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
self.name, self.queue_size_warning, bt,
);
}

s
Expand Down