Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
pvf: Log memory metrics from preparation (#6565)
Browse files Browse the repository at this point in the history
* Add getrusage and memory tracker for precheck preparation

* Log memory stats metrics after prechecking

* Fix tests

* Try to fix errors (linux-only so I'm relying on CI here)

* Try to fix CI

* Add module docs for `prepare/memory_stats.rs`; fix CI error

* Report memory stats for all preparation jobs

* Use `RUSAGE_SELF` instead of `RUSAGE_THREAD`

Not sure why I did that -- was a brainfart on my end.

* Revert last commit (RUSAGE_THREAD is correct)

* Use exponential buckets

* Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS

* Increase poll interval

* Revert "Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS"

This reverts commit becf7a8.
  • Loading branch information
mrcnski authored Feb 6, 2023
1 parent 2ff97c5 commit 4f331d7
Show file tree
Hide file tree
Showing 11 changed files with 469 additions and 58 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
slotmap = "1.0"
gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
slotmap = "1.0"
tempfile = "3.3.0"
tikv-jemalloc-ctl = "0.5.0"
tokio = { version = "1.24.2", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] }

Expand All @@ -38,8 +39,11 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"

[dev-dependencies]
adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
hex-literal = "0.3.4"
tempfile = "3.2.0"
tempfile = "3.3.0"
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ mod tests {
let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse);

for _ in 0usize..5usize {
for _ in 0..5 {
let start = std::time::Instant::now();
let _ = pulse.next().await.unwrap();

Expand Down
60 changes: 60 additions & 0 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ impl Metrics {
pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    // Without an inner metrics value there is nothing to time against, so bail out with `None`.
    let inner = self.0.as_ref()?;
    // Start a timer on the `execution_time` histogram and hand it to the caller.
    Some(inner.execution_time.start_timer())
}

/// Records one `max_rss` sample for a preparation job in the `preparation_max_rss` histogram.
///
/// This is a no-op when `self.0` is `None`.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
    match self.0.as_ref() {
        Some(inner) => inner.preparation_max_rss.observe(max_rss),
        None => {},
    }
}

/// Records one maximum-resident-memory sample (`max_resident_kb`) for a preparation job in the
/// `preparation_max_resident` histogram.
///
/// This is a no-op when `self.0` is `None`.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
    if let Some(inner) = self.0.as_ref() {
        let histogram = &inner.preparation_max_resident;
        histogram.observe(max_resident_kb);
    }
}

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
}
}

#[derive(Clone)]
Expand All @@ -85,6 +106,9 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
preparation_max_rss: prometheus::Histogram,
preparation_max_allocated: prometheus::Histogram,
preparation_max_resident: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -202,6 +226,42 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_rss",
"ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_resident",
"max resident memory observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_allocated",
"max allocated memory observed for preparation (in kilobytes)",
).buckets(
prometheus::exponential_buckets(8192.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
};
Ok(Metrics(Some(inner)))
}
Expand Down
243 changes: 243 additions & 0 deletions node/core/pvf/src/prepare/memory_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Memory stats for preparation.
//!
//! Right now we gather three measurements:
//!
//! - `ru_maxrss` (resident set size) from `getrusage`.
//! - `resident` memory stat provided by `tikv-jemalloc-ctl`.
//! - `allocated` memory stat also from `tikv-jemalloc-ctl`.
//!
//! Currently we are only logging these for the purposes of gathering data. In the future, we may
//! use these stats to reject PVFs during pre-checking. See
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
//! background.

use crate::{metrics::Metrics, LOG_TARGET};
use parity_scale_codec::{Decode, Encode};
use std::{
io,
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;

#[cfg(target_os = "linux")]
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};

/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`.
///
/// Consumed by [`observe_memory_metrics`], which forwards whatever is available to the metrics.
///
/// NOTE(review): with derived `Encode`/`Decode` the field order is presumably part of the encoded
/// format -- confirm before reordering fields.
#[derive(Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`. The values are in bytes (see
/// [`MemoryAllocationStats`]). `None` means no tracker stats are available; nothing is observed
/// for them in that case.
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
///
/// `None` when the value was not collected (see [`get_max_rss_thread`], which returns `None` on
/// targets other than Linux). The value is treated as kilobytes when reported as a metric.
pub max_rss: Option<Result<i64, String>>,
}

