Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pvf: Log memory metrics from preparation #6565

Merged
merged 16 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
slotmap = "1.0"
gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
slotmap = "1.0"
tempfile = "3.3.0"
tikv-jemalloc-ctl = "0.5.0"
tokio = { version = "1.22.0", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }

Expand All @@ -38,6 +39,9 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"

[dev-dependencies]
adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ mod tests {
let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse);

for _ in 0usize..5usize {
for _ in 0..5 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the unrelated change, but this really triggered me.

let start = std::time::Instant::now();
let _ = pulse.next().await.unwrap();

Expand Down
51 changes: 51 additions & 0 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ impl Metrics {
pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
}

/// Observe max_rss for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
	// No-op when metrics were not registered (i.e. `self.0` is `None`).
	if let Some(inner) = self.0.as_ref() {
		inner.preparation_max_rss.observe(max_rss);
	}
}

/// Observe max resident memory for preparation.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
	// No-op when metrics were not registered (i.e. `self.0` is `None`).
	if let Some(inner) = self.0.as_ref() {
		inner.preparation_max_resident.observe(max_resident_kb);
	}
}

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
	// No-op when metrics were not registered (i.e. `self.0` is `None`).
	if let Some(inner) = self.0.as_ref() {
		inner.preparation_max_allocated.observe(max_allocated_kb);
	}
}
}

#[derive(Clone)]
Expand All @@ -85,6 +106,9 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
preparation_max_rss: prometheus::Histogram,
preparation_max_allocated: prometheus::Histogram,
preparation_max_resident: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -202,6 +226,33 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_rss",
"ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)",
)
)?,
registry,
)?,
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_resident",
"max resident memory observed for preparation (in kilobytes)",
)
)?,
registry,
)?,
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_preparation_max_allocated",
"max allocated memory observed for preparation (in kilobytes)",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(inner)))
}
Expand Down
241 changes: 241 additions & 0 deletions node/core/pvf/src/prepare/memory_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Memory stats for preparation.
//!
//! Right now we gather three measurements:
//!
//! - `ru_maxrss` (maximum resident set size) from `getrusage`.
//! - `resident` memory stat provided by `tikv-jemalloc-ctl`.
//! - `allocated` memory stat also from `tikv-jemalloc-ctl`.
//!
//! Currently we are only logging these for the purposes of gathering data. In the future, we may
//! use these stats to reject PVFs during pre-checking. See
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
//! background.

use crate::{metrics::Metrics, LOG_TARGET};
use parity_scale_codec::{Decode, Encode};
use std::{
io,
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;

#[cfg(target_os = "linux")]
use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};

