Skip to content

Backport fix for OWLS-96080 to 3.4 branch #3109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,15 @@ public NextAction onSuccess(Packet packet, CallResponse<V1Job> callResponse) {
}
}

static class DeadlineExceededException extends Exception {
public static class DeadlineExceededException extends Exception {
final V1Job job;

DeadlineExceededException(V1Job job) {
public DeadlineExceededException(V1Job job) {
super();
this.job = job;
}

@Override
public String toString() {
return LOGGER.formatMessage(
MessageKeys.JOB_DEADLINE_EXCEEDED_MESSAGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static oracle.kubernetes.operator.LabelConstants.INTROSPECTION_DOMAIN_SPEC_GENERATION;
import static oracle.kubernetes.operator.LabelConstants.INTROSPECTION_STATE_LABEL;
import static oracle.kubernetes.operator.ProcessingConstants.COMPATIBILITY_MODE;
import static oracle.kubernetes.operator.ProcessingConstants.DOMAIN_INTROSPECTOR_JOB;
import static oracle.kubernetes.operator.ProcessingConstants.DOMAIN_INTROSPECT_REQUESTED;
import static oracle.kubernetes.operator.ProcessingConstants.JOB_POD_FLUENTD_CONTAINER_TERMINATED;
import static oracle.kubernetes.operator.ProcessingConstants.JOB_POD_INTROSPECT_CONTAINER_TERMINATED;
Expand Down Expand Up @@ -368,8 +369,8 @@ static class ReplaceOrCreateStep extends DefaultResponseStep {
public NextAction onSuccess(Packet packet, CallResponse callResponse) {
DomainPresenceInfo info = packet.getSpi(DomainPresenceInfo.class);
V1Job job = (V1Job) callResponse.getResult();
if ((job != null) && (packet.get(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB) == null)) {
packet.put(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB, job);
if ((job != null) && (packet.get(DOMAIN_INTROSPECTOR_JOB) == null)) {
packet.put(DOMAIN_INTROSPECTOR_JOB, job);
}
return doNext(getIntrospectorPodStatus(info.getDomainUid(), info.getNamespace(), getNext()), packet);
}
Expand Down Expand Up @@ -443,7 +444,7 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
.orElseGet(Collections::emptyList)
.forEach(pod -> recordJobPodNameAndStatus(packet, pod));

V1Job job = (V1Job) packet.get(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB);
V1Job job = (V1Job) packet.get(DOMAIN_INTROSPECTOR_JOB);
OffsetDateTime startTime = createNextSteps(nextSteps, packet, job, getNext());
packet.putIfAbsent(START_TIME, startTime);
return doContinueListOrNext(callResponse, packet, nextSteps.get(0));
Expand Down Expand Up @@ -571,7 +572,7 @@ public NextAction onSuccess(Packet packet, CallResponse<String> callResponse) {
}

V1Job domainIntrospectorJob =
(V1Job) packet.get(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB);
(V1Job) packet.get(DOMAIN_INTROSPECTOR_JOB);
boolean jobPodContainerTerminated = JOB_POD_INTROSPECT_CONTAINER_TERMINATED_MARKER
.equals(packet.get(JOB_POD_INTROSPECT_CONTAINER_TERMINATED));

Expand Down Expand Up @@ -790,7 +791,7 @@ public static Step deleteDomainIntrospectorJobStep(Step next) {

static void logJobDeleted(String domainUid, String namespace, String jobName, Packet packet) {
V1Job domainIntrospectorJob =
(V1Job) packet.remove(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB);
(V1Job) packet.remove(DOMAIN_INTROSPECTOR_JOB);

packet.remove(ProcessingConstants.INTROSPECTOR_JOB_FAILURE_LOGGED);
if (domainIntrospectorJob != null
Expand Down Expand Up @@ -908,6 +909,9 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
.orElse(null);

if (jobPod != null) {
if (isJobPodTimedOut(jobPod)) {
return onFailureNoRetry(packet, callResponse);
}
addContainerTerminatedMarkerToPacket(jobPod, getJobName(), packet);
recordJobPodName(packet, getName(jobPod));
}
Expand All @@ -922,6 +926,20 @@ private String getName(V1Pod pod) {
return Optional.of(pod).map(V1Pod::getMetadata).map(V1ObjectMeta::getName).orElse("");
}

private boolean isJobPodTimedOut(V1Pod jobPod) {
return "DeadlineExceeded".equals(getJobPodStatusReason(jobPod));
}

private String getJobPodStatusReason(V1Pod jobPod) {
return Optional.ofNullable(jobPod.getStatus()).map(V1PodStatus::getReason).orElse(null);
}

@Override
protected Throwable createTerminationException(Packet packet,
CallResponse<V1PodList> callResponse) {
return new JobWatcher.DeadlineExceededException((V1Job) packet.get(DOMAIN_INTROSPECTOR_JOB));
}

private void recordJobPodName(Packet packet, String podName) {
packet.put(ProcessingConstants.JOB_POD_NAME, podName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,18 @@ public NextAction onFailure(Step conflictStep, Packet packet, CallResponse<T> ca
}

protected NextAction onFailureNoRetry(Packet packet, CallResponse<T> callResponse) {
return doTerminate(UnrecoverableErrorBuilder.createExceptionFromFailedCall(callResponse), packet);
return doTerminate(createTerminationException(packet, callResponse), packet);
}

/**
* Create an exception to be passed to the doTerminate call.
*
* @param packet Packet for creating the exception
* @param callResponse CallResponse for creating the exception
* @return An Exception to be passed to the doTerminate call
*/
protected Throwable createTerminationException(Packet packet, CallResponse<T> callResponse) {
return UnrecoverableErrorBuilder.createExceptionFromFailedCall(callResponse);
}

protected boolean isNotAuthorizedOrForbidden(CallResponse<T> callResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,20 @@ public synchronized Fiber startFiberIfLastFiberMatches(
new CompletionCallback() {
@Override
public void onCompletion(Packet packet) {
gateMap.remove(key, f);
callback.onCompletion(packet);
try {
callback.onCompletion(packet);
} finally {
gateMap.remove(key, f);
}
}

@Override
public void onThrowable(Packet packet, Throwable throwable) {
gateMap.remove(key, f);
callback.onThrowable(packet, throwable);
try {
callback.onThrowable(packet, throwable);
} finally {
gateMap.remove(key, f);
}
}
});
return f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.LogRecord;
Expand All @@ -23,12 +25,16 @@
import io.kubernetes.client.openapi.models.V1JobCondition;
import io.kubernetes.client.openapi.models.V1JobStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodStatus;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1SecretReference;
import io.kubernetes.client.openapi.models.V1Volume;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import oracle.kubernetes.operator.JobAwaiterStepFactory;
import oracle.kubernetes.operator.JobWatcher;
import oracle.kubernetes.operator.LabelConstants;
import oracle.kubernetes.operator.ProcessingConstants;
import oracle.kubernetes.operator.TuningParameters;
import oracle.kubernetes.operator.calls.unprocessable.UnrecoverableErrorBuilderImpl;
Expand Down Expand Up @@ -127,6 +133,7 @@ class DomainIntrospectorJobTest {
private static final String FATAL_MESSAGE_1 = "@[SEVERE] " + FATAL_PROBLEM_1;
public static final String TEST_VOLUME_NAME = "test";
public static final String JOB_UID = "some-unique-id";
private static final String JOB_NAME = UID + "-introspector";
public static final String INFO_MESSAGE_1 = "informational message";
private static final String INFO_MESSAGE = "@[INFO] just letting you know";

Expand Down Expand Up @@ -878,4 +885,60 @@ private Cluster getCluster(String clusterName) {
private String getDomainHome() {
return "/shared/domains/" + UID;
}

@Test
void whenPreviousFailedJobWithDeadlineExceeded_terminateWithException() {
ignoreIntrospectorFailureLogs();
ignoreJobCreatedAndDeletedLogs();
testSupport.addToPacket(DOMAIN_TOPOLOGY, createDomainConfig("cluster-1"));
defineFailedIntrospectionPodWithDeadlineExceeded();
testSupport.doOnCreate(JOB, this::recordJob);
testSupport.doAfterCall(JOB, "deleteJob", this::replaceFailedJobPodWithSuccess);

testSupport.runSteps(JobHelper.createDomainIntrospectorJobStep(null));

testSupport.verifyCompletionThrowable(JobWatcher.DeadlineExceededException.class);
}

private void defineFailedIntrospectionPodWithDeadlineExceeded() {
testSupport.defineResources(asFailedJobPodWithDeadlineExceeded(createIntrospectorJobPod()));
}

private V1Pod asFailedJobPodWithDeadlineExceeded(V1Pod introspectorJobPod) {
return introspectorJobPod.status(new V1PodStatus().reason("DeadlineExceeded"));
}

private void ignoreIntrospectorFailureLogs() {
consoleHandlerMemento.ignoreMessage(getJobFailedMessageKey());
consoleHandlerMemento.ignoreMessage(getJobFailedDetailMessageKey());
}

private void ignoreJobCreatedAndDeletedLogs() {
consoleHandlerMemento.ignoreMessage(getJobCreatedMessageKey());
consoleHandlerMemento.ignoreMessage(getJobDeletedMessageKey());
}

private V1Job affectedJob;

private void recordJob(Object job) {
affectedJob = asCompletedJob((V1Job) job);
}

private V1Job asCompletedJob(V1Job job) {
job.setStatus(new V1JobStatus().addConditionsItem(
new V1JobCondition().status("True").type("Complete")));
return job;
}

private void replaceFailedJobPodWithSuccess() {
testSupport.deleteResources(createIntrospectorJobPod());
testSupport.defineResources(createIntrospectorJobPod());
}

private V1Pod createIntrospectorJobPod() {
Map<String, String> labels = new HashMap<>();
labels.put(LabelConstants.JOBNAME_LABEL, JOB_NAME);
return new V1Pod().metadata(new V1ObjectMeta().name(JOB_NAME).labels(labels).namespace(NS));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public class KubernetesTestSupport extends FiberTestSupport {
private final Map<String, DataRepository<?>> repositories = new HashMap<>();
private final Map<Class<?>, String> dataTypes = new HashMap<>();
private Failure failure;
private AfterCallAction afterCallAction;
private long resourceVersion;
private int numCalls;
private boolean addCreationTimestamp;
Expand Down Expand Up @@ -480,6 +481,16 @@ public void cancelFailures() {
failure = null;
}

/**
* Specifies an action to perform after completing the next matching invocation.
* @param resourceType the type of resource
* @param call the call string
* @param action the action to perform
*/
public void doAfterCall(@Nonnull String resourceType, @Nonnull String call, @Nonnull Runnable action) {
afterCallAction = new AfterCallAction(resourceType, call, action);
}

@SuppressWarnings("unused")
private enum Operation {
create {
Expand Down Expand Up @@ -591,6 +602,27 @@ HttpErrorException getException() {
}
}

static class AfterCallAction {
private final String resourceType;
private final String call;
private final Runnable action;

AfterCallAction(@Nonnull String resourceType, @Nonnull String call, @Nonnull Runnable action) {
this.resourceType = resourceType;
this.call = call;
this.action = action;
}

boolean matches(String resourceType, RequestParams requestParams) {
return this.resourceType.equals(resourceType)
&& (this.call.equals(requestParams.call));
}

void doAction() {
action.run();
}
}

static class HttpErrorException extends RuntimeException {
private final ApiException apiException;

Expand Down Expand Up @@ -1174,15 +1206,22 @@ private int indexOfFirstCapital(String callName) {
}

private Object execute() {
if (failure != null && failure.matches(resourceType, requestParams, operation)) {
try {
throw failure.getException();
} finally {
failure = null;
try {
if (failure != null && failure.matches(resourceType, requestParams, operation)) {
try {
throw failure.getException();
} finally {
failure = null;
}
}
}

return operation.execute(this, selectRepository(resourceType));
return operation.execute(this, selectRepository(resourceType));
} finally {
if (afterCallAction != null && afterCallAction.matches(resourceType, requestParams)) {
afterCallAction.doAction();
afterCallAction = null;
}
}
}

@SuppressWarnings("unchecked")
Expand Down