Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11534. Fixed exception handling when container signalling is interrupted #5864

Merged
merged 4 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -787,9 +788,18 @@ public boolean signalContainer(ContainerSignalContext ctx)
LOG.warn("Error in signalling container {} with {}; exit = {}",
pid, signal, retCode, e);
logOutput(e.getOutput());
throw new IOException("Problem signalling container " + pid + " with "
+ signal + "; output: " + e.getOutput() + " and exitCode: "
+ retCode, e);

// In ContainerExecutionException -1 is the default value for the exit code.
// If it remained unset, we can treat the signalling as interrupted.
if (retCode == ContainerExecutionException.getDefaultExitCode()) {
throw new InterruptedIOException("Signalling container " + pid + " with "
+ signal + " is interrupted; output: " + e.getOutput() + " and exitCode: "
+ retCode);
} else {
throw new IOException("Problem signalling container " + pid + " with "
+ signal + "; output: " + e.getOutput() + " and exitCode: "
+ retCode, e);
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Integer call() {

dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
ContainerEventType.RECOVER_PAUSED_CONTAINER));
boolean notInterrupted = true;
boolean interrupted = false;
try {
File pidFile = locatePidFile(appIdStr, containerIdStr);
if (pidFile != null) {
Expand All @@ -87,11 +87,11 @@ public Integer call() {

} catch (InterruptedException | InterruptedIOException e) {
LOG.warn("Interrupted while waiting for exit code from " + containerId);
notInterrupted = false;
interrupted = true;
} catch (IOException e) {
LOG.error("Unable to kill the paused container " + containerIdStr, e);
} finally {
if (notInterrupted) {
if (!interrupted) {
this.completed.set(true);
exec.deactivateContainer(containerId);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Integer call() {
dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
ContainerEventType.CONTAINER_LAUNCHED));

boolean notInterrupted = true;
boolean interrupted = false;
try {
File pidFile = locatePidFile(appIdStr, containerIdStr);
if (pidFile != null) {
Expand All @@ -92,11 +92,11 @@ public Integer call() {
}
} catch (InterruptedException | InterruptedIOException e) {
LOG.warn("Interrupted while waiting for exit code from " + containerId);
notInterrupted = false;
interrupted = true;
} catch (IOException e) {
LOG.error("Unable to recover container " + containerIdStr, e);
} finally {
if (notInterrupted) {
if (!interrupted) {
this.completed.set(true);
exec.deactivateContainer(containerId);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ public String getErrorOutput() {
return errorOutput;
}

public static int getDefaultExitCode() {
return EXIT_CODE_UNSET;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.slf4j.Logger;
Expand All @@ -41,6 +46,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
Expand Down Expand Up @@ -725,6 +731,44 @@ public void testGetLocalResources() throws Exception {
verify(lce, times(1)).getLocalResources(container);
}

@Test
public void testSignalContainerFailureWhenExitCodeIsPresentInTheException()
throws ContainerExecutionException {
LinuxContainerRuntime containerRuntime = mock(LinuxContainerRuntime.class);
LinuxContainerExecutor containerExecutor = spy(new LinuxContainerExecutor(
containerRuntime));
ContainerSignalContext signalContext = new ContainerSignalContext.Builder().build();
ContainerExecutionException testException =
new ContainerExecutionException("exceptionWithExitCode", 123);

doNothing().when(containerExecutor).verifyUsernamePattern(any());
doThrow(testException)
.when(containerRuntime)
.signalContainer(any(ContainerRuntimeContext.class));

assertThrows(IOException.class,
() -> containerExecutor.signalContainer(signalContext));
}

@Test
public void testSignalContainerFailureWhenExitCodeIsNotPresentInTheException()
throws ContainerExecutionException {
LinuxContainerRuntime containerRuntime = mock(LinuxContainerRuntime.class);
LinuxContainerExecutor containerExecutor = spy(new LinuxContainerExecutor(
containerRuntime));
ContainerSignalContext signalContext = new ContainerSignalContext.Builder().build();
ContainerExecutionException testException =
new ContainerExecutionException("exceptionWithoutExitCode");

doNothing().when(containerExecutor).verifyUsernamePattern(any());
doThrow(testException)
.when(containerRuntime)
.signalContainer(any(ContainerRuntimeContext.class));

assertThrows(InterruptedIOException.class,
() -> containerExecutor.signalContainer(signalContext));
}

@Deprecated
private static class TestResourceHandler implements LCEResourcesHandler {
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
Expand Down