[forge] fail fast deployer if job failed (#14854)
rustielin authored Oct 3, 2024
1 parent 68bba58 commit 514653c
Showing 3 changed files with 113 additions and 69 deletions.
165 changes: 106 additions & 59 deletions testsuite/forge/src/backend/k8s/cluster_helper.rs
@@ -71,71 +71,118 @@ pub fn dump_string_to_file(
Ok(file_path_str)
}

#[derive(Error, Debug)]
#[error("{0}")]
enum LogJobError {
RetryableError(String),
FinalError(String),
}

/**
* Tail the logs of a job. Returns Ok(()) if the job has a pod that succeeds.
* Assumes that the job only runs once and exits, and has no configured retry policy (i.e. backoffLimit = 0; a sketch of such a job spec follows this function)
*/
async fn tail_job_logs(
jobs_api: Arc<dyn ReadWrite<Job>>,
job_name: String,
job_namespace: String,
) -> Result<(), LogJobError> {
let genesis_job = jobs_api
.get_status(&job_name)
.await
.map_err(|e| LogJobError::FinalError(format!("Failed to get job status: {}", e)))?;

let status = genesis_job.status.expect("Job status not found");
info!("Job {} status: {:?}", &job_name, status);
match status.active {
Some(active) => {
if active < 1 {
return Err(LogJobError::RetryableError(format!(
"Job {} has no active pods. Maybe it has not started yet",
&job_name
)));
}
// try tailing the logs of the genesis job
// by the time this is done, we can re-evaluate its status
let mut command = tokio::process::Command::new(KUBECTL_BIN)
.args([
"-n",
&job_namespace,
"logs",
"--tail=10", // in case of connection reset we only want the last few lines to avoid spam
"-f",
format!("job/{}", &job_name).as_str(),
])
.stdout(Stdio::piped())
.spawn()
.map_err(|e| {
LogJobError::RetryableError(format!("Failed to spawn command: {}", e))
})?;
// Ensure the command has stdout
let stdout = command.stdout.take().ok_or_else(|| {
LogJobError::RetryableError("Failed to capture stdout".to_string())
})?;

// Create a BufReader to read the output asynchronously, line by line
let mut reader = BufReader::new(stdout).lines();

// Iterate over the lines as they come
while let Some(line) = reader.next_line().await.transpose() {
match line {
Ok(line) => {
info!("[{}]: {}", &job_name, line); // Add a prefix to each line
},
Err(e) => {
return Err(LogJobError::RetryableError(format!(
"Error reading line: {}",
e
)));
},
}
}
command.wait().await.map_err(|e| {
LogJobError::RetryableError(format!("Error waiting for command: {}", e))
})?;
},
None => info!("Job {} has no active pods running", &job_name),
}
match status.succeeded {
Some(_) => {
info!("Job {} succeeded!", &job_name);
return Ok(());
},
None => info!("Job {} has no succeeded pods", &job_name),
}
if status.failed.is_some() {
info!("Job {} failed!", &job_name);
return Err(LogJobError::FinalError(format!("Job {} failed", &job_name)));
}
Err(LogJobError::RetryableError(format!(
"Job {} has no succeeded or failed pods. Maybe it has not started yet.",
&job_name
)))
}
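For context on the backoffLimit = 0 assumption documented on tail_job_logs: the deployer job must not be retried by Kubernetes itself, or a failed pod would not be a final verdict and the FinalError path would fire prematurely. Below is a minimal, hypothetical sketch of such a one-shot job built from the k8s_openapi and kube types this crate already imports; the helper name, job name, and image are illustrative and not part of this commit.

use k8s_openapi::api::{
    batch::v1::{Job, JobSpec},
    core::v1::{Container, PodSpec, PodTemplateSpec},
};
use kube::api::ObjectMeta;

// Hypothetical one-shot job; the real deployer job spec is built elsewhere in forge.
fn one_shot_job(name: &str, image: &str) -> Job {
    Job {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        spec: Some(JobSpec {
            // No Kubernetes-level retries: the first failed pod fails the job,
            // which is what lets tail_job_logs treat status.failed as final.
            backoff_limit: Some(0),
            template: PodTemplateSpec {
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: name.to_string(),
                        image: Some(image.to_string()),
                        ..Default::default()
                    }],
                    // Pods must not restart in place either.
                    restart_policy: Some("Never".to_string()),
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        }),
        ..Default::default()
    }
}

With backoffLimit = 0 and restartPolicy Never, a set status.failed really does mean the job is dead, which is exactly what the fail-fast check above relies on.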

/// Waits for a job to complete, while tailing the job's logs
pub async fn wait_log_job(
jobs_api: Arc<dyn ReadWrite<Job>>,
job_namespace: &str,
job_name: String,
retry_strategy: impl Iterator<Item = Duration>,
retry_policy: RetryPolicy,
) -> Result<()> {
aptos_retrier::retry_async(retry_strategy, || {
let jobs_api = jobs_api.clone();
let job_name = job_name.clone();
Box::pin(async move {
let genesis_job = jobs_api.get_status(&job_name).await.unwrap();

let status = genesis_job.status.unwrap();
info!("Job {} status: {:?}", &job_name, status);
match status.active {
Some(_) => {
// try tailing the logs of the genesis job
// by the time this is done, we can re-evaluate its status
let mut command = tokio::process::Command::new(KUBECTL_BIN)
.args([
"-n",
job_namespace,
"logs",
"--tail=10", // in case of connection reset we only want the last few lines to avoid spam
"-f",
format!("job/{}", &job_name).as_str(),
])
.stdout(Stdio::piped())
.spawn()?;
// Ensure the command has stdout
let stdout = command
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;

// Create a BufReader to read the output asynchronously, line by line
let mut reader = BufReader::new(stdout).lines();

// Iterate over the lines as they come
while let Some(line) = reader.next_line().await.transpose() {
match line {
Ok(line) => {
info!("[{}]: {}", &job_name, line); // Add a prefix to each line
},
Err(e) => {
bail!("Error reading line: {}", e);
},
}
}
command.wait().await?;
},
None => info!("Job {} completed running", &job_name),
}
info!("Job {} status: {:?}", &job_name, status);
match status.succeeded {
Some(_) => {
info!("Job {} done", &job_name);
Ok(())
},
None => bail!("Job {} did not succeed", &job_name),
}
})
})
.await
retry_policy
.retry_if(
move || {
tail_job_logs(
jobs_api.clone(),
job_name.clone(),
job_namespace.to_string(),
)
},
|e: &LogJobError| matches!(e, LogJobError::RetryableError(_)),
)
.await?;
Ok(())
}
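The RetryableError/FinalError split is what makes the deployer fail fast: again::RetryPolicy::retry_if only re-invokes the task while the predicate returns true, so a FinalError (the status.failed case) is surfaced immediately instead of consuming the remaining retry budget. A self-contained sketch of that mechanism, using a toy task and illustrative error values rather than anything from forge:

use again::RetryPolicy;
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};
use std::time::Duration;

#[derive(Debug)]
enum DemoError {
    Retryable(String),
    Final(String),
}

#[tokio::main]
async fn main() {
    let attempts = Arc::new(AtomicU32::new(0));
    let task_attempts = attempts.clone();
    let policy = RetryPolicy::fixed(Duration::from_millis(10)).with_max_retries(5);

    // The toy task fails with a retryable error on the first call and a
    // final error on the second; retry_if re-runs it once, then returns
    // the final error without touching the rest of the 5-retry budget.
    let result: Result<(), DemoError> = policy
        .retry_if(
            move || {
                let attempts = task_attempts.clone();
                async move {
                    match attempts.fetch_add(1, Ordering::SeqCst) {
                        0 => Err(DemoError::Retryable("no active pods yet".to_string())),
                        _ => Err(DemoError::Final("job failed".to_string())),
                    }
                }
            },
            |e: &DemoError| matches!(e, DemoError::Retryable(_)),
        )
        .await;

    assert_eq!(attempts.load(Ordering::SeqCst), 2);
    assert!(matches!(result, Err(DemoError::Final(_))));
}

The assertions show the task ran exactly twice: one retry after the retryable failure, then an immediate return on the final one, with the remaining retry budget left unused.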

/// Waits for a given number of HAProxy K8s Deployments to be ready
5 changes: 0 additions & 5 deletions testsuite/forge/src/backend/k8s/swarm.rs
@@ -460,11 +460,6 @@ pub fn k8s_wait_nodes_strategy() -> impl Iterator<Item = Duration> {
fixed_retry_strategy(10 * 1000, 120)
}

pub fn k8s_wait_indexer_strategy() -> impl Iterator<Item = Duration> {
// retry every 10 seconds for 20 minutes
fixed_retry_strategy(10 * 1000, 120)
}

async fn list_stateful_sets(client: K8sClient, kube_namespace: &str) -> Result<Vec<StatefulSet>> {
let stateful_set_api: Api<StatefulSet> = Api::namespaced(client, kube_namespace);
let lp = ListParams::default();
12 changes: 7 additions & 5 deletions testsuite/forge/src/backend/k8s_deployer/deployer.rs
@@ -5,9 +5,8 @@ use super::{
DEFAULT_FORGE_DEPLOYER_IMAGE_TAG, FORGE_DEPLOYER_SERVICE_ACCOUNT_NAME,
FORGE_DEPLOYER_VALUES_ENV_VAR_NAME,
};
use crate::{
k8s_wait_indexer_strategy, maybe_create_k8s_resource, wait_log_job, K8sApi, ReadWrite, Result,
};
use crate::{maybe_create_k8s_resource, wait_log_job, K8sApi, ReadWrite, Result};
use again::RetryPolicy;
use aptos_logger::info;
use k8s_openapi::api::{
batch::v1::Job,
@@ -18,7 +17,7 @@ use kube::{
api::{ObjectMeta, PostParams},
ResourceExt,
};
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

/// The ForgeDeployerManager is responsible for managing the lifecycle of forge deployers, which deploy the
/// forge components to the k8s cluster.
@@ -240,11 +239,14 @@ impl ForgeDeployerManager {
* Wait for the deployer job to complete.
*/
pub async fn wait_completed(&self) -> Result<()> {
// Retry for ~10 min at a fixed interval. The actual job may take longer than this to complete,
// but the last attempt to tail its logs will have succeeded, and will keep streaming, before the retry budget runs out.
// Ideally the deployer itself would know to fail fast based on the workloads' health.
let retry_policy = RetryPolicy::fixed(Duration::from_secs(10)).with_max_retries(6 * 10);
wait_log_job(
self.jobs_api.clone(),
&self.namespace,
self.get_name(),
k8s_wait_indexer_strategy(),
retry_policy,
)
.await
}
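As a quick, purely illustrative sanity check of the retry window described in the comment above: a fixed 10-second delay with 6 * 10 = 60 retries bounds the inter-attempt waiting at roughly 10 minutes, not counting time spent inside a log-tailing attempt that has already connected.

use std::time::Duration;

fn main() {
    let delay = Duration::from_secs(10);
    let max_retries: u32 = 6 * 10;
    // 60 retries x 10s = 600s of inter-attempt delay, i.e. ~10 minutes.
    assert_eq!(delay * max_retries, Duration::from_secs(600));
}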
