Skip to content

Commit

Permalink
network: add near_dropped_message_by_type_and_reason_count metric
Browse files Browse the repository at this point in the history
Add near_dropped_message_by_type_and_reason_count Prometheus metric
and delete `near_drop_message_unknown_account`,
`near_<msg-type>_dropped` and `near_dropped_messages_count` metrics.
The new metric includes all the information the previous metrics
provided, and more, and does so in a more consistent format.

This not only improves the metrics but also makes the code simpler by
eliminating the `peer_messages` hash map.
  • Loading branch information
mina86 committed Apr 23, 2022
1 parent 67475dc commit 365fe45
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 74 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
* Make RocksDB block_size configurable [#6631](https://github.com/near/nearcore/pull/6631)
* Increase default max_open_files RocksDB parameter from 512 to 10k [#6607](https://github.com/near/nearcore/pull/6607)
* Use kebab-case names for neard subcommands to make them consistent with flag names. snake_case names are still valid for existing subcommands but kebab-case will be used for new commands.
* Added `near_peer_message_received_by_type_bytes` metric [#6661](https://github.com/near/nearcore/pull/6661)
* Removed `near_<msg-type>_{total,bytes}` metrics in favour of `near_peer_message_received_by_type_{total,bytes}` metrics [#6661](https://github.com/near/nearcore/pull/6661)
* Added `near_peer_message_received_by_type_bytes` [#6661](https://github.com/near/nearcore/pull/6661) and `near_dropped_message_by_type_and_reason_count` [#6678](https://github.com/near/nearcore/pull/6678) metrics.
* Removed `near_<msg-type>_{total,bytes}` [#6661](https://github.com/near/nearcore/pull/6661), `near_<msg-type>_dropped`, `near_drop_message_unknown_account` and `near_dropped_messages_count` [#6678](https://github.com/near/nearcore/pull/6678) metrics.

## 1.25.0 [2022-03-16]

Expand Down
1 change: 0 additions & 1 deletion chain/network-primitives/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ pub struct Pong {
Clone,
strum::AsRefStr,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
#[allow(clippy::large_enum_variant)]
pub enum RoutedMessageBody {
Expand Down
1 change: 0 additions & 1 deletion chain/network/src/network_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ impl std::error::Error for HandshakeFailureReason {}
Clone,
Debug,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
// TODO(#1313): Use Box
#[allow(clippy::large_enum_variant)]
Expand Down
61 changes: 33 additions & 28 deletions chain/network/src/peer/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,42 @@ impl Encoder<Vec<u8>> for Codec {

fn encode(&mut self, item: Vec<u8>, buf: &mut BytesMut) -> Result<(), Error> {
if item.len() > NETWORK_MESSAGE_MAX_SIZE_BYTES {
Err(Error::new(ErrorKind::InvalidInput, "Input is too long"))
} else {
// TODO(mina86): Is there some way we can know what message we’re
// encoding?
metrics::MessageDropped::InputTooLong.inc_unknown_msg();
return Err(Error::new(ErrorKind::InvalidInput, "Input is too long"));
}

#[cfg(feature = "performance_stats")]
{
let stat = near_performance_metrics::stats_enabled::get_thread_stats_logger();
stat.lock().unwrap().log_add_write_buffer(
item.len() + 4,
buf.len(),
buf.capacity(),
);
}
if buf.capacity() >= MAX_WRITE_BUFFER_CAPACITY_BYTES
&& item.len() + 4 + buf.len() > buf.capacity()
{
#[cfg(feature = "performance_stats")]
{
let stat = near_performance_metrics::stats_enabled::get_thread_stats_logger();
stat.lock().unwrap().log_add_write_buffer(
item.len() + 4,
buf.len(),
buf.capacity(),
);
}
if buf.capacity() >= MAX_WRITE_BUFFER_CAPACITY_BYTES
&& item.len() + 4 + buf.len() > buf.capacity()
{
#[cfg(feature = "performance_stats")]
let tid = near_rust_allocator_proxy::get_tid();
#[cfg(not(feature = "performance_stats"))]
let tid = 0;
error!(target: "network", "{} throwing away message, because buffer is full item.len(): {} buf.capacity: {}",
tid,
item.len(), buf.capacity());
let tid = near_rust_allocator_proxy::get_tid();
#[cfg(not(feature = "performance_stats"))]
let tid = 0;
error!(target: "network", "{} throwing away message, because buffer is full item.len(): {} buf.capacity: {}",
tid,
item.len(), buf.capacity());

metrics::DROPPED_MESSAGES_COUNT.inc_by(1);
return Err(Error::new(ErrorKind::Other, "Buf max capacity exceeded"));
}
// First four bytes is the length of the buffer.
buf.reserve(item.len() + 4);
buf.put_u32_le(item.len() as u32);
buf.put(&item[..]);
Ok(())
// TODO(mina86): Is there some way we can know what message
// we’re encoding?
metrics::MessageDropped::MaxCapacityExceeded.inc_unknown_msg();
return Err(Error::new(ErrorKind::Other, "Buf max capacity exceeded"));
}
// First four bytes is the length of the buffer.
buf.reserve(item.len() + 4);
buf.put_u32_le(item.len() as u32);
buf.put(&item[..]);
Ok(())
}
}

Expand Down
7 changes: 2 additions & 5 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1426,10 +1426,7 @@ impl PeerManagerActor {
}
Err(find_route_error) => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
self.network_metrics.inc(
NetworkMetrics::peer_message_dropped(strum::AsStaticRef::as_static(&msg.body))
.as_str(),
);
metrics::MessageDropped::NoRouteFound.inc(&msg.body);

debug!(target: "network",
account_id = ?self.config.account_id,
Expand Down Expand Up @@ -1458,7 +1455,7 @@ impl PeerManagerActor {
Ok(peer_id) => peer_id,
Err(find_route_error) => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::DROP_MESSAGE_UNKNOWN_ACCOUNT.inc();
metrics::MessageDropped::UnknownAccount.inc(&msg);
debug!(target: "network",
account_id = ?self.config.account_id,
to = ?account_id,
Expand Down
66 changes: 29 additions & 37 deletions chain/network/src/stats/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::types::PeerMessage;
use near_metrics::{
do_create_int_counter_vec, try_create_histogram, try_create_int_counter,
try_create_int_counter_vec, try_create_int_gauge, Histogram, IntCounter, IntCounterVec,
IntGauge,
};
use near_network_primitives::types::RoutedMessageBody;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use strum::VariantNames;

pub static PEER_CONNECTIONS_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge("near_peer_connections_total", "Number of connected peers").unwrap()
Expand Down Expand Up @@ -90,24 +87,19 @@ pub static PEER_REACHABLE: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});
pub static DROP_MESSAGE_UNKNOWN_ACCOUNT: Lazy<IntCounter> = Lazy::new(|| {
try_create_int_counter(
"near_drop_message_unknown_account",
"Total messages dropped because target account is not known",
)
.unwrap()
});
pub static RECEIVED_INFO_ABOUT_ITSELF: Lazy<IntCounter> = Lazy::new(|| {
try_create_int_counter(
"received_info_about_itself",
"Number of times a peer tried to connect to itself",
)
.unwrap()
});
pub static DROPPED_MESSAGES_COUNT: Lazy<IntCounter> = Lazy::new(|| {
near_metrics::try_create_int_counter(
"near_dropped_messages_count",
"Total count of messages which were dropped, because write buffer was full",
static DROPPED_MESSAGE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
near_metrics::try_create_int_counter_vec(
"near_dropped_message_by_type_and_reason_count",
"Total count of messages which were dropped by type of message and \
reason why the message has been dropped",
&["type", "reason"]
)
.unwrap()
});
Expand All @@ -119,28 +111,38 @@ pub static PARTIAL_ENCODED_CHUNK_REQUEST_DELAY: Lazy<Histogram> = Lazy::new(|| {
.unwrap()
});

/// Reason why a network message was dropped.
///
/// The string form of the variant (via the derived `AsRef<str>`) is used as
/// the `reason` label of the `near_dropped_message_by_type_and_reason_count`
/// metric.
#[derive(Clone, Copy, strum::AsRefStr)]
pub(crate) enum MessageDropped {
/// Routing the message failed with a find-route error.
NoRouteFound,
/// Resolving the target account to a peer failed with a find-route error.
UnknownAccount,
/// The encoded message exceeded `NETWORK_MESSAGE_MAX_SIZE_BYTES`.
InputTooLong,
/// The write buffer had already reached `MAX_WRITE_BUFFER_CAPACITY_BYTES`
/// and the message did not fit in the remaining capacity.
MaxCapacityExceeded,
}

impl MessageDropped {
    /// Records that the routed message `msg` was dropped for the reason
    /// given by `self`.
    ///
    /// The `type` label is the string form of `msg` as produced by its
    /// `AsRef<str>` implementation.
    pub fn inc(self, msg: &RoutedMessageBody) {
        let msg_type: &str = msg.as_ref();
        self.inc_msg_type(msg_type)
    }

    /// Records a dropped message whose type is not available at the call
    /// site; it is counted under the `"unknown"` type label.
    pub fn inc_unknown_msg(self) {
        self.inc_msg_type("unknown")
    }

    /// Increments `DROPPED_MESSAGE_COUNT` for the `[type, reason]` label
    /// pair, where the reason is the string form of `self`.
    fn inc_msg_type(self, msg_type: &str) {
        let labels = [msg_type, self.as_ref()];
        DROPPED_MESSAGE_COUNT.with_label_values(&labels).inc();
    }
}

#[derive(Clone, Debug, actix::MessageResponse)]
pub struct NetworkMetrics {
// received messages
peer_messages: HashMap<String, IntCounter>,
// sent messages (broadcast style)
pub broadcast_messages: IntCounterVec,
}

impl NetworkMetrics {
pub fn new() -> Self {
Self {
peer_messages: PeerMessage::VARIANTS
.iter()
.filter(|&name| *name != "Routed")
.chain(RoutedMessageBody::VARIANTS.iter())
.filter_map(|name: &&str| {
let counter_name = Self::peer_message_dropped(name);
try_create_int_counter(&counter_name, &counter_name)
.ok()
.map(|counter| (counter_name, counter))
})
.collect(),
broadcast_messages: do_create_int_counter_vec(
"near_broadcast_msg",
"Broadcasted messages",
Expand All @@ -149,16 +151,6 @@ impl NetworkMetrics {
}
}

pub fn peer_message_dropped(message_name: &str) -> String {
format!("near_{}_dropped", message_name.to_lowercase())
}

pub fn inc(&self, message_name: &str) {
if let Some(counter) = self.peer_messages.get(message_name) {
counter.inc();
}
}

pub fn inc_broadcast(&self, message_name: &str) {
self.broadcast_messages.with_label_values(&[message_name]).inc();
}
Expand Down

0 comments on commit 365fe45

Please sign in to comment.