/// Statistics of collected memory metrics.
///
/// A single value of this type is either one snapshot taken by the memory tracker or, as returned
/// by [`memory_tracker_loop`], the per-field maxima over all snapshots. The `Default` value (all
/// zeroes) is what the loop starts from when computing those maxima.
///
/// NOTE(review): with derived `Encode`/`Decode` the field order is presumably part of the encoded
/// format -- confirm before reordering fields.
#[non_exhaustive]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
/// Total resident memory, in bytes.
pub resident: u64,
/// Total allocated memory, in bytes.
pub allocated: u64,
}

/// Bundles the `tikv_jemalloc_ctl` MIB handles needed to read allocator statistics.
#[derive(Clone)]
struct MemoryAllocationTracker {
    epoch: tikv_jemalloc_ctl::epoch_mib,
    allocated: stats::allocated_mib,
    resident: stats::resident_mib,
}

impl MemoryAllocationTracker {
    /// Looks up the `epoch`, `allocated` and `resident` MIBs.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the three lookups.
    pub fn new() -> Result<Self, Error> {
        let epoch = epoch::mib()?;
        let allocated = stats::allocated::mib()?;
        let resident = stats::resident::mib()?;
        Ok(Self { epoch, allocated, resident })
    }

    /// Advances the allocation epoch and then reads the `allocated` and `resident` statistics.
    ///
    /// # Errors
    ///
    /// Returns an error if advancing the epoch or reading either statistic fails.
    pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
        // The epoch is advanced first so that the reads below observe updated stats.
        self.epoch.advance()?;

        // The readings are `usize`; they are stored as `u64` because `usize` is not `Encode`able.
        let stats = MemoryAllocationStats {
            allocated: self.allocated.read()? as u64,
            resident: self.resident.read()? as u64,
        };
        Ok(stats)
    }
}

/// Calls `getrusage` with `RUSAGE_THREAD` and returns the filled-in `rusage` record.
///
/// # Errors
///
/// If the call reports failure (returns `-1`), the last OS error is returned.
#[cfg(target_os = "linux")]
fn getrusage_thread() -> io::Result<rusage> {
    // Every field starts out zeroed; `getrusage` overwrites the record on success.
    let zero_time = timeval { tv_sec: 0, tv_usec: 0 };
    let mut usage = rusage {
        ru_utime: zero_time,
        ru_stime: zero_time,
        ru_maxrss: 0,
        ru_ixrss: 0,
        ru_idrss: 0,
        ru_isrss: 0,
        ru_minflt: 0,
        ru_majflt: 0,
        ru_nswap: 0,
        ru_inblock: 0,
        ru_oublock: 0,
        ru_msgsnd: 0,
        ru_msgrcv: 0,
        ru_nsignals: 0,
        ru_nvcsw: 0,
        ru_nivcsw: 0,
    };

    // SAFETY: `usage` is a fully initialized `rusage` that is exclusively borrowed for the
    // duration of the call, so the pointer handed to `getrusage` is valid for writes.
    match unsafe { getrusage(RUSAGE_THREAD, &mut usage) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(usage),
    }
}

/// Returns the `ru_maxrss` reported by `getrusage` for the current thread, where available.
///
/// On Linux the result is always `Some`, wrapping either the value or the OS error from the call.
/// On every other target the result is `None`.
pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
    #[cfg(target_os = "linux")]
    let max_rss = {
        // `ru_maxrss` is a `c_long`, i.e. `i32` or `i64` depending on the architecture, so
        // widening it with `i64::from` always works.
        let usage = getrusage_thread();
        Some(usage.map(|stats| i64::from(stats.ru_maxrss)))
    };
    #[cfg(not(target_os = "linux"))]
    let max_rss = None;

    max_rss
}