/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`.
///
/// Derives `Encode`/`Decode` so the worker can send it back to the host over the IPC channel.
#[derive(Encode, Decode)]
pub struct MemoryStats {
	/// Memory stats from `tikv_jemalloc_ctl`. `None` if the tracker was unavailable.
	pub memory_tracker_stats: Option<MemoryAllocationStats>,
	/// `ru_maxrss` from `getrusage`, in kilobytes. A string error since `io::Error` is not
	/// `Encode`able. `None` on platforms without `getrusage` support (currently non-Linux).
	pub max_rss: Option<Result<i64, String>>,
}

/// Statistics of collected memory metrics.
///
/// Values are `u64` (not `usize`) because the struct must be `Encode`able.
#[non_exhaustive]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
	/// Total resident memory, in bytes.
	pub resident: u64,
	/// Total allocated memory, in bytes.
	pub allocated: u64,
}

/// Polls allocation statistics from `tikv_jemalloc_ctl`. The jemalloc "MIB" keys are resolved
/// once at construction so that each subsequent read avoids a by-name lookup.
#[derive(Clone)]
struct MemoryAllocationTracker {
	// Key for advancing the jemalloc epoch, which refreshes the cached statistics.
	epoch: tikv_jemalloc_ctl::epoch_mib,
	// Key for reading total allocated bytes.
	allocated: stats::allocated_mib,
	// Key for reading total resident bytes.
	resident: stats::resident_mib,
}

impl MemoryAllocationTracker {
	/// Creates a tracker, resolving the jemalloc MIB keys used for repeated reads.
	pub fn new() -> Result<Self, Error> {
		let epoch = epoch::mib()?;
		let allocated = stats::allocated::mib()?;
		let resident = stats::resident::mib()?;
		Ok(Self { epoch, allocated, resident })
	}

	/// Takes a snapshot of the current allocation statistics.
	pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
		// Advance the allocation epoch so jemalloc refreshes its cached stats.
		self.epoch.advance()?;

		Ok(MemoryAllocationStats {
			// Widen to `u64`, as `usize` is not `Encode`able.
			allocated: self.allocated.read()? as u64,
			resident: self.resident.read()? as u64,
		})
	}
}

/// Get the rusage stats for the current thread.
#[cfg(target_os = "linux")]
fn getrusage_thread() -> io::Result<rusage> {
	// SAFETY: `rusage` is a plain C struct consisting solely of integer fields, for which
	// all-zeroes is a valid bit pattern.
	let mut result: rusage = unsafe { std::mem::zeroed() };
	// SAFETY: `result` is a valid, writable `rusage` and `RUSAGE_THREAD` is a valid `who`
	// argument; on failure the kernel leaves `result` untouched and we return the OS error.
	if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 {
		return Err(io::Error::last_os_error())
	}
	Ok(result)
}

/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just
/// returns `None`.
pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
	// Exactly one of the two branches below survives cfg-stripping and becomes the tail
	// expression of the function.
	#[cfg(target_os = "linux")]
	{
		// `ru_maxrss` is a `c_long` (`i32` or `i64` depending on architecture); `i64::from`
		// widens losslessly in either case.
		Some(getrusage_thread().map(|usage| i64::from(usage.ru_maxrss)))
	}
	#[cfg(not(target_os = "linux"))]
	{
		None
	}
}

/// Runs a thread in the background that observes memory statistics. The goal is to try to get
/// accurate stats during preparation.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
const POLL_INTERVAL: Duration = Duration::from_millis(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to sample the stats so often ?

Copy link
Contributor Author

@mrcnski mrcnski Jan 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, probably not. I was thinking of execution, where jobs take 10-25 ms.

Maybe 500ms for preparation is fine? I suspect that in most cases the memory will just grow, and the final measurement will be the max, so polling too often wouldn't buy much.

I'm wondering now if an attacker could craft a PVF that causes compiler memory to spike very quickly and then go back down. A coarse interval wouldn't catch that. Sounds like a very specific and unlikely attack though, and anyway the max_rss metric would be a useful backup stat for that case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is definitely a concern. It should not be possible to bypass the pre-checking checks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean that kind of attack @eskimor? Maybe the interval could be randomized to be less predictable and thus less gameable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am dubious that could work: The sampling interval will likely stay in the milliseconds range, but allocating huge amounts of memory can be accomplished way faster. Therefore it would still be able to trick this. What we could do on top is track the overall amount memory ever allocated - and if that value changed "too much" during two samples, we could also mark the PVF as invalid.

Anyhow, we are talking about the PVF preparation here. So this is about crafting a PVF that makes the compiler use huge amounts of memory for only a very short while. Given that the compiler code is not controlled by an attacker, this might not even be possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if that value changed "too much" during two samples, we could also mark the PVF as invalid.

Interesting idea. Why not just check max_rss at the end -- it should give us the maximum memory spike as well. If that's too high, we reject, even if the memory tracker observed metrics in the normal range.

Given that the compiler code is not controlled by an attacker, this might not even be possible.

Yep, I wondered the same thing above. Certainly for the purposes of gathering metrics right now it's not a concern. And later, maybe we can just rely on max_rss, if we find out it's not a useless stat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better way to handle this would be to not poll and instead set up a cgroup with an appropriate limit and subscribe to an event once that limit is breached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, thanks! I'll extract that suggestion into a follow-up issue. I'll keep the polling for now, for the purposes of gathering metrics, as we don't yet know what the limit should be.


let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
let mut max_stats = MemoryAllocationStats::default();

let mut update_stats = || -> Result<(), String> {
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
if current_stats.resident > max_stats.resident {
max_stats.resident = current_stats.resident;
}
if current_stats.allocated > max_stats.allocated {
max_stats.allocated = current_stats.allocated;
}
Ok(())
};

loop {
// Take a snapshot and update the max stats.
update_stats()?;

// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
}
}
}

/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
pub async fn get_memory_tracker_loop_stats(
	fut: JoinHandle<Result<MemoryAllocationStats, String>>,
	tx: Sender<()>,
) -> Option<MemoryAllocationStats> {
	// Ask the memory tracker thread to stop sampling; if the receiver is gone, there is
	// nothing to collect.
	if let Err(err) = tx.send(()) {
		gum::warn!(
			target: LOG_TARGET,
			worker_pid = %std::process::id(),
			"worker: error sending signal to memory tracker_thread: {}", err
		);
		return None
	}

	// Join on the thread handle and unwrap the two layers of fallibility: the join itself
	// and the tracker loop's own result.
	match fut.await {
		Ok(Ok(stats)) => Some(stats),
		Ok(Err(err)) => {
			gum::warn!(
				target: LOG_TARGET,
				worker_pid = %std::process::id(),
				"worker: error occurred in the memory tracker thread: {}", err
			);
			None
		},
		Err(err) => {
			gum::warn!(
				target: LOG_TARGET,
				worker_pid = %std::process::id(),
				"worker: error joining on memory tracker thread: {}", err
			);
			None
		},
	}
}

/// Helper function to send the memory metrics, if available, to prometheus.
pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
	match memory_stats.max_rss {
		Some(Ok(max_rss)) => metrics.observe_preparation_max_rss(max_rss as f64),
		Some(Err(err)) => gum::warn!(
			target: LOG_TARGET,
			worker_pid = %pid,
			"error getting `ru_maxrss` in preparation thread: {}",
			err
		),
		// `getrusage` not supported on this platform; nothing to report.
		None => {},
	}

	if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
		// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
		metrics.observe_preparation_max_resident((tracker_stats.resident / 1024) as f64);
		metrics.observe_preparation_max_allocated((tracker_stats.allocated / 1024) as f64);
	}
}
3 changes: 2 additions & 1 deletion node/core/pvf/src/prepare/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// Copyright 2021-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
Expand All @@ -22,6 +22,7 @@
//! The pool will spawn workers in new processes and those should execute pass control to
//! [`worker_entrypoint`].

mod memory_stats;
mod pool;
mod queue;
mod worker;
Expand Down
Loading