From ab361667a8b8c5ccf126eb1c34962c86c1b738d4 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 23 Feb 2021 19:31:10 -0800 Subject: [PATCH] feat(controller) Emissary executor. (#4925) Signed-off-by: Alex Collins --- Makefile | 5 +- cmd/argo/commands/submit.go | 2 +- cmd/argoexec/commands/emissary.go | 218 ++++++++++++++++++ cmd/argoexec/commands/emissary_test.go | 119 ++++++++++ cmd/argoexec/commands/init.go | 4 + cmd/argoexec/commands/root.go | 8 +- cmd/argoexec/main.go | 19 +- config/config.go | 4 + config/image.go | 6 + docs/workflow-controller-configmap.yaml | 12 + docs/workflow-executors.md | 48 +++- manifests/quick-start-minimal.yaml | 9 + manifests/quick-start-mysql.yaml | 9 + manifests/quick-start-postgres.yaml | 9 + .../workflow-controller-configmap.yaml | 9 + test/e2e/fixtures/needs.go | 9 +- workflow/common/common.go | 5 + workflow/controller/workflowpod.go | 92 +++++--- workflow/controller/workflowpod_test.go | 57 ++++- workflow/executor/emissary/binary.go | 28 +++ workflow/executor/emissary/emissary.go | 165 +++++++++++++ .../executor/emissary/multi_reader_closer.go | 30 +++ .../emissary/multi_reader_closer_test.go | 21 ++ workflow/executor/executor.go | 11 + workflow/util/path/search.go | 22 ++ workflow/validate/validate.go | 2 - 26 files changed, 873 insertions(+), 50 deletions(-) create mode 100644 cmd/argoexec/commands/emissary.go create mode 100644 cmd/argoexec/commands/emissary_test.go create mode 100644 config/image.go create mode 100644 workflow/executor/emissary/binary.go create mode 100644 workflow/executor/emissary/emissary.go create mode 100644 workflow/executor/emissary/multi_reader_closer.go create mode 100644 workflow/executor/emissary/multi_reader_closer_test.go create mode 100644 workflow/util/path/search.go diff --git a/Makefile b/Makefile index f977a79e5b2b..b2102496d7c8 100644 --- a/Makefile +++ b/Makefile @@ -377,7 +377,7 @@ endif # for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage .PHONY: test -test: server/static/files.go +test: server/static/files.go dist/argosay env KUBECONFIG=/dev/null $(GOTEST) ./... 
.PHONY: install @@ -411,6 +411,9 @@ endif test/e2e/images/argosay/v2/argosay: test/e2e/images/argosay/v2/main/argosay.go cd test/e2e/images/argosay/v2 && GOOS=linux CGO_ENABLED=0 go build -ldflags '-w -s' -o argosay ./main +dist/argosay: test/e2e/images/argosay/v2/main/argosay.go + go build -ldflags '-w -s' -o dist/argosay ./test/e2e/images/argosay/v2/main + .PHONY: test-images test-images: $(call docker_pull,argoproj/argosay:v1) diff --git a/cmd/argo/commands/submit.go b/cmd/argo/commands/submit.go index a2f633ab662e..047e65512440 100644 --- a/cmd/argo/commands/submit.go +++ b/cmd/argo/commands/submit.go @@ -247,7 +247,7 @@ func waitWatchOrLog(ctx context.Context, serviceClient workflowpkg.WorkflowServi if cliSubmitOpts.log { for _, workflow := range workflowNames { logWorkflow(ctx, serviceClient, namespace, workflow, "", &corev1.PodLogOptions{ - Container: "main", + Container: common.MainContainerName, Follow: true, Previous: false, }) diff --git a/cmd/argoexec/commands/emissary.go b/cmd/argoexec/commands/emissary.go new file mode 100644 index 000000000000..c2251e57b6e8 --- /dev/null +++ b/cmd/argoexec/commands/emissary.go @@ -0,0 +1,218 @@ +package commands + +import ( + "compress/gzip" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strconv" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/archive" + "github.com/argoproj/argo-workflows/v3/workflow/common" + "github.com/argoproj/argo-workflows/v3/workflow/util/path" +) + +var ( + varRunArgo = "/var/run/argo" + containerName = os.Getenv(common.EnvVarContainerName) + includeScriptOutput = os.Getenv(common.EnvVarIncludeScriptOutput) == "true" // capture stdout/stderr + template = &wfv1.Template{} + logger = log.WithField("argo", true) +) + +func NewEmissaryCommand() *cobra.Command { + return &cobra.Command{ + Use: "emissary", + SilenceUsage: true, // this prevents confusing usage message being printed when we SIGTERM + RunE: func(cmd *cobra.Command, args []string) error { + exitCode := 64 + + defer func() { + err := ioutil.WriteFile(varRunArgo+"/ctr/"+containerName+"/exitcode", []byte(strconv.Itoa(exitCode)), 0600) + if err != nil { + logger.Error(fmt.Errorf("failed to write exit code: %w", err)) + } + }() + + // this also indicates we've started + if err := os.MkdirAll(varRunArgo+"/ctr/"+containerName, 0700); err != nil { + return fmt.Errorf("failed to create ctr directory: %w", err) + } + + name, args := args[0], args[1:] + + signals := make(chan os.Signal, 1) + defer close(signals) + signal.Notify(signals) + defer signal.Reset() + go func() { + for s := range signals { + if s != syscall.SIGCHLD { + _ = syscall.Kill(-os.Getpid(), s.(syscall.Signal)) + } + } + }() + + data, err := ioutil.ReadFile(varRunArgo + "/template") + if err != nil { + return fmt.Errorf("failed to read template: %w", err) + } + + if err := json.Unmarshal(data, template); err != nil { + return fmt.Errorf("failed to unmarshal template: %w", err) + } + + name, err = path.Search(name) + if err != nil { + return fmt.Errorf("failed to find name in PATH: %w", err) + } + + command := exec.Command(name, args...) 
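A clarifying note on the two kill paths being wired up around this point (these comments only describe the code in this patch; they do not add behaviour):

```go
// Two separate kill paths exist here:
//  1. The goroutine above forwards any signal the emissary itself receives to
//     the emissary's own process group (syscall.Kill(-os.Getpid(), s)).
//  2. The sub-process started below is placed in a *new* process group
//     (SysProcAttr{Setpgid: true}, a few lines down) and is signalled
//     independently by the polling goroutine that watches
//     /var/run/argo/ctr/<containerName>/signal.
```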
+ command.Env = os.Environ() + command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + // this may not be that important an optimisation, except for very long logs we don't want to capture + if includeScriptOutput { + logger.Info("capturing script output") + stdout, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stdout") + if err != nil { + return fmt.Errorf("failed to open stdout: %w", err) + } + defer func() { _ = stdout.Close() }() + command.Stdout = io.MultiWriter(os.Stdout, stdout) + + stderr, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stderr") + if err != nil { + return fmt.Errorf("failed to open stderr: %w", err) + } + defer func() { _ = stderr.Close() }() + command.Stderr = io.MultiWriter(os.Stderr, stderr) + } + + if err := command.Start(); err != nil { + return err + } + + go func() { + for { + data, _ := ioutil.ReadFile(varRunArgo + "/ctr/" + containerName + "/signal") + _ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal") + s, _ := strconv.Atoi(string(data)) + if s > 0 { + _ = syscall.Kill(command.Process.Pid, syscall.Signal(s)) + } + time.Sleep(2 * time.Second) + } + }() + + cmdErr := command.Wait() + + if cmdErr == nil { + exitCode = 0 + } else if exitError, ok := cmdErr.(*exec.ExitError); ok { + if exitError.ExitCode() >= 0 { + exitCode = exitError.ExitCode() + } else { + exitCode = 137 // SIGTERM + } + } + + if containerName == common.MainContainerName { + for _, x := range template.Outputs.Parameters { + if x.ValueFrom != nil && x.ValueFrom.Path != "" { + if err := saveParameter(x.ValueFrom.Path); err != nil { + return err + } + } + } + for _, x := range template.Outputs.Artifacts { + if x.Path != "" { + if err := saveArtifact(x.Path); err != nil { + return err + } + } + } + } else { + logger.Info("not saving outputs - not main container") + } + + return cmdErr // this is the error returned from cmd.Wait(), which maybe an exitError + }, + } +} + +func saveArtifact(srcPath string) error { + if common.FindOverlappingVolume(template, srcPath) != nil { + logger.Infof("no need to save artifact - on overlapping volume: %s", srcPath) + return nil + } + if _, err := os.Stat(srcPath); os.IsNotExist(err) { // might be optional, so we ignore + logger.WithError(err).Errorf("cannot save artifact %s", srcPath) + return nil + } + dstPath := varRunArgo + "/outputs/artifacts/" + srcPath + ".tgz" + logger.Infof("%s -> %s", srcPath, dstPath) + z := filepath.Dir(dstPath) + if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ + return fmt.Errorf("failed to create directory %s: %w", z, err) + } + dst, err := os.Create(dstPath) + if err != nil { + return fmt.Errorf("failed to create destination %s: %w", dstPath, err) + } + defer func() { _ = dst.Close() }() + if err = archive.TarGzToWriter(srcPath, gzip.DefaultCompression, dst); err != nil { + return fmt.Errorf("failed to tarball the output %s to %s: %w", srcPath, dstPath, err) + } + if err = dst.Close(); err != nil { + return fmt.Errorf("failed to close %s: %w", dstPath, err) + } + return nil +} + +func saveParameter(srcPath string) error { + if common.FindOverlappingVolume(template, srcPath) != nil { + logger.Infof("no need to save parameter - on overlapping volume: %s", srcPath) + return nil + } + src, err := os.Open(srcPath) + if os.IsNotExist(err) { // might be optional, so we ignore + logger.WithError(err).Errorf("cannot save parameter %s", srcPath) + return nil + } + if err != nil { + return fmt.Errorf("failed to open %s: %w", 
srcPath, err) + } + defer func() { _ = src.Close() }() + dstPath := varRunArgo + "/outputs/parameters/" + srcPath + logger.Infof("%s -> %s", srcPath, dstPath) + z := filepath.Dir(dstPath) + if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ + return fmt.Errorf("failed to create directory %s: %w", z, err) + } + dst, err := os.Create(dstPath) + if err != nil { + return fmt.Errorf("failed to create %s: %w", srcPath, err) + } + defer func() { _ = dst.Close() }() + if _, err = io.Copy(dst, src); err != nil { + return fmt.Errorf("failed to copy %s to %s: %w", srcPath, dstPath, err) + } + if err = dst.Close(); err != nil { + return fmt.Errorf("failed to close %s: %w", dstPath, err) + } + return nil +} diff --git a/cmd/argoexec/commands/emissary_test.go b/cmd/argoexec/commands/emissary_test.go new file mode 100644 index 000000000000..135b7f3171b3 --- /dev/null +++ b/cmd/argoexec/commands/emissary_test.go @@ -0,0 +1,119 @@ +package commands + +import ( + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestEmissary(t *testing.T) { + tmp, err := ioutil.TempDir("", "") + assert.NoError(t, err) + + varRunArgo = tmp + includeScriptOutput = true + + wd, err := os.Getwd() + assert.NoError(t, err) + + x := filepath.Join(wd, "../../../dist/argosay") + + err = ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0600) + assert.NoError(t, err) + + t.Run("Exit0", func(t *testing.T) { + err := run(x, []string{"exit"}) + assert.NoError(t, err) + data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode") + assert.NoError(t, err) + assert.Equal(t, "0", string(data)) + }) + t.Run("Exit1", func(t *testing.T) { + err := run(x, []string{"exit", "1"}) + assert.Equal(t, 1, err.(*exec.ExitError).ExitCode()) + data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode") + assert.NoError(t, err) + assert.Equal(t, "1", string(data)) + }) + t.Run("Stdout", func(t *testing.T) { + err := run(x, []string{"echo", "hello", "/dev/stdout"}) + assert.NoError(t, err) + data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stdout") + assert.NoError(t, err) + assert.Equal(t, "hello", string(data)) + }) + t.Run("Stderr", func(t *testing.T) { + err := run(x, []string{"echo", "hello", "/dev/stderr"}) + assert.NoError(t, err) + data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stderr") + assert.NoError(t, err) + assert.Equal(t, "hello", string(data)) + }) + t.Run("Signal", func(t *testing.T) { + for signal, message := range map[syscall.Signal]string{ + syscall.SIGTERM: "terminated", + syscall.SIGKILL: "killed", + } { + err := ioutil.WriteFile(varRunArgo+"/ctr/main/signal", []byte(strconv.Itoa(int(signal))), 0600) + assert.NoError(t, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := run(x, []string{"sleep", "5s"}) + assert.EqualError(t, err, "signal: "+message) + }() + time.Sleep(time.Second) + } + }) + t.Run("Artifact", func(t *testing.T) { + err = ioutil.WriteFile(varRunArgo+"/template", []byte(` +{ + "outputs": { + "artifacts": [ + {"path": "/tmp/artifact"} + ] + } +} +`), 0600) + assert.NoError(t, err) + err := run(x, []string{"echo", "hello", "/tmp/artifact"}) + assert.NoError(t, err) + data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz") + assert.NoError(t, err) + assert.NotEmpty(t, string(data)) // data is tgz format + }) + t.Run("Parameter", func(t *testing.T) { + err = ioutil.WriteFile(varRunArgo+"/template", []byte(` +{ + "outputs": { + 
"parameters": [ + { + "valueFrom": {"path": "/tmp/parameter"} + } + ] + } +} +`), 0600) + assert.NoError(t, err) + err := run(x, []string{"echo", "hello", "/tmp/parameter"}) + assert.NoError(t, err) + data, err := ioutil.ReadFile(varRunArgo + "/outputs/parameters/tmp/parameter") + assert.NoError(t, err) + assert.Equal(t, "hello", string(data)) + }) +} + +func run(name string, args []string) error { + cmd := NewEmissaryCommand() + containerName = "main" + return cmd.RunE(cmd, append([]string{name}, args...)) +} diff --git a/cmd/argoexec/commands/init.go b/cmd/argoexec/commands/init.go index 6e0c3dc50c3a..13d642ae2353 100644 --- a/cmd/argoexec/commands/init.go +++ b/cmd/argoexec/commands/init.go @@ -28,6 +28,10 @@ func loadArtifacts(ctx context.Context) error { defer wfExecutor.HandleError(ctx) defer stats.LogStats() + if err := wfExecutor.Init(); err != nil { + wfExecutor.AddError(err) + return err + } // Download input artifacts err := wfExecutor.StageFiles() if err != nil { diff --git a/cmd/argoexec/commands/root.go b/cmd/argoexec/commands/root.go index 698a8100a557..1214b2c01718 100644 --- a/cmd/argoexec/commands/root.go +++ b/cmd/argoexec/commands/root.go @@ -20,6 +20,7 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/executor" "github.com/argoproj/argo-workflows/v3/workflow/executor/docker" + "github.com/argoproj/argo-workflows/v3/workflow/executor/emissary" "github.com/argoproj/argo-workflows/v3/workflow/executor/k8sapi" "github.com/argoproj/argo-workflows/v3/workflow/executor/kubelet" "github.com/argoproj/argo-workflows/v3/workflow/executor/pns" @@ -59,6 +60,7 @@ func NewRootCommand() *cobra.Command { }, } + command.AddCommand(NewEmissaryCommand()) command.AddCommand(NewInitCommand()) command.AddCommand(NewResourceCommand()) command.AddCommand(NewWaitCommand()) @@ -74,9 +76,9 @@ func NewRootCommand() *cobra.Command { func initExecutor() *executor.WorkflowExecutor { version := argo.GetVersion() - log.WithField("version", version).Info("Starting Workflow Executor") - config, err := clientConfig.ClientConfig() executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor) + log.WithFields(log.Fields{"version": version, "executorType": executorType}).Info("Starting Workflow Executor") + config, err := clientConfig.ClientConfig() config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s executor/%s", version.Version, executorType)) checkErr(err) @@ -104,6 +106,8 @@ func initExecutor() *executor.WorkflowExecutor { cre, err = kubelet.NewKubeletExecutor(namespace, podName) case common.ContainerRuntimeExecutorPNS: cre, err = pns.NewPNSExecutor(clientset, podName, namespace) + case common.ContainerRuntimeExecutorEmissary: + cre, err = emissary.New() default: cre, err = docker.NewDockerExecutor(namespace, podName) } diff --git a/cmd/argoexec/main.go b/cmd/argoexec/main.go index 604d82b865be..2a71f58e85c0 100644 --- a/cmd/argoexec/main.go +++ b/cmd/argoexec/main.go @@ -1,18 +1,29 @@ package main import ( - "fmt" "os" + "os/exec" // load authentication plugin for obtaining credentials from cloud providers. 
_ "k8s.io/client-go/plugin/pkg/client/auth" "github.com/argoproj/argo-workflows/v3/cmd/argoexec/commands" + "github.com/argoproj/argo-workflows/v3/util" ) func main() { - if err := commands.NewRootCommand().Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + err := commands.NewRootCommand().Execute() + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + if exitError.ExitCode() >= 0 { + os.Exit(exitError.ExitCode()) + } else { + os.Exit(137) // probably SIGTERM or SIGKILL + } + } else { + util.WriteTeriminateMessage(err.Error()) // we don't want to overwrite any other message + println(err.Error()) + os.Exit(64) + } } } diff --git a/config/config.go b/config/config.go index a1c08a62ed1e..48fb22c83b84 100644 --- a/config/config.go +++ b/config/config.go @@ -105,6 +105,10 @@ type Config struct { // Adding configurable initial delay (for K8S clusters with mutating webhooks) to prevent workflow getting modified by MWC. InitialDelay metav1.Duration `json:"initialDelay,omitempty"` + + // The command/args for each image, needed when the command is not specified and the emissary executor is used. + // https://argoproj.github.io/argo-workflows/workflow-executors/#emissary + Images map[string]Image `json:"images,omitempty"` } func (c Config) GetContainerRuntimeExecutor(labels labels.Labels) (string, error) { diff --git a/config/image.go b/config/image.go new file mode 100644 index 000000000000..c99d8d2021f1 --- /dev/null +++ b/config/image.go @@ -0,0 +1,6 @@ +package config + +type Image struct { + Command []string `json:"command"` + Args []string `json:"args,omitempty"` +} diff --git a/docs/workflow-controller-configmap.yaml b/docs/workflow-controller-configmap.yaml index 334d941806a6..06de54f3b1c4 100644 --- a/docs/workflow-controller-configmap.yaml +++ b/docs/workflow-controller-configmap.yaml @@ -135,6 +135,18 @@ data: # disable the TLS verification of the kubelet executor (default: false) kubeletInsecure: false + # The command/args for each image, needed when the command is not specified and the emissary executor is used. + # https://argoproj.github.io/argo-workflows/workflow-executors/#emissary + images: | + argoproj/argosay:v1: + command: [cowsay] + argoproj/argosay:v2: + command: [/argosay] + docker/whalesay:latest: + command: [cowsay] + python:alpine3.6: + command: [python3] + # executor controls how the init and wait container should be customized # (available since Argo v2.3) executor: | diff --git a/docs/workflow-executors.md b/docs/workflow-executors.md index 1ac503a2d53d..99aa42dd6b10 100644 --- a/docs/workflow-executors.md +++ b/docs/workflow-executors.md @@ -16,7 +16,7 @@ The executor to be used in your workflows can be changed in [the configmap](./wo * It requires `privileged` access to `docker.sock` of the host to be mounted which. Often rejected by Open Policy Agent (OPA) or your Pod Security Policy (PSP). * It can escape the privileges of the pod's service account * It cannot [`runAsNonRoot`](workflow-pod-security-context.md). -* Most scalable: +* Equal most scalable: * It communicates directly with the local Docker daemon. * Artifacts: * Output artifacts can be located on the base layer (e.g. `/tmp`). 
@@ -28,8 +28,8 @@ The executor to be used in your workflows can be changed in [the configmap](./wo ## Kubelet (kubelet) * Reliability: - * Least well-tested - * Least popular + * Second least well-tested + * Second least popular * Secure * No `privileged` access * Cannot escape the privileges of the pod's service account @@ -79,3 +79,45 @@ The executor to be used in your workflows can be changed in [the configmap](./wo * [Doesn't work for Windows containers](https://kubernetes.io/docs/setup/production-environment/windows/intro-windows-in-kubernetes/#v1-pod). [https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) + +## Emissary (emissary) + +![alpha](assets/alpha.svg) + +> v3.1 and after + +This is the most fully featured executor. + +* Reliability: + * Not yet well-tested. + * Not yet popular. +* More secure: + * No `privileged` access + * Cannot escape the privileges of the pod's service account + * Can [`runAsNonRoot`](workflow-pod-security-context.md). +* Scalable: + * It reads and writes to and from the container's disk and typically does not use any network APIs unless a resource + type template is used. +* Artifacts: + * Output artifacts can be located on the base layer (e.g. `/tmp`). +* Configuration: + * `command` must be specified for containers. + +You can determine the command and args as follows: + +```bash +docker image inspect -f '{{.Config.Entrypoint}} {{.Config.Cmd}}' argoproj/argosay:v2 +``` + +[Learn more about command and args](https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes +) + +### Image Index + +If the emissary cannot determine which command to run because you did not specify it in your workflow spec, then it +will look it up in the **image index**. This is simply +a [configuration item](workflow-controller-configmap.yaml). + +### Exit Code 64 + +The emissary will exit with code 64 if it fails. This may indicate a bug in the emissary.
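To make the image index concrete, the sketch below is illustrative only: it mirrors the `Image` struct from `config/image.go` and the command fallback the controller applies in `workflow/controller/workflowpod.go` in this patch, but it is not the controller's actual code path.

```go
package main

import "fmt"

// Image mirrors config/image.go in this patch: the default command/args to use
// for an image when the workflow spec does not specify them.
type Image struct {
	Command []string
	Args    []string
}

// resolveCommand sketches the fallback: use the container's own command if set,
// otherwise consult the image index, otherwise fail with a pointer to the docs.
func resolveCommand(command []string, image string, index map[string]Image) ([]string, error) {
	if len(command) == 0 {
		command = index[image].Command // a missing entry yields an empty Command
	}
	if len(command) == 0 {
		return nil, fmt.Errorf("image %s: command not specified and not found in the image index", image)
	}
	// the controller then prepends the emissary binary to the resolved command
	return append([]string{"/var/run/argo/argoexec", "emissary", "--"}, command...), nil
}

func main() {
	index := map[string]Image{"argoproj/argosay:v1": {Command: []string{"cowsay"}}}
	cmd, err := resolveCommand(nil, "argoproj/argosay:v1", index)
	fmt.Println(cmd, err) // [/var/run/argo/argoexec emissary -- cowsay] <nil>
}
```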
\ No newline at end of file diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml index abc8c48dbac1..0c281e0dd24a 100644 --- a/manifests/quick-start-minimal.yaml +++ b/manifests/quick-start-minimal.yaml @@ -621,6 +621,15 @@ data: requests: cpu: 10m memory: 64Mi + images: | + argoproj/argosay:v1: + command: [cowsay] + argoproj/argosay:v2: + command: [/argosay] + docker/whalesay:latest: + command: [cowsay] + python:alpine3.6: + command: [python3] links: | - name: Workflow Link scope: workflow diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index d3adc1d9ef75..e8d4b628217d 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -621,6 +621,15 @@ data: requests: cpu: 10m memory: 64Mi + images: | + argoproj/argosay:v1: + command: [cowsay] + argoproj/argosay:v2: + command: [/argosay] + docker/whalesay:latest: + command: [cowsay] + python:alpine3.6: + command: [python3] links: | - name: Workflow Link scope: workflow diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index fb8f557bd24c..0c3169baabb5 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -621,6 +621,15 @@ data: requests: cpu: 10m memory: 64Mi + images: | + argoproj/argosay:v1: + command: [cowsay] + argoproj/argosay:v2: + command: [/argosay] + docker/whalesay:latest: + command: [cowsay] + python:alpine3.6: + command: [python3] links: | - name: Workflow Link scope: workflow diff --git a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml index 6293e7d06119..ca776ad17e64 100644 --- a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml @@ -11,6 +11,15 @@ data: selector: matchLabels: workflows.argoproj.io/container-runtime-executor: k8sapi + images: | + argoproj/argosay:v1: + command: [cowsay] + argoproj/argosay:v2: + command: [/argosay] + docker/whalesay:latest: + command: [cowsay] + python:alpine3.6: + command: [python3] artifactRepository: | archiveLogs: true s3: diff --git a/test/e2e/fixtures/needs.go b/test/e2e/fixtures/needs.go index 2b5e1585cbc1..57c7a7d6c3b6 100644 --- a/test/e2e/fixtures/needs.go +++ b/test/e2e/fixtures/needs.go @@ -23,10 +23,11 @@ var ( WorkflowArchive Need = func(s *E2ESuite) (bool, string) { return s.Persistence.IsEnabled(), "workflow archive enabled" } - Docker = Executor("docker") - K8SAPI = Executor("k8sapi") - Kubelet = Executor("kubelet") - PNS = Executor("pns") + Docker = Executor("docker") + Emissary = Executor("emissary") + K8SAPI = Executor("k8sapi") + Kubelet = Executor("kubelet") + PNS = Executor("pns") ) func Executor(e string) Need { diff --git a/workflow/common/common.go b/workflow/common/common.go index 3bedc07b28e1..b942b4e23c56 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -104,6 +104,8 @@ const ( EnvVarPodName = "ARGO_POD_NAME" // EnvVarContainerName container the container's name for the current pod EnvVarContainerName = "ARGO_CONTAINER_NAME" + // EnvVarIncludeScriptOutput capture the stdout and stderr + EnvVarIncludeScriptOutput = "ARGO_INCLUDE_SCRIPT_OUTPUT" // EnvVarContainerRuntimeExecutor contains the name of the container runtime executor to use, empty is equal to "docker" EnvVarContainerRuntimeExecutor = "ARGO_CONTAINER_RUNTIME_EXECUTOR" // EnvVarDownwardAPINodeIP is the envvar used to get the 
`status.hostIP` @@ -127,6 +129,9 @@ const ( // ContainerRuntimeExecutorPNS indicates to use process namespace sharing as the container runtime executor ContainerRuntimeExecutorPNS = "pns" + // ContainerRuntimeExecutorEmissary indicates to use emissary container runtime executor + ContainerRuntimeExecutorEmissary = "emissary" + // Variables that are added to the scope during template execution and can be referenced using {{}} syntax // GlobalVarWorkflowName is a global workflow variable referencing the workflow's metadata.name field diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index e458bc129d14..b76e8c458028 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -19,6 +19,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" + "github.com/argoproj/argo-workflows/v3/config" "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -28,7 +29,6 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/util" ) -// Reusable k8s pod spec portions used in workflow pods var ( // volumePodMetadata makes available the pod metadata available as a file // to the executor's init and sidecar containers. Specifically, the template @@ -53,7 +53,16 @@ var ( Name: volumePodMetadata.Name, MountPath: common.PodMetadataMountPath, } - + volumeVarArgo = apiv1.Volume{ + Name: "var-run-argo", + VolumeSource: apiv1.VolumeSource{ + EmptyDir: &apiv1.EmptyDirVolumeSource{}, + }, + } + volumeMountVarArgo = apiv1.VolumeMount{ + Name: volumeVarArgo.Name, + MountPath: "/var/run/argo", + } hostPathSocket = apiv1.HostPathSocket ) @@ -254,7 +263,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin // Add init container only if it needs input artifacts. This is also true for // script templates (which needs to populate the script) - if len(tmpl.Inputs.Artifacts) > 0 || tmpl.GetType() == wfv1.TemplateTypeScript { + if len(tmpl.Inputs.Artifacts) > 0 || tmpl.GetType() == wfv1.TemplateTypeScript || woc.getContainerRuntimeExecutor() == common.ContainerRuntimeExecutorEmissary { initCtr := woc.newInitContainer(tmpl) pod.Spec.InitContainers = []apiv1.Container{initCtr} } @@ -282,8 +291,34 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin addSidecars(pod, tmpl) addOutputArtifactsVolumes(pod, tmpl) + if woc.getContainerRuntimeExecutor() == common.ContainerRuntimeExecutorEmissary && tmpl.GetType() != wfv1.TemplateTypeResource { + for i, c := range pod.Spec.InitContainers { + c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) + pod.Spec.InitContainers[i] = c + } + for i, c := range pod.Spec.Containers { + if c.Name != common.WaitContainerName { + // https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes + if len(c.Command) == 0 { + x := woc.getImage(c.Image) + c.Command = x.Command + c.Args = x.Args + } + if len(c.Command) == 0 { + return nil, fmt.Errorf("when using the emissary executor you must either explicitly specify the command, or list the image's command in the index: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary") + } + c.Command = append([]string{"/var/run/argo/argoexec", "emissary", "--"}, c.Command...) 
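A worked example of the rewrite performed just above, with values taken from the quick-start image index and asserted by the unit test later in this patch:

```go
// A main container using image argoproj/argosay:v1 with no command, given the
// quick-start image index entry `command: [cowsay]`, ends up with:
//
//   Command: ["/var/run/argo/argoexec", "emissary", "--", "cowsay"]
//
// i.e. the emissary binary wraps the image's original command; the wait
// container (and resource templates) are left untouched by this block.
```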
+ } + c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) + pod.Spec.Containers[i] = c + } + } + for i, c := range pod.Spec.Containers { - c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name}) // used to identify the container name of the process + c.Env = append(c.Env, + apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name}, + apiv1.EnvVar{Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(c.Name == common.MainContainerName && opts.includeScriptOutput)}, + ) pod.Spec.Containers[i] = c } @@ -394,6 +429,13 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin return created, nil } +func (woc *wfOperationCtx) getImage(image string) config.Image { + if woc.controller.Config.Images == nil { + return config.Image{} + } + return woc.controller.Config.Images[image] +} + // substitutePodParams returns a pod spec with parameter references substituted as well as pod.name func substitutePodParams(pod *apiv1.Pod, globalParams common.Parameters, tmpl *wfv1.Template) (*apiv1.Pod, error) { podParams := globalParams.DeepCopy() @@ -479,27 +521,26 @@ func containerIsPrivileged(ctr *apiv1.Container) bool { } func (woc *wfOperationCtx) createEnvVars() []apiv1.EnvVar { - var execEnvVars []apiv1.EnvVar - execEnvVars = append(execEnvVars, apiv1.EnvVar{ - Name: common.EnvVarPodName, - ValueFrom: &apiv1.EnvVarSource{ - FieldRef: &apiv1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "metadata.name", + execEnvVars := []apiv1.EnvVar{ + { + Name: common.EnvVarPodName, + ValueFrom: &apiv1.EnvVarSource{ + FieldRef: &apiv1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.name", + }, }, }, - }) + { + Name: common.EnvVarContainerRuntimeExecutor, + Value: woc.getContainerRuntimeExecutor(), + }, + } if woc.controller.Config.Executor != nil { execEnvVars = append(execEnvVars, woc.controller.Config.Executor.Env...) } switch woc.getContainerRuntimeExecutor() { case common.ContainerRuntimeExecutorK8sAPI: - execEnvVars = append(execEnvVars, - apiv1.EnvVar{ - Name: common.EnvVarContainerRuntimeExecutor, - Value: woc.getContainerRuntimeExecutor(), - }, - ) case common.ContainerRuntimeExecutorKubelet: execEnvVars = append(execEnvVars, apiv1.EnvVar{ @@ -523,13 +564,6 @@ func (woc *wfOperationCtx) createEnvVars() []apiv1.EnvVar { Value: strconv.FormatBool(woc.controller.Config.KubeletInsecure), }, ) - case common.ContainerRuntimeExecutorPNS: - execEnvVars = append(execEnvVars, - apiv1.EnvVar{ - Name: common.EnvVarContainerRuntimeExecutor, - Value: woc.getContainerRuntimeExecutor(), - }, - ) } return execEnvVars } @@ -554,10 +588,13 @@ func (woc *wfOperationCtx) createVolumes(tmpl *wfv1.Template) []apiv1.Volume { } switch woc.getContainerRuntimeExecutor() { case common.ContainerRuntimeExecutorKubelet, common.ContainerRuntimeExecutorK8sAPI, common.ContainerRuntimeExecutorPNS: - return volumes + case common.ContainerRuntimeExecutorEmissary: + volumes = append(volumes, volumeVarArgo) default: - return append(volumes, woc.getVolumeDockerSock(tmpl)) + volumes = append(volumes, woc.getVolumeDockerSock(tmpl)) } + volumes = append(volumes, tmpl.Volumes...) 
+ return volumes } func (woc *wfOperationCtx) newExecContainer(name string, tmpl *wfv1.Template) *apiv1.Container { @@ -598,6 +635,7 @@ func (woc *wfOperationCtx) newExecContainer(name string, tmpl *wfv1.Template) *a }) exec.Args = append(exec.Args, "--kubeconfig="+path) } + executorServiceAccountName := "" if tmpl.Executor != nil && tmpl.Executor.ServiceAccountName != "" { executorServiceAccountName = tmpl.Executor.ServiceAccountName diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 6b4627dddbd3..521e2bce3b3d 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -580,7 +580,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) { } // For Docker executor - { + t.Run("Docker", func(t *testing.T) { ctx := context.Background() woc := newWoc() woc.volumes = volumes @@ -601,10 +601,10 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name) assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) - } + }) // For Kubelet executor - { + t.Run("Kubelet", func(t *testing.T) { ctx := context.Background() woc := newWoc() woc.volumes = volumes @@ -624,10 +624,10 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) - } + }) // For K8sAPI executor - { + t.Run("K8SAPI", func(t *testing.T) { ctx := context.Background() woc := newWoc() woc.volumes = volumes @@ -647,7 +647,52 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) - } + }) + + // For emissary executor + t.Run("Emissary", func(t *testing.T) { + ctx := context.Background() + woc := newWoc() + woc.volumes = volumes + woc.execWf.Spec.Templates[0].Container.VolumeMounts = volumeMounts + woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary + + tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "") + assert.NoError(t, err) + _, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}) + assert.NoError(t, err) + pods, err := listPods(woc) + assert.NoError(t, err) + assert.Len(t, pods.Items, 1) + pod := pods.Items[0] + if assert.Len(t, pod.Spec.Volumes, 3) { + assert.Equal(t, "podmetadata", pod.Spec.Volumes[0].Name) + assert.Equal(t, "var-run-argo", pod.Spec.Volumes[1].Name) + assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name) + } + if assert.Len(t, pod.Spec.InitContainers, 1) { + init := pod.Spec.InitContainers[0] + if assert.Len(t, init.VolumeMounts, 2) { + assert.Equal(t, "podmetadata", init.VolumeMounts[0].Name) + assert.Equal(t, "var-run-argo", init.VolumeMounts[1].Name) + } + } + containers := pod.Spec.Containers + if assert.Len(t, containers, 2) { + wait := containers[0] + if assert.Len(t, wait.VolumeMounts, 3) { + assert.Equal(t, "podmetadata", wait.VolumeMounts[0].Name) + assert.Equal(t, "volume-name", wait.VolumeMounts[1].Name) + assert.Equal(t, "var-run-argo", wait.VolumeMounts[2].Name) + } + main := containers[1] + assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", 
"cowsay"}, main.Command) + if assert.Len(t, main.VolumeMounts, 2) { + assert.Equal(t, "volume-name", main.VolumeMounts[0].Name) + assert.Equal(t, "var-run-argo", main.VolumeMounts[1].Name) + } + } + }) } func TestVolumesPodSubstitution(t *testing.T) { diff --git a/workflow/executor/emissary/binary.go b/workflow/executor/emissary/binary.go new file mode 100644 index 000000000000..2bc57c20c712 --- /dev/null +++ b/workflow/executor/emissary/binary.go @@ -0,0 +1,28 @@ +package emissary + +import ( + "io" + "os" + + "github.com/argoproj/argo-workflows/v3/workflow/util/path" +) + +func copyBinary() error { + name, err := path.Search("argoexec") + if err != nil { + return err + } + in, err := os.Open(name) + if err != nil { + return err + } + defer func() { _ = in.Close() }() + out, err := os.OpenFile("/var/run/argo/argoexec", os.O_RDWR|os.O_CREATE, 0500) // r-x------ + if err != nil { + return err + } + if _, err = io.Copy(out, in); err != nil { + return err + } + return out.Close() +} diff --git a/workflow/executor/emissary/emissary.go b/workflow/executor/emissary/emissary.go new file mode 100644 index 000000000000..a8955e304cf5 --- /dev/null +++ b/workflow/executor/emissary/emissary.go @@ -0,0 +1,165 @@ +package emissary + +import ( + "context" + "encoding/json" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/executor" +) + +/* +This executor works very differently to the others. It mounts an empty-dir on all containers at `/var/run/argo`. The main container command is replaces by a new binary `argoexec` which starts the original command in a sub-process and when it is finished, captures the outputs: + +The init container creates these files: + +* `/var/run/argo/argoexec` The binary, copied from the `argoexec` image. +* `/var/run/argo/template` A JSON encoding of the template. + +In the main container, the emissary creates these files: + +* `/var/run/argo/ctr/${containerName}/exitcode` The container exit code. +* `/var/run/argo/ctr/${containerName}/stderr` A copy of stderr (if needed). +* `/var/run/argo/ctr/${containerName}/stdout` A copy of stdout (if needed). + +If the container is named `main` it also copies base-layer artifacts to the shared volume: + +* `/var/run/argo/outputs/parameters/${path}` All output parameters are copied here, e.g. `/tmp/message` is moved to `/var/run/argo/outputs/parameters/tmp/message`. +* `/var/run/argo/outputs/artifacts/${path}.tgz` All output artifacts are copied here, e.g. `/tmp/message` is moved to /var/run/argo/outputs/artifacts/tmp/message.tgz`. + +The wait container can create one file itself, used for terminating the sub-process: + +* `/var/run/argo/ctr/${containerName}/signal` The emissary binary listens to changes in this file, and signals the sub-process with the value found in this file. 
+ +*/ +type emissary struct{} + +func New() (executor.ContainerRuntimeExecutor, error) { + return &emissary{}, nil +} + +func (e *emissary) Init(t wfv1.Template) error { + if err := copyBinary(); err != nil { + return err + } + if err := e.writeTemplate(t); err != nil { + return err + } + return nil +} + +func (e emissary) writeTemplate(t wfv1.Template) error { + data, err := json.Marshal(t) + if err != nil { + return err + } + return ioutil.WriteFile("/var/run/argo/template", data, 0400) // chmod -r-------- +} + +func (e emissary) GetFileContents(_ string, sourcePath string) (string, error) { + data, err := ioutil.ReadFile(filepath.Join("/var/run/argo/outputs/parameters", sourcePath)) + return string(data), err +} + +func (e emissary) CopyFile(_ string, sourcePath string, destPath string, _ int) error { + // this implementation is very different, because we expect the emissary binary has already compressed the file + // so no compression can or needs to be implemented here + // TODO - warn the user we ignored compression? + sourceFile := filepath.Join("/var/run/argo/outputs/artifacts", sourcePath+".tgz") + log.Infof("%s -> %s", sourceFile, destPath) + src, err := os.Open(sourceFile) + if err != nil { + return err + } + defer func() { _ = src.Close() }() + dst, err := os.Create(destPath) + if err != nil { + return err + } + defer func() { _ = dst.Close() }() + _, err = io.Copy(dst, src) + if err := dst.Close(); err != nil { + return err + } + return err +} + +func (e emissary) GetOutputStream(_ context.Context, containerName string, combinedOutput bool) (io.ReadCloser, error) { + names := []string{"stdout"} + if combinedOutput { + names = append(names, "stderr") + } + var files []io.ReadCloser + for _, name := range names { + f, err := os.Open("/var/run/argo/ctr/" + containerName + "/" + name) + if os.IsNotExist(err) { + continue + } else if err != nil { + return nil, err + } + files = append(files, f) + } + return newMultiReaderCloser(files...), nil +} + +func (e emissary) GetExitCode(_ context.Context, containerName string) (string, error) { + data, err := ioutil.ReadFile("/var/run/argo/ctr/" + containerName + "/exitcode") + if err != nil { + return "", err + } + exitCode, err := strconv.Atoi(string(data)) + return strconv.Itoa(exitCode), err +} + +func (e emissary) Wait(ctx context.Context, containerNames, sidecarNames []string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if e.isComplete(containerNames) { + return nil + } + time.Sleep(3 * time.Second) + } + } +} + +func (e emissary) isComplete(containerNames []string) bool { + for _, containerName := range containerNames { + _, err := os.Stat("/var/run/argo/ctr/" + containerName + "/exitcode") + if os.IsNotExist(err) { + return false + } + } + return true +} + +func (e emissary) Kill(ctx context.Context, containerNames []string, terminationGracePeriodDuration time.Duration) error { + for _, containerName := range containerNames { + if err := ioutil.WriteFile("/var/run/argo/ctr/"+containerName+"/signal", []byte(strconv.Itoa(int(syscall.SIGTERM))), 0600); err != nil { + return err + } + } + ctx, cancel := context.WithTimeout(ctx, terminationGracePeriodDuration) + defer cancel() + err := e.Wait(ctx, containerNames, nil) + if err != context.Canceled { + return err + } + for _, containerName := range containerNames { + if err := ioutil.WriteFile("/var/run/argo/ctr/"+containerName+"/signal", []byte(strconv.Itoa(int(syscall.SIGKILL))), 0600); err != nil { + return err + } + } + return e.Wait(ctx, 
containerNames, nil) +} diff --git a/workflow/executor/emissary/multi_reader_closer.go b/workflow/executor/emissary/multi_reader_closer.go new file mode 100644 index 000000000000..94bb48ff4e3c --- /dev/null +++ b/workflow/executor/emissary/multi_reader_closer.go @@ -0,0 +1,30 @@ +package emissary + +import ( + "io" +) + +type multiReaderCloser struct { + io.Reader + closer []io.ReadCloser +} + +func newMultiReaderCloser(x ...io.ReadCloser) io.ReadCloser { + var readers []io.Reader + for _, r := range x { + readers = append(readers, r) + } + return &multiReaderCloser{ + Reader: io.MultiReader(readers...), + closer: x, + } +} + +func (m *multiReaderCloser) Close() error { + for _, c := range m.closer { + if err := c.Close(); err != nil { + return err + } + } + return nil +} diff --git a/workflow/executor/emissary/multi_reader_closer_test.go b/workflow/executor/emissary/multi_reader_closer_test.go new file mode 100644 index 000000000000..cb0f643814ea --- /dev/null +++ b/workflow/executor/emissary/multi_reader_closer_test.go @@ -0,0 +1,21 @@ +package emissary + +import ( + "bufio" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_multiReaderCloser(t *testing.T) { + a := ioutil.NopCloser(strings.NewReader("a")) + b := ioutil.NopCloser(strings.NewReader("b")) + c := newMultiReaderCloser(a, b) + s := bufio.NewScanner(c) + assert.True(t, s.Scan()) + assert.Equal(t, "ab", s.Text()) + assert.False(t, s.Scan()) + assert.NoError(t, c.Close()) +} diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 9d45ec5d7a9d..50288a6ff9b2 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -76,6 +76,10 @@ type WorkflowExecutor struct { errors []error } +type Initializer interface { + Init(tmpl wfv1.Template) error +} + //go:generate mockery -name ContainerRuntimeExecutor // ContainerRuntimeExecutor is the interface for interacting with a container runtime (e.g. 
docker) @@ -1083,6 +1087,13 @@ func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error { return we.RuntimeExecutor.Kill(ctx, sidecarNames, terminationGracePeriodDuration) } +func (we *WorkflowExecutor) Init() error { + if i, ok := we.RuntimeExecutor.(Initializer); ok { + return i.Init(we.Template) + } + return nil +} + // LoadExecutionControl reads the execution control definition from the the Kubernetes downward api annotations volume file func (we *WorkflowExecutor) LoadExecutionControl() error { err := unmarshalAnnotationField(we.PodAnnotationsPath, common.AnnotationKeyExecutionControl, &we.ExecutionControl) diff --git a/workflow/util/path/search.go b/workflow/util/path/search.go new file mode 100644 index 000000000000..005f8c668f04 --- /dev/null +++ b/workflow/util/path/search.go @@ -0,0 +1,22 @@ +package path + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +func Search(name string) (string, error) { + if filepath.IsAbs(name) { + return name, nil + } + envPath := os.Getenv("PATH") + for _, dir := range strings.Split(envPath, ":") { + absName := filepath.Join(dir, name) + if _, err := os.Stat(absName); err == nil { + return absName, nil + } + } + return "", fmt.Errorf("failed to find %s in %s", name, envPath) +} diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index ab500559dd33..2cd3e217a938 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -928,8 +928,6 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template) return nil } switch ctx.ContainerRuntimeExecutor { - case "", common.ContainerRuntimeExecutorDocker: - // docker executor supports all modes of artifact outputs case common.ContainerRuntimeExecutorPNS: // pns supports copying from the base image, but only if there is no volume mount underneath it errMsg := "pns executor does not support outputs from base image layer with volume mounts. Use an emptyDir: https://argoproj.github.io/argo-workflows/empty-dir/"
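As a closing illustration of the new `Initializer` hook in `workflow/executor/executor.go`: `WorkflowExecutor.Init()` only calls `Init(template)` on runtimes that implement the optional interface, which in this patch is just the emissary (whose `Init` copies the binary and writes the template). Below is a minimal, self-contained sketch of that optional-interface pattern; the names `runtime`, `noopRuntime`, `emissaryLike`, and `initIfSupported` are hypothetical and exist only for this example.

```go
package main

import "fmt"

// runtime stands in for executor.ContainerRuntimeExecutor (illustrative only).
type runtime interface{ Name() string }

// initializer mirrors the new Initializer interface: an optional Init hook.
type initializer interface{ Init(tmpl string) error }

// noopRuntime stands in for executors that need no initialisation (docker, pns, ...).
type noopRuntime struct{}

func (noopRuntime) Name() string { return "noop" }

// emissaryLike stands in for the emissary, which uses the hook to copy its
// binary and write the template before the main container starts.
type emissaryLike struct{}

func (emissaryLike) Name() string { return "emissary" }
func (emissaryLike) Init(tmpl string) error {
	fmt.Println("init: copy binary and write template:", tmpl)
	return nil
}

// initIfSupported mirrors WorkflowExecutor.Init: call Init only when supported.
func initIfSupported(r runtime, tmpl string) error {
	if i, ok := r.(initializer); ok {
		return i.Init(tmpl)
	}
	return nil
}

func main() {
	_ = initIfSupported(noopRuntime{}, `{"name":"demo"}`)  // no-op
	_ = initIfSupported(emissaryLike{}, `{"name":"demo"}`) // runs the emissary-style init
}
```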