Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: add near_dropped_message_by_type_and_reason_count metric #6678

Merged
merged 5 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
* Make RocksDB block_size configurable [#6631](https://github.com/near/nearcore/pull/6631)
* Increase default max_open_files RocksDB parameter from 512 to 10k [#6607](https://github.com/near/nearcore/pull/6607)
* Use kebab-case names for neard subcommands to make them consistent with flag names. snake_case names are still valid for existing subcommands but kebab-case will be used for new commands.
* Added `near_peer_message_received_by_type_bytes` metric [#6661](https://github.com/near/nearcore/pull/6661)
* Removed `near_<msg-type>_{total,bytes}` metrics in favour of `near_peer_message_received_by_type_{total,bytes}` metrics [#6661](https://github.com/near/nearcore/pull/6661)
* Added `near_peer_message_received_by_type_bytes` [#6661](https://github.com/near/nearcore/pull/6661) and `near_dropped_message_by_type_and_reason_count` [#6678](https://github.com/near/nearcore/pull/6678) metrics.
* Removed `near_<msg-type>_{total,bytes}` [#6661](https://github.com/near/nearcore/pull/6661), `near_<msg-type>_dropped`, `near_drop_message_unknown_account` and `near_dropped_messages_count` [#6678](https://github.com/near/nearcore/pull/6678) metrics.
* Added `near_action_called_count` metric [#6679](https://github.com/near/nearcore/pull/6679)
* Removed `near_action_<action-type>_total` metrics [#6679](https://github.com/near/nearcore/pull/6679)
* Added `near_build_info` metric which exports neard’s build information [#6680](https://github.com/near/nearcore/pull/6680)
Expand Down
9 changes: 1 addition & 8 deletions chain/network-primitives/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,7 @@ pub struct Pong {
// TODO(#1313): Use Box
#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(
BorshSerialize,
BorshDeserialize,
PartialEq,
Eq,
Clone,
strum::AsRefStr,
strum::AsStaticStr,
strum::EnumVariantNames,
BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, strum::AsRefStr, strum::AsStaticStr,
)]
#[allow(clippy::large_enum_variant)]
pub enum RoutedMessageBody {
Expand Down
11 changes: 1 addition & 10 deletions chain/network/src/network_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,7 @@ impl std::error::Error for HandshakeFailureReason {}
/// DO NOT MOVE, REORDER, DELETE items from the list. Only add new items to the end.
/// If need to remove old items - replace with `None`.
#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(
BorshSerialize,
BorshDeserialize,
PartialEq,
Eq,
Clone,
Debug,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug, strum::AsStaticStr)]
// TODO(#1313): Use Box
#[allow(clippy::large_enum_variant)]
pub enum PeerMessage {
Expand Down
57 changes: 29 additions & 28 deletions chain/network/src/peer/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,38 @@ impl Encoder<Vec<u8>> for Codec {

fn encode(&mut self, item: Vec<u8>, buf: &mut BytesMut) -> Result<(), Error> {
if item.len() > NETWORK_MESSAGE_MAX_SIZE_BYTES {
Err(Error::new(ErrorKind::InvalidInput, "Input is too long"))
} else {
// TODO(mina86): Is there some way we can know what message we’re
// encoding?
metrics::MessageDropped::InputTooLong.inc_unknown_msg();
return Err(Error::new(ErrorKind::InvalidInput, "Input is too long"));
}

#[cfg(feature = "performance_stats")]
{
let stat = near_performance_metrics::stats_enabled::get_thread_stats_logger();
stat.lock().unwrap().log_add_write_buffer(item.len() + 4, buf.len(), buf.capacity());
}
if buf.capacity() >= MAX_WRITE_BUFFER_CAPACITY_BYTES
&& item.len() + 4 + buf.len() > buf.capacity()
{
#[cfg(feature = "performance_stats")]
{
let stat = near_performance_metrics::stats_enabled::get_thread_stats_logger();
stat.lock().unwrap().log_add_write_buffer(
item.len() + 4,
buf.len(),
buf.capacity(),
);
}
if buf.capacity() >= MAX_WRITE_BUFFER_CAPACITY_BYTES
&& item.len() + 4 + buf.len() > buf.capacity()
{
#[cfg(feature = "performance_stats")]
let tid = near_rust_allocator_proxy::get_tid();
#[cfg(not(feature = "performance_stats"))]
let tid = 0;
error!(target: "network", "{} throwing away message, because buffer is full item.len(): {} buf.capacity: {}",
tid,
item.len(), buf.capacity());
let tid = near_rust_allocator_proxy::get_tid();
#[cfg(not(feature = "performance_stats"))]
let tid = 0;
error!(target: "network", "{} throwing away message, because buffer is full item.len(): {} buf.capacity: {}",
tid,
item.len(), buf.capacity());

metrics::DROPPED_MESSAGES_COUNT.inc_by(1);
return Err(Error::new(ErrorKind::Other, "Buf max capacity exceeded"));
}
// First four bytes is the length of the buffer.
buf.reserve(item.len() + 4);
buf.put_u32_le(item.len() as u32);
buf.put(&item[..]);
Ok(())
// TODO(mina86): Is there some way we can know what message
// we’re encoding?
metrics::MessageDropped::MaxCapacityExceeded.inc_unknown_msg();
return Err(Error::new(ErrorKind::Other, "Buf max capacity exceeded"));
}
// First four bytes is the length of the buffer.
buf.reserve(item.len() + 4);
buf.put_u32_le(item.len() as u32);
buf.put(&item[..]);
Ok(())
}
}

Expand Down
7 changes: 2 additions & 5 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1426,10 +1426,7 @@ impl PeerManagerActor {
}
Err(find_route_error) => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
self.network_metrics.inc(
NetworkMetrics::peer_message_dropped(strum::AsStaticRef::as_static(&msg.body))
.as_str(),
);
metrics::MessageDropped::NoRouteFound.inc(&msg.body);

debug!(target: "network",
account_id = ?self.config.account_id,
Expand Down Expand Up @@ -1458,7 +1455,7 @@ impl PeerManagerActor {
Ok(peer_id) => peer_id,
Err(find_route_error) => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::DROP_MESSAGE_UNKNOWN_ACCOUNT.inc();
metrics::MessageDropped::UnknownAccount.inc(&msg);
debug!(target: "network",
account_id = ?self.config.account_id,
to = ?account_id,
Expand Down
66 changes: 29 additions & 37 deletions chain/network/src/stats/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::types::PeerMessage;
use near_metrics::{
do_create_int_counter_vec, try_create_histogram, try_create_int_counter,
try_create_int_counter_vec, try_create_int_gauge, Histogram, IntCounter, IntCounterVec,
IntGauge,
};
use near_network_primitives::types::RoutedMessageBody;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use strum::VariantNames;

pub static PEER_CONNECTIONS_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge("near_peer_connections_total", "Number of connected peers").unwrap()
Expand Down Expand Up @@ -90,24 +87,19 @@ pub static PEER_REACHABLE: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});
pub static DROP_MESSAGE_UNKNOWN_ACCOUNT: Lazy<IntCounter> = Lazy::new(|| {
try_create_int_counter(
"near_drop_message_unknown_account",
"Total messages dropped because target account is not known",
)
.unwrap()
});
pub static RECEIVED_INFO_ABOUT_ITSELF: Lazy<IntCounter> = Lazy::new(|| {
try_create_int_counter(
"received_info_about_itself",
"Number of times a peer tried to connect to itself",
)
.unwrap()
});
pub static DROPPED_MESSAGES_COUNT: Lazy<IntCounter> = Lazy::new(|| {
near_metrics::try_create_int_counter(
"near_dropped_messages_count",
"Total count of messages which were dropped, because write buffer was full",
static DROPPED_MESSAGE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
near_metrics::try_create_int_counter_vec(
"near_dropped_message_by_type_and_reason_count",
"Total count of messages which were dropped by type of message and \
reason why the message has been dropped",
&["type", "reason"],
)
.unwrap()
});
Expand All @@ -119,28 +111,38 @@ pub static PARTIAL_ENCODED_CHUNK_REQUEST_DELAY: Lazy<Histogram> = Lazy::new(|| {
.unwrap()
});

/// Reason why a network message was dropped.
///
/// The variant name, obtained through the derived `AsRef<str>`
/// implementation, is used as the value of the `reason` label of the
/// `near_dropped_message_by_type_and_reason_count` counter.
#[derive(Clone, Copy, strum::AsRefStr)]
pub(crate) enum MessageDropped {
    /// Looking up a route for the message failed.
    NoRouteFound,
    /// The target account of the message is not known.
    UnknownAccount,
    /// The encoded message is longer than the maximum network message size.
    InputTooLong,
    /// The write buffer is at its maximum capacity and cannot fit the message.
    MaxCapacityExceeded,
}

impl MessageDropped {
    /// Counts one dropped routed message, labelling it with the variant name
    /// of `msg` as the message type and `self` as the reason.
    pub fn inc(self, msg: &RoutedMessageBody) {
        let msg_type: &str = msg.as_ref();
        self.inc_msg_type(msg_type)
    }

    /// Counts one dropped message whose type could not be determined; the
    /// message type label is set to `"unknown"`.
    pub fn inc_unknown_msg(self) {
        const UNKNOWN_MSG_TYPE: &str = "unknown";
        self.inc_msg_type(UNKNOWN_MSG_TYPE)
    }

    /// Increments the dropped-message counter for the given message type and
    /// this reason.
    fn inc_msg_type(self, msg_type: &str) {
        // Label order must match the counter's declaration: type, then reason.
        let labels = [msg_type, self.as_ref()];
        DROPPED_MESSAGE_COUNT.with_label_values(&labels).inc();
    }
}

#[derive(Clone, Debug, actix::MessageResponse)]
pub struct NetworkMetrics {
// received messages
peer_messages: HashMap<String, IntCounter>,
// sent messages (broadcast style)
pub broadcast_messages: IntCounterVec,
}

impl NetworkMetrics {
pub fn new() -> Self {
Self {
peer_messages: PeerMessage::VARIANTS
.iter()
.filter(|&name| *name != "Routed")
.chain(RoutedMessageBody::VARIANTS.iter())
.filter_map(|name: &&str| {
let counter_name = Self::peer_message_dropped(name);
try_create_int_counter(&counter_name, &counter_name)
.ok()
.map(|counter| (counter_name, counter))
})
.collect(),
broadcast_messages: do_create_int_counter_vec(
"near_broadcast_msg",
"Broadcasted messages",
Expand All @@ -149,16 +151,6 @@ impl NetworkMetrics {
}
}

pub fn peer_message_dropped(message_name: &str) -> String {
format!("near_{}_dropped", message_name.to_lowercase())
}

pub fn inc(&self, message_name: &str) {
if let Some(counter) = self.peer_messages.get(message_name) {
counter.inc();
}
}

pub fn inc_broadcast(&self, message_name: &str) {
self.broadcast_messages.with_label_values(&[message_name]).inc();
}
Expand Down