diff --git a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecorator.java b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecorator.java index df6dfe1043..7f967806f7 100755 --- a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecorator.java +++ b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecorator.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; import java.io.PrintStream; @@ -28,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -39,6 +41,7 @@ import hudson.EnvVars; import hudson.FilePath; +import edu.umd.cs.findbugs.annotations.CheckForNull; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import org.apache.commons.io.output.TeeOutputStream; @@ -76,38 +79,51 @@ public class ContainerExecDecorator extends LauncherDecorator implements Seriali private transient KubernetesClient client; + @SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization") private transient List closables; + @SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization") + private transient Map processes = new HashMap(); + private final String podName; private final String namespace; private final String containerName; private final EnvironmentExpander environmentExpander; - public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) { + private final FilePath ws; + + public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander, FilePath ws) { this.client = client; this.podName = podName; this.namespace = namespace; this.containerName = containerName; this.environmentExpander = environmentExpander; + this.ws = ws; } + @Deprecated + public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) { + this(client, podName, containerName, namespace, environmentExpander, null); + } + + @Deprecated public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace) { - this(client, podName, containerName, namespace, null); + this(client, podName, containerName, namespace, null, null); } @Deprecated public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished, String namespace) { - this(client, podName, containerName, namespace, null); + this(client, podName, containerName, namespace, null, null); } @Deprecated public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) { - this(client, podName, containerName, null, null); + this(client, podName, containerName, (String) null, null, null); } @Deprecated public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String path, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) { - this(client, podName, containerName, null, null); + this(client, podName, containerName, (String) null, null, null); } @Override @@ -133,6 +149,80 @@ public Proc launch(ProcStarter starter) throws IOException { } private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForCaller, FilePath pwd, String... commands) throws IOException { + if (processes == null) { + processes = new HashMap<>(); + } + //check ifits the actual script or the ProcessLiveness check. + int p = readPidFromPsCommand(commands); + //if it is a liveness check, try to find the actual process to avoid doing multiple execs. + if (p == 9999) { + return new Proc() { + @Override + public boolean isAlive() throws IOException, InterruptedException { + return false; + } + + @Override + public void kill() throws IOException, InterruptedException { + + } + + @Override + public int join() throws IOException, InterruptedException { + return 1; + } + + @Override + public InputStream getStdout() { + return null; + } + + @Override + public InputStream getStderr() { + return null; + } + + @Override + public OutputStream getStdin() { + return null; + } + }; + } else if (p > 0 && processes.containsKey(p)) { + LOGGER.log(Level.INFO, "Retrieved process from cache with pid:[ " + p +"]."); + Proc proc = processes.get(p); + return new Proc() { + + @Override + public boolean isAlive() throws IOException, InterruptedException { + return false; + } + + @Override + public void kill() throws IOException, InterruptedException { + } + + @Override + public int join() throws IOException, InterruptedException { + return proc.isAlive() ? 0 : -1; + } + + @Override + public InputStream getStdout() { + return null; + } + + @Override + public InputStream getStderr() { + return null; + } + + @Override + public OutputStream getStdin() { + return null; + } + }; + } + waitUntilContainerIsReady(); final CountDownLatch started = new CountDownLatch(1); @@ -151,7 +241,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC // we need to keep the last bytes in the stream to parse the exit code as it is printed there // so we use a buffer ExitCodeOutputStream exitCodeOutputStream = new ExitCodeOutputStream(); - // send container output both to the job output and our buffer + // send container output to all 3 streams (pid, out, job). stream = new TeeOutputStream(exitCodeOutputStream, stream); // Send to proc caller as well if they sent one if (outputForCaller != null) { @@ -169,6 +259,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC public void onOpen(Response response) { alive.set(true); started.countDown(); + LOGGER.log(Level.FINEST, "onOpen : {0}", finished); } @Override @@ -238,13 +329,17 @@ public void onClose(int i, String s) { this.setupEnvironmentVariable(envVars, watch); doExec(watch, printStream, commands); - ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode); if (closables == null) { closables = new ArrayList<>(); } + + int pid = readPidFromPidFile(commands); + LOGGER.log(Level.INFO, "Created process inside pod: ["+podName+"], container: ["+containerName+"] with pid:["+pid+"]"); + ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode); + processes.put(pid, proc); closables.add(proc); return proc; - } catch (InterruptedException ie) { + } catch (InterruptedException ie) { throw new InterruptedIOException(ie.getMessage()); } catch (Exception e) { closeWatch(watch); @@ -266,8 +361,7 @@ public void kill(Map modelEnvVars) throws IOException, Interrupt getListener().getLogger().println("kill finished with exit code " + exitCode); } - private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException - { + private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException { for (Map.Entry entry : vars.entrySet()) { //Check that key is bash compliant. if (entry.getKey().matches("[a-zA-Z_][a-zA-Z0-9_]*")) { @@ -356,6 +450,51 @@ private static void doExec(ExecWatch watch, PrintStream out, String... statement } } + static int readPidFromPsCommand(String... commands) { + if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].equals("pid=")) { + return Integer.parseInt(commands[3]); + } + + + if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].startsWith("-pid")) { + return Integer.parseInt(commands[3]); + } + return -1; + } + + + private synchronized int readPidFromPidFile(String... commands) throws IOException, InterruptedException { + int pid = -1; + String pidFilePath = readPidFile(commands); + if (pidFilePath == null) { + return pid; + } + FilePath pidFile = ws.child(pidFilePath); + for (int w = 0; w < 10 && !pidFile.exists(); w++) { + try { + wait(1000); + } catch (InterruptedException e) { + break; + } + } + if (pidFile.exists()) { + try { + pid = Integer.parseInt(pidFile.readToString().trim()); + } catch (NumberFormatException x) { + throw new IOException("corrupted content in " + pidFile + ": " + x, x); + } + } + return pid; + } + + @CheckForNull + static String readPidFile(String... commands) { + if (commands.length >= 4 && "nohup".equals(commands[0]) && "sh".equals(commands[1]) && commands[2].equals("-c") && commands[3].startsWith("echo \\$\\$ >")) { + return commands[3].substring(13, commands[3].indexOf(";") - 1); + } + return null; + } + static String[] getCommands(Launcher.ProcStarter starter) { List allCommands = new ArrayList(); @@ -416,7 +555,7 @@ public int getExitCode() { int i = 1; String s = new String(b.array(), StandardCharsets.UTF_8); if (s.indexOf(EXIT_COMMAND_TXT) < 0) { - LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[] { EXIT_COMMAND_TXT, s }); + LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[]{EXIT_COMMAND_TXT, s}); return i; } // parse the exitcode int printed after EXITCODE @@ -426,7 +565,7 @@ public int getExitCode() { i = Integer.parseInt(s); } catch (NumberFormatException e) { LOGGER.log(Level.WARNING, "Unable to parse exit code as integer: \"{0}\" {1} / {2}", - new Object[] { s, queue.toString(), Arrays.toString(b.array()) }); + new Object[]{s, queue.toString(), Arrays.toString(b.array())}); } return i; } diff --git a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerStepExecution.java b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerStepExecution.java index 32ba9cfac2..57b6ae83bc 100755 --- a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerStepExecution.java +++ b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerStepExecution.java @@ -3,6 +3,9 @@ import java.io.Closeable; import java.util.logging.Level; import java.util.logging.Logger; + +import hudson.FilePath; +import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback; import org.jenkinsci.plugins.workflow.steps.BodyInvoker; @@ -57,7 +60,7 @@ public boolean start() throws Exception { client = nodeContext.connectToCloud(); EnvironmentExpander env = getContext().get(EnvironmentExpander.class); - decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env); + decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env, getContext().get(FilePath.class)); getContext().newBodyInvoker() .withContext(BodyInvoker .mergeLauncherDecorators(getContext().get(LauncherDecorator.class), decorator)) diff --git a/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecoratorTest.java b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecoratorTest.java index 9242ca7bbd..18be99aafc 100644 --- a/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecoratorTest.java +++ b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pipeline/ContainerExecDecoratorTest.java @@ -99,6 +99,10 @@ public void after() throws Exception { deletePods(client, getLabels(this), false); } + /** + * Test that multiple command execution in parallel works + * @throws Exception + */ @Test(timeout = 10000) public void testCommandExecution() throws Exception { Thread[] t = new Thread[10]; @@ -212,7 +216,11 @@ private ProcReturn execCommand(boolean quiet, String... cmd) throws Exception { .decorate(new DummyLauncher(new StreamTaskListener(new TeeOutputStream(out, System.out))), null); ContainerExecProc proc = (ContainerExecProc) launcher .launch(launcher.new ProcStarter().pwd("/tmp").cmds(cmd).quiet(quiet)); - assertTrue(proc.isAlive()); + // wait for proc to finish (shouldn't take long) + while (proc.isAlive()) { + Thread.sleep(100); + } + assertFalse("proc is alive", proc.isAlive()); int exitCode = proc.joinWithTimeout(10, TimeUnit.SECONDS, StreamTaskListener.fromStderr()); return new ProcReturn(proc, exitCode, out.toString()); }