Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[DNM] Box messages in the overseer #7418

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "vstakhov-feature-async-channels", default-features = false, features=["futures_channel"] }

# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
19 changes: 19 additions & 0 deletions node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,16 @@ impl Metrics {

pub fn on_report_event(&self) {
if let Some(metrics) = self.0.as_ref() {
metrics.messages_sent.with_label_values(&["report_peer"]).inc();
metrics.report_events.inc()
}
}

pub fn on_message(&self, message_type: &'static str) {
if let Some(metrics) = self.0.as_ref() {
metrics.messages_sent.with_label_values(&[message_type]).inc()
}
}
}

#[derive(Clone)]
Expand All @@ -123,6 +130,8 @@ pub(crate) struct MetricsInner {

bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,

messages_sent: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -217,6 +226,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down
1 change: 1 addition & 0 deletions node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};

Expand Down
11 changes: 11 additions & 0 deletions node/network/bridge/src/tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use polkadot_node_subsystem::{
///
/// To be passed to [`FullNetworkConfiguration::add_notification_protocol`]().
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::request_response::Requests;
use sc_network::ReputationChange;

use crate::validator_discovery;
Expand Down Expand Up @@ -261,6 +262,16 @@ where
);

for req in reqs {
match req {
Requests::ChunkFetchingV1(_) => metrics.on_message("chunk_fetching_v1"),
Requests::AvailableDataFetchingV1(_) =>
metrics.on_message("available_data_fetching_v1"),
Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
Requests::StatementFetchingV1(_) => metrics.on_message("statement_fetching_v1"),
}

network_service
.start_request(
&mut authority_discovery_service,
Expand Down
8 changes: 4 additions & 4 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-metrics = { path = "../metrics" }
polkadot-primitives = { path = "../../primitives" }
orchestra = "0.0.5"
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "vstakhov-feature-async-channels", default-features = false }
gum = { package = "tracing-gum", path = "../gum" }
lru = "0.9"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.57"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }

[dev-dependencies]
metered = { package = "prioritized-metered-channel", version = "0.2.0" }

metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "vstakhov-feature-async-channels", default-features = false, features=["futures_channel"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.21", features = ["thread-pool"] }
femme = "2.2.1"
Expand All @@ -35,7 +34,8 @@ test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../pri
tikv-jemalloc-ctl = "0.5.0"

[features]
default = []
default = ["futures_channel"]
expand = ["orchestra/expand"]
dotgraph = ["orchestra/dotgraph"]
futures_channel = ["orchestra/futures_channel", "metered/futures_channel"]
jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
1 change: 1 addition & 0 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
signal=OverseerSignal,
error=SubsystemError,
message_capacity=2048,
boxed_messages=true,
)]
pub struct Overseer<SupportsParachains> {
#[subsystem(CandidateValidationMessage, sends: [
Expand Down
1 change: 1 addition & 0 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

#[test]
fn context_holds_onto_message_until_enough_signals_received() {
const CHANNEL_CAPACITY: usize = 1024;
let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-jaeger = { path = "../jaeger" }
orchestra = "0.0.5"
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "vstakhov-feature-async-channels", default-features = false, features=["futures_channel"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-overseer = { path = "../overseer" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "vstakhov-feature-async-channels", default-features = false, features=["futures_channel"] }

sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down