From 4f331d74c3004d9765b735ec66acc92edea62c7f Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 6 Feb 2023 12:17:21 +0100 Subject: [PATCH] pvf: Log memory metrics from preparation (#6565) * Add getrusage and memory tracker for precheck preparation * Log memory stats metrics after prechecking * Fix tests * Try to fix errors (linux-only so I'm relying on CI here) * Try to fix CI * Add module docs for `prepare/memory_stats.rs`; fix CI error * Report memory stats for all preparation jobs * Use `RUSAGE_SELF` instead of `RUSAGE_THREAD` Not sure why I did that -- was a brainfart on my end. * Revert last commit (RUSAGE_THREAD is correct) * Use exponential buckets * Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS * Increase poll interval * Revert "Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS" This reverts commit becf7a815409ab530fc61370abffcd1b97b9a777. --- Cargo.lock | 2 + node/core/pvf/Cargo.toml | 10 +- node/core/pvf/src/host.rs | 2 +- node/core/pvf/src/metrics.rs | 60 ++++++ node/core/pvf/src/prepare/memory_stats.rs | 243 ++++++++++++++++++++++ node/core/pvf/src/prepare/mod.rs | 3 +- node/core/pvf/src/prepare/pool.rs | 5 +- node/core/pvf/src/prepare/queue.rs | 24 ++- node/core/pvf/src/prepare/worker.rs | 166 +++++++++++---- node/overseer/src/memory_stats.rs | 8 +- node/overseer/src/metrics.rs | 4 +- 11 files changed, 469 insertions(+), 58 deletions(-) create mode 100644 node/core/pvf/src/prepare/memory_stats.rs diff --git a/Cargo.lock b/Cargo.lock index 3a1eae3c2714..376321bdbc23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6981,6 +6981,7 @@ dependencies = [ "futures", "futures-timer", "hex-literal", + "libc", "parity-scale-codec", "pin-project", "polkadot-core-primitives", @@ -7001,6 +7002,7 @@ dependencies = [ "tempfile", "test-parachain-adder", "test-parachain-halt", + "tikv-jemalloc-ctl", "tokio", "tracing-gum", ] diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index cf2fa26bc81f..e918c5f90fb2 100644 --- 
a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -14,13 +14,14 @@ assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" futures-timer = "3.0.2" -slotmap = "1.0" gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" +rayon = "1.5.1" +slotmap = "1.0" tempfile = "3.3.0" +tikv-jemalloc-ctl = "0.5.0" tokio = { version = "1.24.2", features = ["fs", "process"] } -rayon = "1.5.1" parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] } @@ -38,8 +39,11 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } +[target.'cfg(target_os = "linux")'.dependencies] +libc = "0.2.139" + [dev-dependencies] adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" } halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } hex-literal = "0.3.4" -tempfile = "3.2.0" +tempfile = "3.3.0" diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 98a945be26f6..956c580380a0 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -845,7 +845,7 @@ mod tests { let pulse = pulse_every(Duration::from_millis(100)); futures::pin_mut!(pulse); - for _ in 0usize..5usize { + for _ in 0..5 { let start = std::time::Instant::now(); let _ = pulse.next().await.unwrap(); diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 8db105d895ea..07a2bf46f530 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -72,6 +72,27 @@ impl Metrics { pub(crate) fn time_execution(&self) -> Option { self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) } + + /// Observe max_rss for preparation. 
+ pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) { + if let Some(metrics) = &self.0 { + metrics.preparation_max_rss.observe(max_rss); + } + } + + /// Observe max resident memory for preparation. + pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) { + if let Some(metrics) = &self.0 { + metrics.preparation_max_resident.observe(max_resident_kb); + } + } + + /// Observe max allocated memory for preparation. + pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) { + if let Some(metrics) = &self.0 { + metrics.preparation_max_allocated.observe(max_allocated_kb); + } + } } #[derive(Clone)] @@ -85,6 +106,9 @@ struct MetricsInner { execute_finished: prometheus::Counter, preparation_time: prometheus::Histogram, execution_time: prometheus::Histogram, + preparation_max_rss: prometheus::Histogram, + preparation_max_allocated: prometheus::Histogram, + preparation_max_resident: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -202,6 +226,42 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + preparation_max_rss: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_preparation_max_rss", + "ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)", + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, + preparation_max_resident: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_preparation_max_resident", + "max resident memory observed for preparation (in kilobytes)", + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, + preparation_max_allocated: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_preparation_max_allocated", + "max 
allocated memory observed for preparation (in kilobytes)", + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs new file mode 100644 index 000000000000..4765a196d54e --- /dev/null +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -0,0 +1,243 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Memory stats for preparation. +//! +//! Right now we gather three measurements: +//! +//! - `ru_maxrss` (resident set size) from `getrusage`. +//! - `resident` memory stat provided by `tikv-jemalloc-ctl`. +//! - `allocated` memory stat also from `tikv-jemalloc-ctl`. +//! +//! Currently we are only logging these for the purposes of gathering data. In the future, we may +//! use these stats to reject PVFs during pre-checking. See +//! for more +//! background. 
+ +use crate::{metrics::Metrics, LOG_TARGET}; +use parity_scale_codec::{Decode, Encode}; +use std::{ + io, + sync::mpsc::{Receiver, RecvTimeoutError, Sender}, + time::Duration, +}; +use tikv_jemalloc_ctl::{epoch, stats, Error}; +use tokio::task::JoinHandle; + +#[cfg(target_os = "linux")] +use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; + +/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if +/// supported by the OS, `ru_maxrss`. +#[derive(Encode, Decode)] +pub struct MemoryStats { + /// Memory stats from `tikv_jemalloc_ctl`. + pub memory_tracker_stats: Option, + /// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able. + pub max_rss: Option>, +} + +/// Statistics of collected memory metrics. +#[non_exhaustive] +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryAllocationStats { + /// Total resident memory, in bytes. + pub resident: u64, + /// Total allocated memory, in bytes. + pub allocated: u64, +} + +#[derive(Clone)] +struct MemoryAllocationTracker { + epoch: tikv_jemalloc_ctl::epoch_mib, + allocated: stats::allocated_mib, + resident: stats::resident_mib, +} + +impl MemoryAllocationTracker { + pub fn new() -> Result { + Ok(Self { + epoch: epoch::mib()?, + allocated: stats::allocated::mib()?, + resident: stats::resident::mib()?, + }) + } + + pub fn snapshot(&self) -> Result { + // update stats by advancing the allocation epoch + self.epoch.advance()?; + + // Convert to `u64`, as `usize` is not `Encode`able. + let allocated = self.allocated.read()? as u64; + let resident = self.resident.read()? as u64; + Ok(MemoryAllocationStats { allocated, resident }) + } +} + +/// Get the rusage stats for the current thread. 
+#[cfg(target_os = "linux")] +fn getrusage_thread() -> io::Result { + let mut result = rusage { + ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_maxrss: 0, + ru_ixrss: 0, + ru_idrss: 0, + ru_isrss: 0, + ru_minflt: 0, + ru_majflt: 0, + ru_nswap: 0, + ru_inblock: 0, + ru_oublock: 0, + ru_msgsnd: 0, + ru_msgrcv: 0, + ru_nsignals: 0, + ru_nvcsw: 0, + ru_nivcsw: 0, + }; + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + return Err(io::Error::last_os_error()) + } + Ok(result) +} + +/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just +/// returns `None`. +pub fn get_max_rss_thread() -> Option> { + // `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. + #[cfg(target_os = "linux")] + let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))); + #[cfg(not(target_os = "linux"))] + let max_rss = None; + max_rss +} + +/// Runs a thread in the background that observes memory statistics. The goal is to try to get +/// accurate stats during preparation. +/// +/// # Algorithm +/// +/// 1. Create the memory tracker. +/// +/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the +/// allocation epoch. +/// +/// 3. When we receive a signal that preparation has completed, take one last snapshot and return +/// the maximum observed values. +/// +/// # Errors +/// +/// For simplicity, any errors are returned as a string. As this is not a critical component, errors +/// are used for informational purposes (logging) only. +pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { + // This doesn't need to be too fine-grained since preparation currently takes 3-10s or more. + // Apart from that, there is not really a science to this number. 
+ const POLL_INTERVAL: Duration = Duration::from_millis(100); + + let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; + let mut max_stats = MemoryAllocationStats::default(); + + let mut update_stats = || -> Result<(), String> { + let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; + if current_stats.resident > max_stats.resident { + max_stats.resident = current_stats.resident; + } + if current_stats.allocated > max_stats.allocated { + max_stats.allocated = current_stats.allocated; + } + Ok(()) + }; + + loop { + // Take a snapshot and update the max stats. + update_stats()?; + + // Sleep. + match finished_rx.recv_timeout(POLL_INTERVAL) { + // Received finish signal. + Ok(()) => { + update_stats()?; + return Ok(max_stats) + }, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => + return Err("memory_tracker_loop: finished_rx disconnected".into()), + } + } +} + +/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this +/// error handling. +pub async fn get_memory_tracker_loop_stats( + fut: JoinHandle>, + tx: Sender<()>, +) -> Option { + // Signal to the memory tracker thread to terminate. + if let Err(err) = tx.send(()) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error sending signal to memory tracker_thread: {}", err + ); + None + } else { + // Join on the thread handle. + match fut.await { + Ok(Ok(stats)) => Some(stats), + Ok(Err(err)) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error occurred in the memory tracker thread: {}", err + ); + None + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error joining on memory tracker thread: {}", err + ); + None + }, + } + } +} + +/// Helper function to send the memory metrics, if available, to prometheus. 
+pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) { + if let Some(max_rss) = memory_stats.max_rss { + match max_rss { + Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64), + Err(err) => gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "error getting `ru_maxrss` in preparation thread: {}", + err + ), + } + } + + if let Some(tracker_stats) = memory_stats.memory_tracker_stats { + // We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`. + let resident_kb = (tracker_stats.resident / 1024) as f64; + let allocated_kb = (tracker_stats.allocated / 1024) as f64; + + metrics.observe_preparation_max_resident(resident_kb); + metrics.observe_preparation_max_allocated(allocated_kb); + } +} diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs index ac03cefc6fdb..4cbd63eff7d2 100644 --- a/node/core/pvf/src/prepare/mod.rs +++ b/node/core/pvf/src/prepare/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. +// Copyright 2021-2023 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify @@ -22,6 +22,7 @@ //! The pool will spawn workers in new processes and those should execute pass control to //! [`worker_entrypoint`]. 
+mod memory_stats; mod pool; mod queue; mod worker; diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 0d39623c99db..49670e4c1ac2 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -220,6 +220,7 @@ fn handle_to_pool( let preparation_timer = metrics.time_preparation(); mux.push( start_work_task( + metrics.clone(), worker, idle, code, @@ -268,6 +269,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po } async fn start_work_task( + metrics: Metrics, worker: Worker, idle: IdleWorker, code: Arc>, @@ -277,7 +279,8 @@ async fn start_work_task( _preparation_timer: Option, ) -> PoolEvent { let outcome = - worker::start_work(idle, code, &cache_path, artifact_path, preparation_timeout).await; + worker::start_work(&metrics, idle, code, &cache_path, artifact_path, preparation_timeout) + .await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index c44301c7427b..32e9bfa70748 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -492,7 +492,10 @@ pub fn start( #[cfg(test)] mod tests { use super::*; - use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT}; + use crate::{ + error::PrepareError, + host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT}, + }; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; use slotmap::SlotMap; @@ -628,12 +631,17 @@ mod tests { #[tokio::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); - let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; let priority = Priority::Normal; + let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { 
priority, pvf: pvf(3), preparation_timeout }); + // Start a non-precheck preparation for this one. + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(3), + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + }); // Receive only two spawns. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -711,10 +719,16 @@ mod tests { async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); - let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT); + let priority = Priority::Normal; + let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout }); + // Start a non-precheck preparation for this one. + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(3), + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index d3550fe3afe6..bb6e120a6691 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -14,9 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
+use super::memory_stats::{ + get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics, + MemoryStats, +}; use crate::{ artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, + metrics::Metrics, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -73,6 +78,7 @@ pub enum Outcome { /// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process /// being killed. pub async fn start_work( + metrics: &Metrics, worker: IdleWorker, code: Arc>, cache_path: &Path, @@ -109,14 +115,16 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await; + let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await; match result { // Received bytes from worker within the time limit. - Ok(Ok(response_bytes)) => - handle_response_bytes( + Ok(Ok((prepare_result, memory_stats))) => + handle_response( + metrics, IdleWorker { stream, pid }, - response_bytes, + prepare_result, + memory_stats, pid, tmp_file, artifact_path, @@ -151,29 +159,16 @@ pub async fn start_work( /// /// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be /// cleared by `with_tmp_file`. -async fn handle_response_bytes( +async fn handle_response( + metrics: &Metrics, worker: IdleWorker, - response_bytes: Vec, + result: PrepareResult, + memory_stats: Option, pid: u32, tmp_file: PathBuf, artifact_path: PathBuf, preparation_timeout: Duration, ) -> Outcome { - // By convention we expect encoded `PrepareResult`. 
- let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { - Ok(result) => result, - Err(err) => { - // We received invalid bytes from the worker. - let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "received unexpected response from the prepare worker: {}", - HexDisplay::from(&bound_bytes), - ); - return Outcome::IoErr(err.to_string()) - }, - }; let cpu_time_elapsed = match result { Ok(result) => result, // Timed out on the child. This should already be logged by the child. @@ -202,7 +197,7 @@ async fn handle_response_bytes( artifact_path.display(), ); - match tokio::fs::rename(&tmp_file, &artifact_path).await { + let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await { Ok(()) => Outcome::Concluded { worker, result }, Err(err) => { gum::warn!( @@ -215,7 +210,15 @@ async fn handle_response_bytes( ); Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) } }, + }; + + // If there were no errors up until now, log the memory stats for a successful preparation, if + // available. 
+ if let Some(memory_stats) = memory_stats { + observe_memory_metrics(metrics, memory_stats, pid); } + + outcome } /// Create a temporary file for an artifact at the given cache path and execute the given @@ -288,17 +291,75 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, ) })?; let preparation_timeout = framed_recv(stream).await?; - let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|_| { + let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| { io::Error::new( io::ErrorKind::Other, - "prepare pvf recv_request: failed to decode duration".to_string(), + format!("prepare pvf recv_request: failed to decode duration: {:?}", e), ) })?; Ok((code, tmp_file, preparation_timeout)) } +async fn send_response( + stream: &mut UnixStream, + result: PrepareResult, + memory_stats: Option, +) -> io::Result<()> { + framed_send(stream, &result.encode()).await?; + framed_send(stream, &memory_stats.encode()).await +} + +async fn recv_response( + stream: &mut UnixStream, + pid: u32, +) -> io::Result<(PrepareResult, Option)> { + let result = framed_recv(stream).await?; + let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { + // We received invalid bytes from the worker. + let bound_bytes = &result[..result.len().min(4)]; + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), + ); + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_response: failed to decode result: {:?}", e), + ) + })?; + let memory_stats = framed_recv(stream).await?; + let memory_stats = Option::::decode(&mut &memory_stats[..]).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e), + ) + })?; + Ok((result, memory_stats)) +} + /// The entrypoint that the spawned prepare worker should start with. 
The `socket_path` specifies /// the path to the socket used to communicate with the host. +/// +/// # Flow +/// +/// This runs the following in a loop: +/// +/// 1. Get the code and parameters for preparation from the host. +/// +/// 2. Start a memory tracker in a separate thread. +/// +/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads. +/// +/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor +/// thread will trigger first. +/// +/// 5. Stop the memory tracker and get the stats. +/// +/// 6. If compilation succeeded, write the compiled artifact into a temporary file. +/// +/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we +/// send that in the `PrepareResult`. pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { @@ -309,26 +370,40 @@ pub fn worker_entrypoint(socket_path: &str) { "worker: preparing artifact", ); - // Used to signal to the cpu time monitor thread that it can finish. - let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); + // Run the memory tracker. + let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); + let memory_tracker_fut = + rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); + // Spawn a new thread that runs the CPU time monitor. - let thread_fut = rt_handle + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx) + }) + .fuse(); + // Spawn another thread for preparation. + let prepare_fut = rt_handle .spawn_blocking(move || { - cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx) + let prepare_result = prepare_artifact(&code); + + // Get the `ru_maxrss` stat. 
If supported, call getrusage for the thread. + let max_rss = get_max_rss_thread(); + + (prepare_result, max_rss) }) .fuse(); - let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); - pin_mut!(thread_fut); + pin_mut!(cpu_time_monitor_fut); pin_mut!(prepare_fut); - let result = select_biased! { + let (result, memory_stats) = select_biased! { // If this future is not selected, the join handle is dropped and the thread will // finish in the background. - join_res = thread_fut => { - match join_res { + join_res = cpu_time_monitor_fut => { + let result = match join_res { Ok(Some(cpu_time_elapsed)) => { // Log if we exceed the timeout and the other thread hasn't finished. gum::warn!( @@ -342,18 +417,27 @@ pub fn worker_entrypoint(socket_path: &str) { }, Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), Err(err) => Err(PrepareError::IoErr(err.to_string())), - } + }; + (result, None) }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - let _ = finished_tx.send(()); + let _ = cpu_time_monitor_tx.send(()); - match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { - Err(err) => { + match compilation_res.unwrap_or_else(|err| (Err(PrepareError::IoErr(err.to_string())), None)) { + (Err(err), _) => { // Serialized error will be written into the socket. - Err(err) + (Err(err), None) }, - Ok(compiled_artifact) => { + (Ok(compiled_artifact), max_rss) => { + // Stop the memory stats worker and get its observed memory stats. + let memory_tracker_stats = + get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await; + let memory_stats = MemoryStats { + memory_tracker_stats, + max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())), + }; + // Write the serialized artifact into a temp file. 
// // PVF host only keeps artifacts statuses in its memory, successfully @@ -369,13 +453,13 @@ pub fn worker_entrypoint(socket_path: &str) { ); tokio::fs::write(&dest, &compiled_artifact).await?; - Ok(cpu_time_elapsed) + (Ok(cpu_time_elapsed), Some(memory_stats)) }, } }, }; - framed_send(&mut stream, result.encode().as_slice()).await?; + send_response(&mut stream, result, memory_stats).await?; } }); } diff --git a/node/overseer/src/memory_stats.rs b/node/overseer/src/memory_stats.rs index 670762a4935c..908e20cc213a 100644 --- a/node/overseer/src/memory_stats.rs +++ b/node/overseer/src/memory_stats.rs @@ -36,8 +36,8 @@ impl MemoryAllocationTracker { // update stats by advancing the allocation epoch self.epoch.advance()?; - let allocated: u64 = self.allocated.read()? as _; - let resident: u64 = self.resident.read()? as _; + let allocated = self.allocated.read()?; + let resident = self.resident.read()?; Ok(MemoryAllocationSnapshot { allocated, resident }) } } @@ -47,7 +47,7 @@ impl MemoryAllocationTracker { #[derive(Debug, Clone)] pub struct MemoryAllocationSnapshot { /// Total resident memory, in bytes. - pub resident: u64, + pub resident: usize, /// Total allocated memory, in bytes. - pub allocated: u64, + pub allocated: usize, } diff --git a/node/overseer/src/metrics.rs b/node/overseer/src/metrics.rs index bb7d98a68f2e..b7a4ff443fae 100644 --- a/node/overseer/src/metrics.rs +++ b/node/overseer/src/metrics.rs @@ -69,8 +69,8 @@ impl Metrics { pub(crate) fn memory_stats_snapshot(&self, memory_stats: MemoryAllocationSnapshot) { if let Some(metrics) = &self.0 { - metrics.memory_stats_allocated.set(memory_stats.allocated); - metrics.memory_stats_resident.set(memory_stats.resident); + metrics.memory_stats_allocated.set(memory_stats.allocated as u64); + metrics.memory_stats_resident.set(memory_stats.resident as u64); } }