From 61ebe728299c02c95470b68f341860d7df0966ef Mon Sep 17 00:00:00 2001
From: Marcin S
Date: Mon, 16 Jan 2023 17:01:54 -0500
Subject: [PATCH] Fix tests

---
 node/core/pvf/src/host.rs                 |  2 +-
 node/core/pvf/src/prepare/memory_stats.rs |  2 +-
 node/core/pvf/src/prepare/queue.rs        | 63 +++++++++++++++++++----
 node/core/pvf/src/prepare/worker.rs       | 10 ++--
 4 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index c85d54eb46e0..d892be32efc6 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -851,7 +851,7 @@ mod tests {
 		let pulse = pulse_every(Duration::from_millis(100));
 		futures::pin_mut!(pulse);
-		for _ in 0usize..5usize {
+		for _ in 0..5 {
 			let start = std::time::Instant::now();
 			let _ = pulse.next().await.unwrap();
diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs
index 35e057e5f0cc..460872779dbc 100644
--- a/node/core/pvf/src/prepare/memory_stats.rs
+++ b/node/core/pvf/src/prepare/memory_stats.rs
@@ -105,7 +105,7 @@ fn getrusage_thread() -> io::Result {
 /// returns `None`.
 pub fn get_max_rss_thread() -> Option> {
 	#[cfg(target_os = "linux")]
-	let max_rss = Some(getrusage_thread().map(|rusage| max_rss));
+	let max_rss = Some(getrusage_thread().map(|rusage| rusage.max_rss));
 	#[cfg(not(target_os = "linux"))]
 	let max_rss = None;
 	max_rss
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index 91e6bfa3c7a1..b3352b43b0d7 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -510,7 +510,10 @@ pub fn start(
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT};
+	use crate::{
+		error::PrepareError,
+		host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
+	};
 	use assert_matches::assert_matches;
 	use futures::{future::BoxFuture, FutureExt};
 	use slotmap::SlotMap;
@@ -629,6 +632,7 @@ mod tests {
 			priority: Priority::Normal,
 			pvf: pvf(1),
 			preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
+			preparation_kind: PreparationKind::PreCheck,
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -646,12 +650,29 @@ mod tests {
 	#[tokio::test]
 	async fn dont_spawn_over_soft_limit_unless_critical() {
 		let mut test = Test::new(2, 3);
-		let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
 		let priority = Priority::Normal;
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
+		let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
+		let preparation_kind = PreparationKind::PreCheck;
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(1),
+			preparation_timeout,
+			preparation_kind,
+		});
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(2),
+			preparation_timeout,
+			preparation_kind,
+		});
+		// Start a non-precheck preparation for this one.
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(3),
+			preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
+			preparation_kind: PreparationKind::FromExecutionRequest,
+		});
 		// Receive only two spawns.
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -680,6 +701,7 @@ mod tests {
 			priority: Priority::Critical,
 			pvf: pvf(4),
 			preparation_timeout,
+			preparation_kind,
 		});
 		// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
@@ -691,11 +713,13 @@ mod tests {
 	async fn cull_unwanted() {
 		let mut test = Test::new(1, 2);
 		let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
+		let preparation_kind = PreparationKind::PreCheck;
 		test.send_queue(ToQueue::Enqueue {
 			priority: Priority::Normal,
 			pvf: pvf(1),
 			preparation_timeout,
+			preparation_kind,
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
 		let w1 = test.workers.insert(());
@@ -707,6 +731,7 @@ mod tests {
 			priority: Priority::Critical,
 			pvf: pvf(2),
 			preparation_timeout,
+			preparation_kind,
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -729,10 +754,28 @@ mod tests {
 	async fn worker_mass_die_out_doesnt_stall_queue() {
 		let mut test = Test::new(2, 2);
-		let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT);
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
-		test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
+		let priority = Priority::Normal;
+		let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
+		let preparation_kind = PreparationKind::PreCheck;
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(1),
+			preparation_timeout,
+			preparation_kind,
+		});
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(2),
+			preparation_timeout,
+			preparation_kind,
+		});
+		// Start a non-precheck preparation for this one.
+		test.send_queue(ToQueue::Enqueue {
+			priority,
+			pvf: pvf(3),
+			preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
+			preparation_kind: PreparationKind::FromExecutionRequest,
+		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -767,6 +810,7 @@ mod tests {
 			priority: Priority::Normal,
 			pvf: pvf(1),
 			preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
+			preparation_kind: PreparationKind::PreCheck,
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -792,6 +836,7 @@ mod tests {
 			priority: Priority::Normal,
 			pvf: pvf(1),
 			preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
+			preparation_kind: PreparationKind::PreCheck,
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index b75c51a6c5a7..5b41978dc8a1 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -167,7 +167,7 @@ async fn handle_response(
 	metrics: &Metrics,
 	worker: IdleWorker,
 	result: PrepareResult,
-	memory_stats: MemoryStats,
+	memory_stats: Option<MemoryStats>,
 	pid: u32,
 	tmp_file: PathBuf,
 	artifact_path: PathBuf,
@@ -218,7 +218,9 @@
 	// If there were no errors up until now, log the memory stats for a successful preparation, if
 	// available.
-	observe_memory_metrics(metrics, memory_stats, pid);
+	if let Some(memory_stats) = memory_stats {
+		observe_memory_metrics(metrics, memory_stats, pid);
+	}
 	outcome
 }
@@ -325,7 +327,7 @@ async fn send_response(
 async fn recv_response(
 	stream: &mut UnixStream,
 	pid: u32,
-) -> io::Result<(PrepareResult, MemoryStats)> {
+) -> io::Result<(PrepareResult, Option<MemoryStats>)> {
 	let result = framed_recv(stream).await?;
 	let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
 		// We received invalid bytes from the worker.
@@ -342,7 +344,7 @@ async fn recv_response(
 		)
 	})?;
 	let memory_stats = framed_recv(stream).await?;
-	let memory_stats = MemoryStats::decode(&mut &memory_stats[..]).map_err(|e| {
+	let memory_stats = Option::<MemoryStats>::decode(&mut &memory_stats[..]).map_err(|e| {
 		io::Error::new(
 			io::ErrorKind::Other,
 			format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e),
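
Note on the worker.rs change above, with a minimal sketch that is not part of the patch: after this change the prepare worker sends an `Option<MemoryStats>` over the socket and the host decodes it with `Option::<MemoryStats>::decode`, skipping `observe_memory_metrics` when no stats are available (for example when `get_max_rss_thread` returns `None` off Linux). The snippet below only illustrates how such an `Option` round-trips under SCALE using `parity-scale-codec` (with its `derive` feature); the `MemoryStats` struct here is a simplified stand-in with an assumed `max_rss` field, not the real type from `memory_stats.rs`.

    use parity_scale_codec::{Decode, Encode};

    // Simplified stand-in for the real `MemoryStats`; for illustration only.
    #[derive(Clone, Debug, PartialEq, Encode, Decode)]
    struct MemoryStats {
        max_rss: Option<i64>,
    }

    fn main() {
        // `None` encodes as a single 0x00 byte and `Some(..)` prefixes the inner
        // encoding with 0x01, so worker and host must agree on the `Option`
        // wrapper; that is why the host side now decodes `Option::<MemoryStats>`.
        let sent: Option<MemoryStats> = Some(MemoryStats { max_rss: Some(1024) });
        let bytes = sent.encode();
        let received = Option::<MemoryStats>::decode(&mut &bytes[..]).expect("valid encoding");
        assert_eq!(received, sent);

        // A worker that collected no stats sends `None`, and the host then simply
        // skips `observe_memory_metrics` (see `handle_response` in the hunk above).
        assert_eq!(None::<MemoryStats>.encode(), vec![0u8]);
    }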