From 514653c011241ce3385c36804156e7d6f1d1e96a Mon Sep 17 00:00:00 2001
From: Rustie Lin
Date: Thu, 3 Oct 2024 09:45:38 -0700
Subject: [PATCH] [forge] fail fast deployer if job failed (#14854)

---
 .../forge/src/backend/k8s/cluster_helper.rs   | 165 +++++++++++-------
 testsuite/forge/src/backend/k8s/swarm.rs      |   5 -
 .../src/backend/k8s_deployer/deployer.rs      |  12 +-
 3 files changed, 113 insertions(+), 69 deletions(-)

diff --git a/testsuite/forge/src/backend/k8s/cluster_helper.rs b/testsuite/forge/src/backend/k8s/cluster_helper.rs
index 92e44e92f25fd..46131ac4e6435 100644
--- a/testsuite/forge/src/backend/k8s/cluster_helper.rs
+++ b/testsuite/forge/src/backend/k8s/cluster_helper.rs
@@ -71,71 +71,118 @@ pub fn dump_string_to_file(
     Ok(file_path_str)
 }
 
+#[derive(Error, Debug)]
+#[error("{0}")]
+enum LogJobError {
+    RetryableError(String),
+    FinalError(String),
+}
+
+/**
+ * Tail the logs of a job. Returns OK if the job has a pod that succeeds.
+ * Assumes that the job only runs once and exits, and has no configured retry policy (i.e. backoffLimit = 0)
+ */
+async fn tail_job_logs(
+    jobs_api: Arc<dyn ReadWrite<Job>>,
+    job_name: String,
+    job_namespace: String,
+) -> Result<(), LogJobError> {
+    let genesis_job = jobs_api
+        .get_status(&job_name)
+        .await
+        .map_err(|e| LogJobError::FinalError(format!("Failed to get job status: {}", e)))?;
+
+    let status = genesis_job.status.expect("Job status not found");
+    info!("Job {} status: {:?}", &job_name, status);
+    match status.active {
+        Some(active) => {
+            if active < 1 {
+                return Err(LogJobError::RetryableError(format!(
+                    "Job {} has no active pods. Maybe it has not started yet",
+                    &job_name
+                )));
+            }
+            // try tailing the logs of the genesis job
+            // by the time this is done, we can re-evaluate its status
+            let mut command = tokio::process::Command::new(KUBECTL_BIN)
+                .args([
+                    "-n",
+                    &job_namespace,
+                    "logs",
+                    "--tail=10", // in case of connection reset we only want the last few lines to avoid spam
+                    "-f",
+                    format!("job/{}", &job_name).as_str(),
+                ])
+                .stdout(Stdio::piped())
+                .spawn()
+                .map_err(|e| {
+                    LogJobError::RetryableError(format!("Failed to spawn command: {}", e))
+                })?;
+            // Ensure the command has stdout
+            let stdout = command.stdout.take().ok_or_else(|| {
+                LogJobError::RetryableError("Failed to capture stdout".to_string())
+            })?;
+
+            // Create a BufReader to read the output asynchronously, line by line
+            let mut reader = BufReader::new(stdout).lines();
+
+            // Iterate over the lines as they come
+            while let Some(line) = reader.next_line().await.transpose() {
+                match line {
+                    Ok(line) => {
+                        info!("[{}]: {}", &job_name, line); // Add a prefix to each line
+                    },
+                    Err(e) => {
+                        return Err(LogJobError::RetryableError(format!(
+                            "Error reading line: {}",
+                            e
+                        )));
+                    },
+                }
+            }
+            command.wait().await.map_err(|e| {
+                LogJobError::RetryableError(format!("Error waiting for command: {}", e))
+            })?;
+        },
+        None => info!("Job {} has no active pods running", &job_name),
+    }
+    match status.succeeded {
+        Some(_) => {
+            info!("Job {} succeeded!", &job_name);
+            return Ok(());
+        },
+        None => info!("Job {} has no succeeded pods", &job_name),
+    }
+    if status.failed.is_some() {
+        info!("Job {} failed!", &job_name);
+        return Err(LogJobError::FinalError(format!("Job {} failed", &job_name)));
+    }
+    Err(LogJobError::RetryableError(format!(
+        "Job {} has no succeeded or failed pods. Maybe it has not started yet.",
+        &job_name
+    )))
+}
+
 /// Waits for a job to complete, while tailing the job's logs
 pub async fn wait_log_job(
     jobs_api: Arc<dyn ReadWrite<Job>>,
     job_namespace: &str,
     job_name: String,
-    retry_strategy: impl Iterator<Item = Duration>,
+    retry_policy: RetryPolicy,
 ) -> Result<()> {
-    aptos_retrier::retry_async(retry_strategy, || {
-        let jobs_api = jobs_api.clone();
-        let job_name = job_name.clone();
-        Box::pin(async move {
-            let genesis_job = jobs_api.get_status(&job_name).await.unwrap();
-
-            let status = genesis_job.status.unwrap();
-            info!("Job {} status: {:?}", &job_name, status);
-            match status.active {
-                Some(_) => {
-                    // try tailing the logs of the genesis job
-                    // by the time this is done, we can re-evalulate its status
-                    let mut command = tokio::process::Command::new(KUBECTL_BIN)
-                        .args([
-                            "-n",
-                            job_namespace,
-                            "logs",
-                            "--tail=10", // in case of connection reset we only want the last few lines to avoid spam
-                            "-f",
-                            format!("job/{}", &job_name).as_str(),
-                        ])
-                        .stdout(Stdio::piped())
-                        .spawn()?;
-                    // Ensure the command has stdout
-                    let stdout = command
-                        .stdout
-                        .take()
-                        .ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;
-
-                    // Create a BufReader to read the output asynchronously, line by line
-                    let mut reader = BufReader::new(stdout).lines();
-
-                    // Iterate over the lines as they come
-                    while let Some(line) = reader.next_line().await.transpose() {
-                        match line {
-                            Ok(line) => {
-                                info!("[{}]: {}", &job_name, line); // Add a prefix to each line
-                            },
-                            Err(e) => {
-                                bail!("Error reading line: {}", e);
-                            },
-                        }
-                    }
-                    command.wait().await?;
-                },
-                None => info!("Job {} completed running", &job_name),
-            }
-            info!("Job {} status: {:?}", &job_name, status);
-            match status.succeeded {
-                Some(_) => {
-                    info!("Job {} done", &job_name);
-                    Ok(())
-                },
-                None => bail!("Job {} did not succeed", &job_name),
-            }
-        })
-    })
-    .await
+    retry_policy
+        .retry_if(
+            move || {
+                tail_job_logs(
+                    jobs_api.clone(),
+                    job_name.clone(),
+                    job_namespace.to_string(),
+                )
+            },
+            |e: &LogJobError| matches!(e, LogJobError::RetryableError(_)),
+        )
+        .await?;
+    Ok(())
 }
 
 /// Waits for a given number of HAProxy K8s Deployments to be ready
diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs
index 4d3c30e31801e..9c249dc1c6aea 100644
--- a/testsuite/forge/src/backend/k8s/swarm.rs
+++ b/testsuite/forge/src/backend/k8s/swarm.rs
@@ -460,11 +460,6 @@ pub fn k8s_wait_nodes_strategy() -> impl Iterator<Item = Duration> {
     fixed_retry_strategy(10 * 1000, 120)
 }
 
-pub fn k8s_wait_indexer_strategy() -> impl Iterator<Item = Duration> {
-    // retry every 10 seconds for 20 minutes
-    fixed_retry_strategy(10 * 1000, 120)
-}
-
 async fn list_stateful_sets(client: K8sClient, kube_namespace: &str) -> Result<Vec<StatefulSet>> {
     let stateful_set_api: Api<StatefulSet> = Api::namespaced(client, kube_namespace);
     let lp = ListParams::default();
diff --git a/testsuite/forge/src/backend/k8s_deployer/deployer.rs b/testsuite/forge/src/backend/k8s_deployer/deployer.rs
index 891748cfafd21..62e7ce724d0b3 100644
--- a/testsuite/forge/src/backend/k8s_deployer/deployer.rs
+++ b/testsuite/forge/src/backend/k8s_deployer/deployer.rs
@@ -5,9 +5,8 @@ use super::{
     DEFAULT_FORGE_DEPLOYER_IMAGE_TAG, FORGE_DEPLOYER_SERVICE_ACCOUNT_NAME,
     FORGE_DEPLOYER_VALUES_ENV_VAR_NAME,
 };
-use crate::{
-    k8s_wait_indexer_strategy, maybe_create_k8s_resource, wait_log_job, K8sApi, ReadWrite, Result,
-};
+use crate::{maybe_create_k8s_resource, wait_log_job, K8sApi, ReadWrite, Result};
+use again::RetryPolicy;
 use aptos_logger::info;
 use k8s_openapi::api::{
     batch::v1::Job,
@@ -18,7 +17,7 @@ use kube::{
     api::{ObjectMeta, PostParams},
     ResourceExt,
 };
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
 /// The ForgeDeployerManager is responsible for managing the lifecycle of forge deployers, which deploy the
 /// forge components to the k8s cluster.
@@ -240,11 +239,14 @@ impl ForgeDeployerManager {
      * Wait for the deployer job to complete.
      */
     pub async fn wait_completed(&self) -> Result<()> {
+        // retry for ~10 min at a fixed interval. Note the actual job may take longer than this to complete, but the last attempt to tail the logs will succeed before then
+        // Ideally the deployer itself knows to fail fast depending on the workloads' health
+        let retry_policy = RetryPolicy::fixed(Duration::from_secs(10)).with_max_retries(6 * 10);
        wait_log_job(
             self.jobs_api.clone(),
             &self.namespace,
             self.get_name(),
-            k8s_wait_indexer_strategy(),
+            retry_policy,
         )
         .await
     }
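
The core of tail_job_logs above is a small, reusable pattern: spawn a child process with a piped stdout and forward its output line by line through tokio's async BufReader. The standalone sketch below shows just that wiring; the namespace, job name, use of anyhow, and the assumption that kubectl is on PATH are illustrative choices, not taken from the Forge codebase.

// Cargo.toml (assumed): anyhow = "1", tokio = { version = "1", features = ["full"] }
use std::process::Stdio;

use anyhow::{anyhow, Result};
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() -> Result<()> {
    // Stand-in for `kubectl -n <namespace> logs --tail=10 -f job/<job_name>`.
    let mut child = tokio::process::Command::new("kubectl")
        .args(["-n", "default", "logs", "--tail=10", "-f", "job/my-job"])
        .stdout(Stdio::piped())
        .spawn()?;

    // Take ownership of the piped stdout so it can be read asynchronously.
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow!("failed to capture stdout"))?;

    // Read the stream line by line, prefixing each line as the patch does
    // with `[{job_name}]: {line}`.
    let mut lines = BufReader::new(stdout).lines();
    while let Some(line) = lines.next_line().await? {
        println!("[my-job]: {}", line);
    }

    // The stream ended; reap the child and surface its exit status.
    let status = child.wait().await?;
    println!("child exited with {}", status);
    Ok(())
}

Classifying spawn and read failures as retryable, as the patch does, matters because `kubectl logs -f` can drop the connection long before the job actually finishes.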
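The fail-fast behaviour itself comes from splitting errors into retryable and final variants and letting again's RetryPolicy::retry_if re-run only the retryable ones, which is how wait_log_job and wait_completed are wired above. Below is a compact, self-contained sketch of the same pattern; ProbeError and probe_job are hypothetical stand-ins for LogJobError and tail_job_logs, and the delays are much shorter than the deployer's 10-second interval with 60 attempts so the example finishes quickly.

// Cargo.toml (assumed): again = "0.1", thiserror = "1", tokio = { version = "1", features = ["full"] }
use std::time::{Duration, Instant};

use again::RetryPolicy;
use thiserror::Error;

#[derive(Error, Debug)]
enum ProbeError {
    // Transient condition (e.g. the job's pod has not started yet): retry.
    #[error("retryable: {0}")]
    Retryable(String),
    // Terminal condition (e.g. the job reported `failed`): give up immediately.
    #[error("final: {0}")]
    Final(String),
}

// Pretends to check on a job: "not started" for the first two seconds,
// success afterwards. A real check would return Final for a failed job.
async fn probe_job(started: Instant) -> Result<(), ProbeError> {
    if started.elapsed() < Duration::from_secs(2) {
        return Err(ProbeError::Retryable("job has no active pods yet".into()));
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), ProbeError> {
    // Same shape as wait_completed(), only with shorter delays; the deployer
    // uses RetryPolicy::fixed(Duration::from_secs(10)).with_max_retries(6 * 10).
    let retry_policy = RetryPolicy::fixed(Duration::from_millis(500)).with_max_retries(10);

    let started = Instant::now();
    retry_policy
        .retry_if(
            // The task closure is re-invoked on every attempt.
            move || probe_job(started),
            // Only Retryable errors trigger another attempt; a Final error
            // (or an exhausted retry budget) is returned to the caller.
            |e: &ProbeError| matches!(e, ProbeError::Retryable(_)),
        )
        .await
}

With this wiring a Final error, such as a job whose status reports failed, is returned to the caller on the first attempt instead of being retried until the budget runs out, which is the fail-fast behaviour promised in the patch title.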