Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnski committed Jan 16, 2023
1 parent 488bb06 commit 61ebe72
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ mod tests {
let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse);

for _ in 0usize..5usize {
for _ in 0..5 {
let start = std::time::Instant::now();
let _ = pulse.next().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/prepare/memory_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn getrusage_thread() -> io::Result<i32> {
/// returns `None`.
pub fn get_max_rss_thread() -> Option<io::Result<i32>> {
#[cfg(target_os = "linux")]
let max_rss = Some(getrusage_thread().map(|rusage| max_rss));
let max_rss = Some(getrusage_thread().map(|rusage| rusage.max_rss));
#[cfg(not(target_os = "linux"))]
let max_rss = None;
max_rss
Expand Down
63 changes: 54 additions & 9 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ pub fn start(
#[cfg(test)]
mod tests {
use super::*;
use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT};
use crate::{
error::PrepareError,
host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
};
use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt};
use slotmap::SlotMap;
Expand Down Expand Up @@ -629,6 +632,7 @@ mod tests {
priority: Priority::Normal,
pvf: pvf(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
preparation_kind: PreparationKind::PreCheck,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

Expand All @@ -646,12 +650,29 @@ mod tests {
#[tokio::test]
async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3);
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;

let priority = Priority::Normal;
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
let preparation_kind = PreparationKind::PreCheck;
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(1),
preparation_timeout,
preparation_kind,
});
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(2),
preparation_timeout,
preparation_kind,
});
// Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
preparation_kind: PreparationKind::FromExecutionRequest,
});

// Receive only two spawns.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand Down Expand Up @@ -680,6 +701,7 @@ mod tests {
priority: Priority::Critical,
pvf: pvf(4),
preparation_timeout,
preparation_kind,
});

// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
Expand All @@ -691,11 +713,13 @@ mod tests {
async fn cull_unwanted() {
let mut test = Test::new(1, 2);
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
let preparation_kind = PreparationKind::PreCheck;

test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
preparation_timeout,
preparation_kind,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
Expand All @@ -707,6 +731,7 @@ mod tests {
priority: Priority::Critical,
pvf: pvf(2),
preparation_timeout,
preparation_kind,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

Expand All @@ -729,10 +754,28 @@ mod tests {
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);

let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT);
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });
let priority = Priority::Normal;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
let preparation_kind = PreparationKind::PreCheck;
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(1),
preparation_timeout,
preparation_kind,
});
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(2),
preparation_timeout,
preparation_kind,
});
// Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue {
priority,
pvf: pvf(3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
preparation_kind: PreparationKind::FromExecutionRequest,
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand Down Expand Up @@ -767,6 +810,7 @@ mod tests {
priority: Priority::Normal,
pvf: pvf(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
preparation_kind: PreparationKind::PreCheck,
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand All @@ -792,6 +836,7 @@ mod tests {
priority: Priority::Normal,
pvf: pvf(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
preparation_kind: PreparationKind::PreCheck,
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand Down
10 changes: 6 additions & 4 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async fn handle_response(
metrics: &Metrics,
worker: IdleWorker,
result: PrepareResult,
memory_stats: MemoryStats,
memory_stats: Option<MemoryStats>,
pid: u32,
tmp_file: PathBuf,
artifact_path: PathBuf,
Expand Down Expand Up @@ -218,7 +218,9 @@ async fn handle_response(

// If there were no errors up until now, log the memory stats for a successful preparation, if
// available.
observe_memory_metrics(metrics, memory_stats, pid);
if let Some(memory_stats) = memory_stats {
observe_memory_metrics(metrics, memory_stats, pid);
}

outcome
}
Expand Down Expand Up @@ -325,7 +327,7 @@ async fn send_response(
async fn recv_response(
stream: &mut UnixStream,
pid: u32,
) -> io::Result<(PrepareResult, MemoryStats)> {
) -> io::Result<(PrepareResult, Option<MemoryStats>)> {
let result = framed_recv(stream).await?;
let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
// We received invalid bytes from the worker.
Expand All @@ -342,7 +344,7 @@ async fn recv_response(
)
})?;
let memory_stats = framed_recv(stream).await?;
let memory_stats = MemoryStats::decode(&mut &memory_stats[..]).map_err(|e| {
let memory_stats = Option::<MemoryStats>::decode(&mut &memory_stats[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e),
Expand Down

0 comments on commit 61ebe72

Please sign in to comment.