/// Polls the allocator statistics until told to stop and reports the peaks it saw. Intended to run
/// in the background while preparation is in progress, to approximate its memory usage.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Take a snapshot, then wait on `finished_rx` for up to one poll interval. If the wait times
///    out, repeat this step.
///
/// 3. Once the finish signal arrives, take a final snapshot and return the largest `resident` and
///    `allocated` values observed across all snapshots.
///
/// # Errors
///
/// All errors are flattened into strings, since they are only meant to be logged: failures from
/// creating the tracker or taking a snapshot, and `finished_rx` being disconnected before a signal
/// was received.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
    // Preparation currently takes 3-10s or more, so a coarse interval is enough. Beyond that the
    // exact number is not based on any measurement.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
    let mut peak = MemoryAllocationStats::default();

    // Takes a snapshot and folds it into the running per-field maxima.
    let mut record_snapshot = || -> Result<(), String> {
        let snapshot = tracker.snapshot().map_err(|err| err.to_string())?;
        peak.resident = peak.resident.max(snapshot.resident);
        peak.allocated = peak.allocated.max(snapshot.allocated);
        Ok(())
    };

    loop {
        record_snapshot()?;

        // Doubles as the sleep between polls: blocks until signalled or the interval elapses.
        match finished_rx.recv_timeout(POLL_INTERVAL) {
            // Finish signal received: leave the loop for the final snapshot.
            Ok(()) => break,
            // Nothing yet: poll again.
            Err(RecvTimeoutError::Timeout) => {},
            Err(RecvTimeoutError::Disconnected) =>
                return Err("memory_tracker_loop: finished_rx disconnected".into()),
        }
    }

    record_snapshot()?;
    Ok(peak)
}

/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
///
/// Sends the finish signal over `tx` and then awaits `fut`, the handle of the task running
/// [`memory_tracker_loop`].
///
/// Returns `Some(stats)` only if the signal was sent, the task was joined and the loop itself
/// succeeded. Every failure is logged as a warning (tagged with this process's id) and mapped to
/// `None`; if the signal cannot be sent, the handle is not awaited at all.
pub async fn get_memory_tracker_loop_stats(
    fut: JoinHandle<Result<MemoryAllocationStats, String>>,
    tx: Sender<()>,
) -> Option<MemoryAllocationStats> {
    // Signal to the memory tracker thread to terminate.
    if let Err(err) = tx.send(()) {
        gum::warn!(
            target: LOG_TARGET,
            worker_pid = %std::process::id(),
            "worker: error sending signal to memory tracker thread: {}", err
        );
        return None;
    }

    // Join on the thread handle.
    match fut.await {
        Ok(Ok(stats)) => Some(stats),
        // The task ran to completion, but the tracker loop reported an error.
        Ok(Err(err)) => {
            gum::warn!(
                target: LOG_TARGET,
                worker_pid = %std::process::id(),
                "worker: error occurred in the memory tracker thread: {}", err
            );
            None
        },
        // The task itself could not be joined.
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                worker_pid = %std::process::id(),
                "worker: error joining on memory tracker thread: {}", err
            );
            None
        },
    }
}

/// Helper function to send the memory metrics, if available, to prometheus.
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
if let Some(max_rss) = memory_stats.max_rss {
match max_rss {
Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64),
Err(err) => gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"error getting `ru_maxrss` in preparation thread: {}",
err
),
}
}

if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let resident_kb = (tracker_stats.resident / 1024) as f64;
let allocated_kb = (tracker_stats.allocated / 1024) as f64;

metrics.observe_preparation_max_resident(resident_kb);
metrics.observe_preparation_max_allocated(allocated_kb);
}
}
3 changes: 2 additions & 1 deletion node/core/pvf/src/prepare/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// Copyright 2021-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
Expand All @@ -22,6 +22,7 @@
//! The pool will spawn workers in new processes and those should pass control to
//! [`worker_entrypoint`].

mod memory_stats;
mod pool;
mod queue;
mod worker;
Expand Down
Loading

0 comments on commit 4f331d7

Please sign in to comment.