Skip to content

Commit

Permalink
Merge pull request #239 from iocanel/process-alive
Browse files Browse the repository at this point in the history
Prevent unneeded exec operations
carlossg authored Jan 29, 2018
2 parents 6090784 + 1fe0491 commit 9453d84
Showing 3 changed files with 164 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -28,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -39,6 +41,7 @@

import hudson.EnvVars;
import hudson.FilePath;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import org.apache.commons.io.output.TeeOutputStream;
@@ -76,38 +79,51 @@ public class ContainerExecDecorator extends LauncherDecorator implements Seriali

private transient KubernetesClient client;


@SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization")
private transient List<Closeable> closables;
@SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization")
private transient Map<Integer, ContainerExecProc> processes = new HashMap<Integer, ContainerExecProc>();

private final String podName;
private final String namespace;
private final String containerName;
private final EnvironmentExpander environmentExpander;

public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) {
private final FilePath ws;

public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander, FilePath ws) {
this.client = client;
this.podName = podName;
this.namespace = namespace;
this.containerName = containerName;
this.environmentExpander = environmentExpander;
this.ws = ws;
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) {
this(client, podName, containerName, namespace, environmentExpander, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace) {
this(client, podName, containerName, namespace, null);
this(client, podName, containerName, namespace, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished, String namespace) {
this(client, podName, containerName, namespace, null);
this(client, podName, containerName, namespace, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) {
this(client, podName, containerName, null, null);
this(client, podName, containerName, (String) null, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String path, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) {
this(client, podName, containerName, null, null);
this(client, podName, containerName, (String) null, null, null);
}

@Override
@@ -133,6 +149,80 @@ public Proc launch(ProcStarter starter) throws IOException {
}

private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForCaller, FilePath pwd, String... commands) throws IOException {
if (processes == null) {
processes = new HashMap<>();
}
//check ifits the actual script or the ProcessLiveness check.
int p = readPidFromPsCommand(commands);
//if it is a liveness check, try to find the actual process to avoid doing multiple execs.
if (p == 9999) {
return new Proc() {
@Override
public boolean isAlive() throws IOException, InterruptedException {
return false;
}

@Override
public void kill() throws IOException, InterruptedException {

}

@Override
public int join() throws IOException, InterruptedException {
return 1;
}

@Override
public InputStream getStdout() {
return null;
}

@Override
public InputStream getStderr() {
return null;
}

@Override
public OutputStream getStdin() {
return null;
}
};
} else if (p > 0 && processes.containsKey(p)) {
LOGGER.log(Level.INFO, "Retrieved process from cache with pid:[ " + p +"].");
Proc proc = processes.get(p);
return new Proc() {

@Override
public boolean isAlive() throws IOException, InterruptedException {
return false;
}

@Override
public void kill() throws IOException, InterruptedException {
}

@Override
public int join() throws IOException, InterruptedException {
return proc.isAlive() ? 0 : -1;
}

@Override
public InputStream getStdout() {
return null;
}

@Override
public InputStream getStderr() {
return null;
}

@Override
public OutputStream getStdin() {
return null;
}
};
}

waitUntilContainerIsReady();

final CountDownLatch started = new CountDownLatch(1);
@@ -151,7 +241,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC
// we need to keep the last bytes in the stream to parse the exit code as it is printed there
// so we use a buffer
ExitCodeOutputStream exitCodeOutputStream = new ExitCodeOutputStream();
// send container output both to the job output and our buffer
// send container output to all 3 streams (pid, out, job).
stream = new TeeOutputStream(exitCodeOutputStream, stream);
// Send to proc caller as well if they sent one
if (outputForCaller != null) {
@@ -169,6 +259,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC
public void onOpen(Response response) {
alive.set(true);
started.countDown();
LOGGER.log(Level.FINEST, "onOpen : {0}", finished);
}

@Override
@@ -238,13 +329,17 @@ public void onClose(int i, String s) {

this.setupEnvironmentVariable(envVars, watch);
doExec(watch, printStream, commands);
ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode);
if (closables == null) {
closables = new ArrayList<>();
}

int pid = readPidFromPidFile(commands);
LOGGER.log(Level.INFO, "Created process inside pod: ["+podName+"], container: ["+containerName+"] with pid:["+pid+"]");
ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode);
processes.put(pid, proc);
closables.add(proc);
return proc;
} catch (InterruptedException ie) {
} catch (InterruptedException ie) {
throw new InterruptedIOException(ie.getMessage());
} catch (Exception e) {
closeWatch(watch);
@@ -266,8 +361,7 @@ public void kill(Map<String, String> modelEnvVars) throws IOException, Interrupt
getListener().getLogger().println("kill finished with exit code " + exitCode);
}

private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException
{
private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException {
for (Map.Entry<String, String> entry : vars.entrySet()) {
//Check that key is bash compliant.
if (entry.getKey().matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
@@ -356,6 +450,51 @@ private static void doExec(ExecWatch watch, PrintStream out, String... statement
}
}

static int readPidFromPsCommand(String... commands) {
if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].equals("pid=")) {
return Integer.parseInt(commands[3]);
}


if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].startsWith("-pid")) {
return Integer.parseInt(commands[3]);
}
return -1;
}


private synchronized int readPidFromPidFile(String... commands) throws IOException, InterruptedException {
int pid = -1;
String pidFilePath = readPidFile(commands);
if (pidFilePath == null) {
return pid;
}
FilePath pidFile = ws.child(pidFilePath);
for (int w = 0; w < 10 && !pidFile.exists(); w++) {
try {
wait(1000);
} catch (InterruptedException e) {
break;
}
}
if (pidFile.exists()) {
try {
pid = Integer.parseInt(pidFile.readToString().trim());
} catch (NumberFormatException x) {
throw new IOException("corrupted content in " + pidFile + ": " + x, x);
}
}
return pid;
}

@CheckForNull
static String readPidFile(String... commands) {
if (commands.length >= 4 && "nohup".equals(commands[0]) && "sh".equals(commands[1]) && commands[2].equals("-c") && commands[3].startsWith("echo \\$\\$ >")) {
return commands[3].substring(13, commands[3].indexOf(";") - 1);
}
return null;
}

static String[] getCommands(Launcher.ProcStarter starter) {
List<String> allCommands = new ArrayList<String>();

@@ -416,7 +555,7 @@ public int getExitCode() {
int i = 1;
String s = new String(b.array(), StandardCharsets.UTF_8);
if (s.indexOf(EXIT_COMMAND_TXT) < 0) {
LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[] { EXIT_COMMAND_TXT, s });
LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[]{EXIT_COMMAND_TXT, s});
return i;
}
// parse the exitcode int printed after EXITCODE
@@ -426,7 +565,7 @@ public int getExitCode() {
i = Integer.parseInt(s);
} catch (NumberFormatException e) {
LOGGER.log(Level.WARNING, "Unable to parse exit code as integer: \"{0}\" {1} / {2}",
new Object[] { s, queue.toString(), Arrays.toString(b.array()) });
new Object[]{s, queue.toString(), Arrays.toString(b.array())});
}
return i;
}
Original file line number Diff line number Diff line change
@@ -3,6 +3,9 @@
import java.io.Closeable;
import java.util.logging.Level;
import java.util.logging.Logger;

