Flume, Metrics and Message control when overloaded #367
Conversation
Note to self: change base to master, and rebase/merge this after the jsdw-minor-fixes branch it builds on.
Force-pushed from 5c4b3cb to 05a3ba3.

(force push to undo a commit I put on the wrong branch :))
LGTM. Left a few nits and suggestions (likely for future work).
backend/common/src/dense_map.rs (outdated)
use super::*;

#[test]
fn len_doesnt_panic_if_lots_of_retired() {
Did you mean "removed" here?
`retired` is referring to the list of retired IDs in the densemap struct :)
I could probably have made the grammar in that name a bit less awful!
(I made the name less awful!)
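For anyone following along, here's a rough illustrative sketch of the idea behind that retired list (this is not the actual `dense_map.rs` implementation): removed slots have their IDs kept in a free list for reuse, and `len()` has to account for them, which is what the test above exercises.

```rust
struct DenseMap<T> {
    items: Vec<Option<T>>,
    // IDs whose slots have been emptied and can be handed out again.
    retired: Vec<usize>,
}

impl<T> DenseMap<T> {
    fn add(&mut self, item: T) -> usize {
        match self.retired.pop() {
            Some(id) => {
                self.items[id] = Some(item);
                id
            }
            None => {
                self.items.push(Some(item));
                self.items.len() - 1
            }
        }
    }

    fn remove(&mut self, id: usize) -> Option<T> {
        let item = self.items.get_mut(id)?.take();
        if item.is_some() {
            self.retired.push(id);
        }
        item
    }

    fn len(&self) -> usize {
        // The test above guards against this subtraction misbehaving when
        // lots of IDs have been retired.
        self.items.len() - self.retired.len()
    }
}
```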
use futures::{Stream, StreamExt};
use std::sync::Arc;

/// Receive messages out of a connection
pub struct Receiver {
-    pub(super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
+    pub(super) inner: flume::r#async::RecvStream<'static, Result<RecvMessage, RecvError>>,
Eew, that is an unfortunate choice of module name.
Yeah, I thought so! If I had to write that too much I'd probably `use` it and rename it!
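For illustration, a minimal sketch of that rename, assuming flume's `r#async` module and its `RecvStream` type (the `RecvMessage` type and the use of `flume::RecvError` here are stand-ins, not the PR's own types):

```rust
// Alias flume's awkwardly named `r#async` module (a raw identifier, since
// `async` is a Rust keyword) so the raw identifier only appears once.
use flume::r#async as flume_async;
use flume::RecvError;

// Stand-in for the crate's own message type.
struct RecvMessage;

// The field from the diff above could then be written without `r#async`:
struct Receiver {
    inner: flume_async::RecvStream<'static, Result<RecvMessage, RecvError>>,
}
```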
-// in to the aggregator. If nobody is tolding the tx side of the channel
-// any more, this task will gracefully end.
+/// This is spawned into a separate task and handles any messages coming
+/// in to the aggregator. If nobody is tolding the tx side of the channel
-/// in to the aggregator. If nobody is tolding the tx side of the channel
+/// in to the aggregator. If nobody is holding the tx side of the channel
pub chains_subscribed_to: usize,
/// How many feeds are currently subscribed to something.
pub subscribed_feeds: usize,
/// How many feeds have asked for finality information, too.
too?
What "too" was hinting at is that it's the number of feeds that are subscribed to a chain and also have asked for finality (because you can't ask for finality without being subscribed)
I see. How about this: "Number of subscribed feeds that also asked for finality information."?
I like it!
pub total_messages_to_feeds: usize,
/// How many messages are queued waiting to be handled by this aggregator.
pub total_messages_to_aggregator: usize,
/// How many nodes are currently known about by this aggregator.
-/// How many nodes are currently known about by this aggregator.
+/// How many nodes are currently known to this aggregator.
?
@@ -100,6 +101,30 @@ pub enum FromFeedWebsocket {
    Disconnected,
}

/// A set of metrics returned when we ask for metrics
#[derive(Clone, Debug, Default)]
pub struct Metrics {
Thinking out loud here: it would be interesting to have a few histograms too, e.g. nodes and their verbosity level (to get an inkling of how much data we're receiving), message payload size in and out, and maybe connection longevity (how long nodes stay connected).
Another data point I wouldn't mind having is the current message rate (is that what they call a "gauge"?).
I think I return the current message rates already (I think "gauge" is the right term: there was a word for numbers that only ever increase and a word for values that can go up or down, and I think gauge was the latter!). I think we can see the bandwidth anyway from outside the process, so I wonder how useful the bytes in/out are? But that histogram sounds like a good idea; roughly breaking down the bytes in/out per node/feed could be interesting!
When this merges, perhaps we should create an issue to track any additional metrics we'd like to add?
> When this merges, perhaps we should create an issue to track any additional metrics we'd like to add?
You read my mind.
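On the gauge/counter question a couple of comments up: in Prometheus terms a counter only ever increases, while a gauge can rise and fall. A tiny sketch with hypothetical fields (not the PR's actual `Metrics` fields):

```rust
#[derive(Default)]
struct ExampleMetrics {
    /// Counter: total messages ever dropped; only ever increases.
    dropped_messages_total: u64,
    /// Gauge: messages currently queued; rises and falls with load.
    current_queue_len: usize,
}
```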
msg,
ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. })
) {
continue;
I'm tempted to ask for a log here, but maybe that's a bad thing in this scenario, logging when the node is already struggling?
Either way, I think collecting metrics on the number of dropped messages might prove useful.
Ooh yeah, that's a really good number to have! I might sneak that into this PR as I don't think it'll be that hard to gather and it would be useful to see reported!
Please do!
(done!)
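A simplified, self-contained sketch of that pattern (the types and names here are hypothetical, not lifted from the PR): when overloaded, node `Update` messages are skipped with `continue`, and a counter records how many were dropped so the number can be reported via the metrics.

```rust
enum Incoming {
    Update,
    Other,
}

/// Drain a batch of messages; when `overloaded`, skip `Update`s and return
/// how many were dropped so the caller can report it.
fn process(messages: impl IntoIterator<Item = Incoming>, overloaded: bool) -> usize {
    let mut dropped = 0;
    for msg in messages {
        if overloaded && matches!(msg, Incoming::Update) {
            dropped += 1; // the "number of dropped messages" metric discussed above
            continue;
        }
        handle(msg); // handle everything else as normal
    }
    dropped
}

fn handle(_msg: Incoming) {
    // Placeholder for the real message handling.
}
```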
backend/telemetry_core/src/main.rs (outdated)
// we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've
// captured so far from the aggregators are. See:
-// we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've
-// captured so far from the aggregators are. See:
+// we just split out the text format that prometheus expects ourselves, and use the latest metrics that we've
+// captured so far from the aggregators. See:
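As a rough illustration of writing the Prometheus text exposition format by hand, as the comment describes (the metric names below are made up for the example, not the ones the PR exposes):

```rust
/// Render a couple of gauges in the Prometheus text exposition format.
/// (Illustrative only; field names and values are placeholders.)
fn render_metrics(connected_feeds: usize, messages_to_aggregator: usize) -> String {
    let mut out = String::new();
    out.push_str("# TYPE telemetry_connected_feeds gauge\n");
    out.push_str(&format!("telemetry_connected_feeds {connected_feeds}\n"));
    out.push_str("# TYPE telemetry_messages_to_aggregator gauge\n");
    out.push_str(&format!(
        "telemetry_messages_to_aggregator {messages_to_aggregator}\n"
    ));
    out
}
```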
// Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll
// end up waiting longer between requests.
tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await;
Why `sleep_until` instead of just `sleep`? Do we want to sleep 10 seconds from the start of execution, or at least 10 seconds after the metrics update?
I could have gone either way really, but I went for "aim to have 10 seconds between each update to metrics, unless it takes more than 10 seconds for an update to come through"
(To explain the reasoning a bit more: the actual logic to gather metrics is fairly quick, but the message could potentially sit in a queue for a while if we happen to be under heavy load. So if it takes e.g. 9 seconds to get a response, I'd rather put another message into the queue 1 second later than wait another 10 seconds before queueing the next one.)
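A small sketch of the difference (the `gather_metrics` function is hypothetical): the timestamp is taken before the metrics request goes out, so a slow response eats into the 10-second gap rather than adding to it.

```rust
use tokio::time::{sleep_until, Duration, Instant};

async fn metrics_loop() {
    loop {
        // Take the timestamp *before* asking for metrics...
        let now = Instant::now();

        // (hypothetical) request metrics from the aggregators; under heavy
        // load the request may sit in a queue for a while before it's answered.
        gather_metrics().await;

        // ...so if gathering took 9 seconds, we only wait 1 more, but we never
        // fire more often than every 10 seconds.
        sleep_until(now + Duration::from_secs(10)).await;
    }
}

async fn gather_metrics() {
    // Placeholder standing in for the real request/response round trip.
}
```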
@@ -39,6 +37,9 @@ pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// Hand back some metrics. The provided sender is expected not to block when
/// a message it sent into it.
-/// a message it sent into it.
+/// a message is sent into it.
Looks good to me apart from a couple of questions. Overall I've had a good experience with flume; I like how sync/async methods are both available on the same Sender/Receiver objects.
That's good to hear! I like the flexibility it has there too, and unlike the tokio channels it also has Stream/Sink impls, and unlike futures/tokio channels you can see lengths (and on top of all of that, it's supposed to be pretty quick) :)
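A quick self-contained sketch of those bits, assuming a tokio runtime and flume's default features: the same channel works from sync and async code, queue lengths are observable, and the receiver converts into a `Stream`.

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::unbounded::<u32>();

    tx.send(1).unwrap(); // sync send...
    tx.send_async(2).await.unwrap(); // ...and async send, on the same Sender

    println!("queued: {}", rx.len()); // 2 — queue lengths are visible

    drop(tx); // close the channel so the stream below terminates

    let mut stream = rx.into_stream(); // Stream impl for the Receiver
    while let Some(value) = stream.next().await {
        println!("got {value}");
    }
}
```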
@@ -138,7 +138,7 @@ mod test {
use super::*;

#[test]
-fn len_doesnt_panic_if_lots_of_retired() {
+fn len_doesnt_panic_if_lots_of_ids_are_retired() {
❤️
Last changes still LGTM! Merge at will.
This PR uses `flume` throughout. It seems to benchmark well against the futures channels I've been using (see https://github.com/zesterer/flume/blob/master/misc/benchmarks.png), but importantly it also exposes a `len()` fn so we can see how many messages are queued in each channel. Builds on https://github.com/paritytech/substrate-telemetry/tree/jsdw-minor-fixes. Merge this after that.