Flume, Metrics and Message control when overloaded #367

Merged (23 commits) on Aug 13, 2021

Conversation

@jsdw (Collaborator) commented Aug 12, 2021

  • Make use of flume throughout. It seems to benchmark well against the futures channels I've been using (see https://github.com/zesterer/flume/blob/master/misc/benchmarks.png), but importantly it also exposes a len() fn, so we can see how many messages are queued in each channel (see the sketch below this description).
  • Gather and expose metrics. The above gives us access to some useful metrics on top of what we'd already have. These are exposed in a format compatible with Prometheus (tested against a local Prometheus instance).
  • Test util updates. Remove the "worse" soak test runner, and add options to make it easier to test a few different things, and easier to scale the test runner to use more cores (which is needed for larger scale tests).

Builds on https://github.com/paritytech/substrate-telemetry/tree/jsdw-minor-fixes. Merge this after that.
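
A minimal sketch of the flume property mentioned above; this is illustrative code, not part of the PR:

```rust
// Minimal sketch: unlike the futures/tokio mpsc channels, both the flume Sender
// and Receiver expose len(), so queue depth can be read off and reported as a metric.
fn main() {
    let (tx, rx) = flume::unbounded::<&str>();

    tx.send("hello").unwrap();
    tx.send("world").unwrap();

    // Both ends report how many messages are currently queued.
    assert_eq!(tx.len(), 2);
    assert_eq!(rx.len(), 2);

    let _ = rx.recv().unwrap();
    assert_eq!(rx.len(), 1);
}
```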

@jsdw jsdw changed the base branch from master to jsdw-minor-fixes August 12, 2021 15:07
@jsdw jsdw marked this pull request as ready for review August 12, 2021 15:16
@jsdw (Collaborator, Author) commented Aug 12, 2021

Note to self: change base to master, and rebase/merge this after the jsdw-minor-fixes stuff merges

@jsdw jsdw force-pushed the jsdw-sharding-gatekeeper branch from 5c4b3cb to 05a3ba3 Compare August 12, 2021 15:57
@jsdw (Collaborator, Author) commented Aug 12, 2021

(force push to undo a commit I put on the wrong branch :))

@dvdplm (Contributor) left a comment

LGTM. Left a few nits and suggestions (likely for future work).

use super::*;

#[test]
fn len_doesnt_panic_if_lots_of_retired() {
Contributor:

Did you mean "removed" here?

@jsdw (Collaborator, Author) commented Aug 13, 2021

retired is referring to the list of retired IDs in the densemap struct :)

I could probably have made the grammar in that name a bit less awful!

Collaborator Author:

(I made the name less awful!)

use futures::{Stream, StreamExt};
use std::sync::Arc;

/// Receive messages out of a connection
pub struct Receiver {
pub(super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
pub(super) inner: flume::r#async::RecvStream<'static, Result<RecvMessage, RecvError>>,
Contributor:

Eew, that is an unfortunate choice of module name.

Collaborator Author:

Yeah, I thought so! If I had to write that too much I'd probably use and rename it!
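
For illustration, the kind of rename alluded to above might look like this (hypothetical, not code from the PR):

```rust
// Hypothetical import rename so the awkward r#async raw-identifier path only
// has to be written once; fields can then use FlumeRecvStream<'static, ...> directly.
use flume::r#async::RecvStream as FlumeRecvStream;
```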

// in to the aggregator. If nobody is tolding the tx side of the channel
// any more, this task will gracefully end.
/// This is spawned into a separate task and handles any messages coming
/// in to the aggregator. If nobody is tolding the tx side of the channel
Contributor:

Suggested change
/// in to the aggregator. If nobody is tolding the tx side of the channel
/// in to the aggregator. If nobody is holding the tx side of the channel

pub chains_subscribed_to: usize,
/// How many feeds are currently subscribed to something.
pub subscribed_feeds: usize,
/// How many feeds have asked for finality information, too.
Contributor:

too?

Collaborator Author:

What "too" was hinting at is that it's the number of feeds that are subscribed to a chain and also have asked for finality (because you can't ask for finality without being subscribed)

Contributor:

I see. How about this: "Number of subscribed feeds that also asked for finality information."?

Collaborator Author:

I like it!

pub total_messages_to_feeds: usize,
/// How many messages are queued waiting to be handled by this aggregator.
pub total_messages_to_aggregator: usize,
/// How many nodes are currently known about by this aggregator.
Contributor:

Suggested change
/// How many nodes are currently known about by this aggregator.
/// How many nodes are currently known to this aggregator.

?

@@ -100,6 +101,30 @@ pub enum FromFeedWebsocket {
Disconnected,
}

/// A set of metrics returned when we ask for metrics
#[derive(Clone, Debug, Default)]
pub struct Metrics {
Contributor:

Thinking out loud here: it would be interesting to have a few histograms too, e.g. nodes and their verbosity level (to get an inkling of how much data we're receiving), message payload size in and out, and maybe connection longevity too (how long nodes stay connected).
Another data point I wouldn't mind having is the current message rate (is that what they call a "gauge"?).

Collaborator Author:

I think I return the current message rates already (I think gauge is the right term... there was a word for numbers that only ever increase and a word for values that can go up or down, and I think gauge is the latter!). I think we can see the bandwidth from outside the process anyway, so I wonder how useful the bytes in/out are? But that histogram sounds like a good idea; roughly breaking down the bytes in/out per node/feed could be interesting!

When this merges, perhaps we should create an issue to track any additional metrics we'd like to add?

Contributor:

When this merges, perhaps we should create an issue to track any additional metrics we'd like to add?

You read my mind.

msg,
ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. })
) {
continue;
Contributor:

I'm tempted to ask for a log here, but maybe that's a bad thing in this scenario, logging when the node is already struggling?

Either way, I think collecting metrics on the number of dropped messages might prove useful.

Collaborator Author:

Ooh yeah, that's a really good number to have! I might sneak that into this PR as I don't think it'll be that hard to gather and it would be useful to see reported!

Contributor:

Please do!

Collaborator Author:

(done!)
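
A rough sketch of the drop-and-count behaviour agreed on here; the message enum, threshold, and handler are illustrative stand-ins, not the PR's actual types:

```rust
use flume::Receiver;

// Illustrative message type; the real code matches on
// ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }).
enum Msg {
    Update,
    Other,
}

// Hypothetical threshold; a real value would be tuned for the deployment.
const MAX_QUEUE_LEN: usize = 100_000;

async fn run(rx: Receiver<Msg>) {
    let mut dropped_messages: u64 = 0;
    while let Ok(msg) = rx.recv_async().await {
        // When the queue grows too long, shed low-priority Update messages rather
        // than falling further behind, and count how many were dropped so that
        // the metrics can report it.
        if rx.len() > MAX_QUEUE_LEN && matches!(msg, Msg::Update) {
            dropped_messages += 1;
            continue;
        }
        // ... handle `msg` here ...
        let _ = msg;
    }
    println!("dropped {} low-priority messages in total", dropped_messages);
}
```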

Comment on lines 489 to 490
// we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've
// captured so far from the aggregators are. See:
Contributor:

Suggested change
// we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've
// captured so far from the aggregators are. See:
// we just split out the text format that prometheus expects ourselves, and use the latest metrics that we've
// captured so far from the aggregators. See:
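
For anyone unfamiliar with the format referred to in this comment, here is a rough sketch of hand-writing the Prometheus text exposition format; the metric names below are made up for illustration and aren't necessarily the ones this PR uses:

```rust
// Sketch of the plain-text exposition format Prometheus scrapes: an optional
// `# TYPE name kind` line per metric, followed by `name value` lines.
fn render_metrics(connected_nodes: usize, queued_to_aggregator: usize) -> String {
    let mut out = String::new();
    out.push_str("# TYPE telemetry_connected_nodes gauge\n");
    out.push_str(&format!("telemetry_connected_nodes {}\n", connected_nodes));
    out.push_str("# TYPE telemetry_messages_to_aggregator gauge\n");
    out.push_str(&format!("telemetry_messages_to_aggregator {}\n", queued_to_aggregator));
    out
}
```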


// Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll
// end up waiting longer between requests.
tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await;
@insipx commented Aug 12, 2021

Why sleep_until instead of just sleep? Do we want to sleep 10 seconds from the start of execution, or at least 10 seconds after the metrics update?

Collaborator Author:

I could have gone either way really, but I went for "aim to have 10 seconds between each update to metrics, unless it takes more than 10 seconds for an update to come through"

Collaborator Author:

(To explain the reasoning a bit more: the actual logic to gather metrics is fairly quick, but the message could potentially sit in a queue for a while if we happen to be under heavy load. So if it takes e.g. 9 seconds to get a response, I'd rather put another message into the queue 1 second later than wait another 10 seconds before queueing the next one.)
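
To make the difference concrete, a small sketch of the loop shape being described (gather_metrics is a hypothetical stand-in for whatever actually requests the metrics):

```rust
use tokio::time::{sleep_until, Duration, Instant};

// Hypothetical stand-in for the real metrics request.
async fn gather_metrics() {}

async fn metrics_loop() {
    loop {
        let started = Instant::now();
        gather_metrics().await;
        // Aim for ~10 seconds between the *starts* of successive updates:
        // if gathering took e.g. 9 seconds, this only waits about 1 more second;
        // if it took longer than 10 seconds, the loop carries on immediately.
        sleep_until(started + Duration::from_secs(10)).await;
    }
}
```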

@@ -39,6 +37,9 @@ pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// Hand back some metrics. The provided sender is expected not to block when
/// a message it sent into it.

Suggested change
/// a message it sent into it.
/// a message is sent into it.

@insipx commented Aug 13, 2021

Looks good to me apart from a couple of questions. Overall I've had a good experience with flume; I like how sync/async methods are both available on the same Sender/Receiver objects.

Base automatically changed from jsdw-minor-fixes to master August 13, 2021 09:51
@jsdw (Collaborator, Author) commented Aug 13, 2021

Looks good to me apart from a couple of questions. Overall I've had a good experience with flume; I like how sync/async methods are both available on the same Sender/Receiver objects.

That's good to hear! I like the flexibility it has there too. Unlike the tokio channels it also has Stream/Sink impls, and unlike the futures/tokio channels you can see queue lengths (and on top of all of that, it's supposed to be pretty quick) :)
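
A small sketch of the flume ergonomics being praised here (again illustrative, not code from the PR):

```rust
use futures::StreamExt;

async fn demo() {
    let (tx, rx) = flume::unbounded::<u32>();

    tx.send(1).unwrap();             // sync send...
    tx.send_async(2).await.unwrap(); // ...and async send on the very same Sender

    assert_eq!(rx.len(), 2);         // queue length is visible, unlike futures/tokio mpsc

    let mut stream = rx.into_stream(); // Stream impl, ready for StreamExt combinators
    assert_eq!(stream.next().await, Some(1));
    assert_eq!(stream.next().await, Some(2));
}
```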

@@ -138,7 +138,7 @@ mod test {
use super::*;

#[test]
fn len_doesnt_panic_if_lots_of_retired() {
fn len_doesnt_panic_if_lots_of_ids_are_retired() {
Contributor:

❤️

@dvdplm (Contributor) commented Aug 13, 2021

Last changes still lgtm! Merge at will.

@jsdw jsdw merged commit 502fd2e into master Aug 13, 2021
@jsdw jsdw deleted the jsdw-sharding-gatekeeper branch August 13, 2021 13:35