import hudson.FilePath;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback;
import org.jenkinsci.plugins.workflow.steps.BodyInvoker;
@@ -57,7 +60,7 @@ public boolean start() throws Exception {
client = nodeContext.connectToCloud();

EnvironmentExpander env = getContext().get(EnvironmentExpander.class);
decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env);
decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env, getContext().get(FilePath.class));
getContext().newBodyInvoker()
.withContext(BodyInvoker
.mergeLauncherDecorators(getContext().get(LauncherDecorator.class), decorator))
Original file line number Diff line number Diff line change
@@ -99,6 +99,10 @@ public void after() throws Exception {
deletePods(client, getLabels(this), false);
}

/**
* Test that multiple command execution in parallel works
* @throws Exception
*/
@Test(timeout = 10000)
public void testCommandExecution() throws Exception {
Thread[] t = new Thread[10];
@@ -212,7 +216,11 @@ private ProcReturn execCommand(boolean quiet, String... cmd) throws Exception {
.decorate(new DummyLauncher(new StreamTaskListener(new TeeOutputStream(out, System.out))), null);
ContainerExecProc proc = (ContainerExecProc) launcher
.launch(launcher.new ProcStarter().pwd("/tmp").cmds(cmd).quiet(quiet));
assertTrue(proc.isAlive());
// wait for proc to finish (shouldn't take long)
while (proc.isAlive()) {
Thread.sleep(100);
}
assertFalse("proc is alive", proc.isAlive());
int exitCode = proc.joinWithTimeout(10, TimeUnit.SECONDS, StreamTaskListener.fromStderr());
return new ProcReturn(proc, exitCode, out.toString());
}

0 comments on commit 9453d84

Please sign in to comment.