Skip to content

Check job condition to determine if job has failed in addition to checking job status #2201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions operator/src/main/java/oracle/kubernetes/operator/JobWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

package oracle.kubernetes.operator;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Job;
Expand Down Expand Up @@ -137,16 +139,38 @@ static boolean isFailed(V1Job job) {
return false;
}

V1JobStatus status = job.getStatus();
if (status != null) {
if (status.getFailed() != null && status.getFailed() > 0) {
LOGGER.severe(MessageKeys.JOB_IS_FAILED, job.getMetadata().getName());
return true;
}
if (isStatusFailed(job) || isConditionFailed(job)) {
LOGGER.severe(MessageKeys.JOB_IS_FAILED, job.getMetadata().getName());
return true;
}
return false;
}

private static boolean isStatusFailed(V1Job job) {
return Optional.ofNullable(job.getStatus()).map(V1JobStatus::getFailed).map(failed -> (failed > 0)).orElse(false);
}

private static boolean isConditionFailed(V1Job job) {
return getJobConditions(job).stream().anyMatch(JobWatcher::isJobConditionFailed);
}

private static List<V1JobCondition> getJobConditions(@Nonnull V1Job job) {
return Optional.ofNullable(job.getStatus()).map(V1JobStatus::getConditions).orElse(Collections.emptyList());
}

private static boolean isJobConditionFailed(V1JobCondition jobCondition) {
return getType(jobCondition).equals("Failed") && getStatus(jobCondition).equals("True");
}

private static String getType(V1JobCondition jobCondition) {
return Optional.ofNullable(jobCondition).map(V1JobCondition::getType).orElse("");
}

private static String getStatus(V1JobCondition jobCondition) {
return Optional.ofNullable(jobCondition).map(V1JobCondition::getStatus).orElse("");
}


static String getFailedReason(V1Job job) {
V1JobStatus status = job.getStatus();
if (status != null && status.getConditions() != null) {
Expand Down Expand Up @@ -305,4 +329,4 @@ private long getJobStartedSeconds() {
return -1;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package oracle.kubernetes.operator;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -128,6 +130,28 @@ public void whenJobConditionStatusFalse_reportNotComplete() {
assertThat(JobWatcher.isComplete(cachedJob), is(false));
}

@Test
public void whenJobConditionTypeFailedWithTrueStatus_reportFailed() {
markJobConditionFailed(cachedJob);

assertThat(JobWatcher.isFailed(cachedJob), is(true));
}

@Test
public void whenJobConditionTypeFailedWithNoStatus_reportNotFailed() {
cachedJob.status(new V1JobStatus().addConditionsItem(new V1JobCondition().type("Failed").status("")));

assertThat(JobWatcher.isFailed(cachedJob), is(false));
}

@Test
public void whenJobHasStatusWithNoConditionsAndNotFailed_reportNotFailed() {
cachedJob.status(new V1JobStatus().conditions(Collections.emptyList()));

assertThat(JobWatcher.isFailed(cachedJob), is(false));
}


@Test
public void whenJobRunningAndReadyConditionIsTrue_reportComplete() {
markJobCompleted(cachedJob);
Expand All @@ -151,6 +175,10 @@ private V1Job markJobFailed(V1Job job) {
return setFailedWithReason(job, null);
}

private V1Job markJobConditionFailed(V1Job job) {
return setFailedConditionWithReason(job, null);
}

private V1Job markJobTimedOut(V1Job job) {
return markJobTimedOut(job, "DeadlineExceeded");
}
Expand All @@ -163,6 +191,11 @@ private V1Job setFailedWithReason(V1Job job, String reason) {
return job.status(new V1JobStatus().failed(1).addConditionsItem(createCondition("Failed").reason(reason)));
}

private V1Job setFailedConditionWithReason(V1Job job, String reason) {
return job.status(new V1JobStatus().conditions(
new ArrayList<>(Arrays.asList(new V1JobCondition().type("Failed").status("True").reason(reason)))));
}

@Test
public void whenJobHasNoStatus_reportNotFailed() {
assertThat(JobWatcher.isFailed(cachedJob), is(false));
Expand Down Expand Up @@ -248,6 +281,13 @@ public void whenWaitForReadyAppliedToFailedJob_performNextStep() {
assertThat(terminalStep.wasRun(), is(true));
}

@Test
public void whenWaitForReadyAppliedToJobWithFailedCondition_performNextStep() {
startWaitForReady(this::markJobConditionFailed);

assertThat(terminalStep.wasRun(), is(true));
}

// Starts the waitForReady step with job modified as needed
private void startWaitForReady(Function<V1Job,V1Job> jobFunction) {
AtomicBoolean stopping = new AtomicBoolean(false);
Expand Down