Skip to content

Commit

Permalink
feat: Pass include script output as an environment variable (#5994)
Browse files Browse the repository at this point in the history
  • Loading branch information
chazapis authored May 25, 2021
1 parent d7517cf commit 79f5fa5
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 36 deletions.
12 changes: 10 additions & 2 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func initExecutor() *executor.WorkflowExecutor {
tmpl, err := executor.LoadTemplate(podAnnotationsPath)
checkErr(err)

includeScriptOutput := os.Getenv(common.EnvVarIncludeScriptOutput) == "true"

var cre executor.ContainerRuntimeExecutor
switch executorType {
case common.ContainerRuntimeExecutorK8sAPI:
Expand All @@ -115,9 +117,15 @@ func initExecutor() *executor.WorkflowExecutor {
}
checkErr(err)

wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, podAnnotationsPath, cre, *tmpl)
wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, podAnnotationsPath, cre, *tmpl, includeScriptOutput)
yamlBytes, _ := json.Marshal(&wfExecutor.Template)
log.Infof("Executor (version: %s, build_date: %s) initialized (pod: %s/%s) with template:\n%s", version.Version, version.BuildDate, namespace, podName, string(yamlBytes))
log.
WithField("version", version.String()).
WithField("namespace", namespace).
WithField("podName", podName).
WithField("template", string(yamlBytes)).
WithField("includeScriptOutput", includeScriptOutput).
Info("Executor initialized")
return &wfExecutor
}

Expand Down
2 changes: 0 additions & 2 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ type ExecutionControl struct {
// It is used to signal the executor to terminate a daemoned container. In the future it will be
// used to support workflow or steps/dag level timeouts.
Deadline *time.Time `json:"deadline,omitempty"`
// IncludeScriptOutput is containing flag to include script output
IncludeScriptOutput bool `json:"includeScriptOutput,omitempty"`
}

func UnstructuredHasCompletedLabel(obj interface{}) bool {
Expand Down
10 changes: 6 additions & 4 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5665,10 +5665,12 @@ func TestWFWithRetryAndWithParam(t *testing.T) {
pods, err := listPods(woc)
assert.NoError(t, err)
assert.True(t, len(pods.Items) > 0)
for _, pod := range pods.Items {
podbyte, err := json.Marshal(pod)
assert.NoError(t, err)
assert.Contains(t, string(podbyte), "includeScriptOutput")
if assert.Len(t, pods.Items, 3) {
ctrs := pods.Items[0].Spec.Containers
assert.Len(t, ctrs, 2)
envs := ctrs[1].Env
assert.Len(t, envs, 2)
assert.Equal(t, apiv1.EnvVar{Name: "ARGO_INCLUDE_SCRIPT_OUTPUT", Value: "true"}, envs[1])
}
})
}
Expand Down
22 changes: 14 additions & 8 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,19 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

// Add standard environment variables, making pod spec larger
envVars := []apiv1.EnvVar{
{Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(opts.includeScriptOutput)},
}

for i, c := range pod.Spec.InitContainers {
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name})
c.Env = append(c.Env, envVars...)
pod.Spec.InitContainers[i] = c
}
for i, c := range pod.Spec.Containers {
c.Env = append(c.Env,
apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name},
apiv1.EnvVar{Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(c.Name == common.MainContainerName && opts.includeScriptOutput)},
)
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name})
c.Env = append(c.Env, envVars...)
pod.Spec.Containers[i] = c
}

Expand Down Expand Up @@ -679,9 +687,7 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, opts
pod.ObjectMeta.Labels[k] = v
}

execCtl := common.ExecutionControl{
IncludeScriptOutput: opts.includeScriptOutput,
}
execCtl := common.ExecutionControl{}

if woc.workflowDeadline != nil {
execCtl.Deadline = woc.workflowDeadline
Expand All @@ -693,7 +699,7 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, opts
execCtl.Deadline = &opts.executionDeadline
}

if execCtl.Deadline != nil || opts.includeScriptOutput {
if execCtl.Deadline != nil {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
panic(err)
Expand Down
42 changes: 22 additions & 20 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ const (

// WorkflowExecutor is program which runs as the init/wait container
type WorkflowExecutor struct {
PodName string
Template wfv1.Template
ClientSet kubernetes.Interface
RESTClient rest.Interface
Namespace string
PodAnnotationsPath string
ExecutionControl *common.ExecutionControl
RuntimeExecutor ContainerRuntimeExecutor
PodName string
Template wfv1.Template
IncludeScriptOutput bool
ClientSet kubernetes.Interface
RESTClient rest.Interface
Namespace string
PodAnnotationsPath string
ExecutionControl *common.ExecutionControl
RuntimeExecutor ContainerRuntimeExecutor

// memoized configmaps
memoizedConfigMaps map[string]string
Expand Down Expand Up @@ -109,18 +110,19 @@ type ContainerRuntimeExecutor interface {
}

// NewExecutor instantiates a new workflow executor
func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace, podAnnotationsPath string, cre ContainerRuntimeExecutor, template wfv1.Template) WorkflowExecutor {
func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace, podAnnotationsPath string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool) WorkflowExecutor {
return WorkflowExecutor{
PodName: podName,
ClientSet: clientset,
RESTClient: restClient,
Namespace: namespace,
PodAnnotationsPath: podAnnotationsPath,
RuntimeExecutor: cre,
Template: template,
memoizedConfigMaps: map[string]string{},
memoizedSecrets: map[string][]byte{},
errors: []error{},
PodName: podName,
ClientSet: clientset,
RESTClient: restClient,
Namespace: namespace,
PodAnnotationsPath: podAnnotationsPath,
RuntimeExecutor: cre,
Template: template,
IncludeScriptOutput: includeScriptOutput,
memoizedConfigMaps: map[string]string{},
memoizedSecrets: map[string][]byte{},
errors: []error{},
}
}

Expand Down Expand Up @@ -674,7 +676,7 @@ func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Contex

// CaptureScriptResult will add the stdout of a script template as output result
func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error {
if we.ExecutionControl == nil || !we.ExecutionControl.IncludeScriptOutput {
if !we.IncludeScriptOutput {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
Expand Down

0 comments on commit 79f5fa5

Please sign in to comment.