aggregate or separately process the output artifacts of parallel pods #6805

Open
clumsy456 opened this issue Sep 27, 2021 · 17 comments
Labels
area/artifacts S3/GCP/OSS/Git/HDFS etc area/controller Controller issues, panics area/looping `withParams`, `withItems`, and `withSequence` type/feature Feature request

Comments


clumsy456 commented Sep 27, 2021

Summary

If a dag task includes a withItems/withParam parameter, we are not able to resolve its output artifacts, as shown in #1625. I suggest a syntax to aggregate, or to separately process, the output artifacts of the parallel pods.

Use Cases

There are two situations in which the output artifacts of parallel pods are used.

separately process

        |---> Pod1_1 ---|---> Pod2_1
begin --|---> Pod1_2 ---|---> Pod2_2
        |---> Pod1_3 ---|---> Pod2_3
  • Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
  • Task2 will also create 3 pods, and each pod will process the output artifacts of the corresponding task1 pod one-to-one.

I suggest that task2 include a withParam parameter that refers to the output of task1, so that the number of task2 pods equals the number of task1 pods.

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # process only one path
      - name: deal-path1
        template: deal-one-path
        arguments:
          artifacts:
          - name: path
            from: "{{item}}"
        withParam: "{{tasks.gen.outputs.artifacts.path1}}"
      # process multi path
      - name: deal-paths
        template: deal-two-path
        arguments:
          artifacts:
          - name: path1
            from: "{{item.path1}}"
          - name: path2
            from: "{{item.path2}}"
        withParam: "{{tasks.gen.outputs.artifacts}}"
      # process param and path
      - name: deal-param-path
        template: deal-param-path
        arguments:
          parameters:
          - name: param
            value: "{{item.parameters.param}}"
          artifacts:
          - name: path
            value: "{{item.artifacts.path1}}"
        withParam: "{{tasks.gen.outputs}}"

  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-one-path
    inputs:
      artifacts:
      - name: path
        ...
  - name: deal-two-path
    inputs:
      artifacts:
      - name: path1
        ...
      - name: path2
        ...
  - name: deal-param-path
    inputs:
      parameters:
      - name: param
        ...
      artifacts:
      - name: path
        ...
  • if withParam is {{tasks.A.outputs.artifacts.path}}, we can use {{item}} to refer to the output artifact;
  • if withParam is {{tasks.A.outputs.artifacts}}, we can use {{item.path1}} and {{item.path2}} to refer to multiple output artifacts of one pod;
  • if withParam is {{tasks.A.outputs}}, we can use {{item.parameters.param}} and {{item.artifacts.path}} to refer to an output parameter and an output artifact of one pod.

aggregate

        |---> Pod1_1 ---|
begin --|---> Pod1_2 ---|---> Pod2
        |---> Pod1_3 ---|
  • Task1 includes withItems/withParam parameters, so 3 pods are actually executed.
  • Task2 will aggregate all the output artifacts of the 3 pods into one pod.

I suggest adding fromMulti and pathMulti fields to tasks.arguments.artifacts and templates.inputs.artifacts, respectively.

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate
        arguments:
          artifacts:
          - name: paths
            fromMulti: "{{tasks.gen.outputs.artifacts.path}}"

  - name: gen-artifact
    inputs:
      parameters:
      - name: message
        ...
    outputs:
      artifacts:
      - name: path
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: paths
        pathMulti: /tmp/{{index}}_path
        ...

pathMulti is just a template containing {{index}}. The actual paths in the aggregation pod will be /tmp/0_path, /tmp/1_path, and /tmp/2_path.


Message from the maintainers:

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.


alexec commented Nov 4, 2021

I think this is one of the intended uses of key-only artifacts.

Could you achieve this by writing each output to a unique key within bucket storage? The aggregating/grouping pod can then just list the keys within the bucket to find the needed files:

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
        outputs:
          artifacts:
          - name: output
            path: /output
            s3:
              key: "{{workflow.name}}/{{item}}"
      # aggregate
      - name: aggregate
        template: aggregate

  - name: gen-artifact
    inputs:
        ...
    outputs:
      artifacts:
      - name: output
        path: /output
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"

The use of key: "{{workflow.name}}/{{item}}" for generation and then key: "{{workflow.name}}" for aggregation is the crucial part.


jixinchi commented Nov 4, 2021

Thanks for your reply!

  • As far as I know, there is no outputs field in the task struct. I'm not sure if you meant to put s3.key into templates[gen-artifact].outputs.artifacts.
  • In our usual use, we tend to use templateRef in the dag-type template and put the container-type template into an individual WorkflowTemplate, so that different workflows can use the same container with its inputs and outputs (see the sketch below). If we use s3.key and {{workflow.name}} in a container-type template, we cannot achieve this decoupling.
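
For example, a minimal sketch of that decoupling (the WorkflowTemplate name shared-templates and the container details are hypothetical):

  # shared WorkflowTemplate holding the container-type template
  apiVersion: argoproj.io/v1alpha1
  kind: WorkflowTemplate
  metadata:
    name: shared-templates
  spec:
    templates:
    - name: gen-artifact
      inputs:
        parameters:
        - name: message
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo {{inputs.parameters.message}} > /tmp/path1"]
      outputs:
        artifacts:
        - name: path1
          path: /tmp/path1

  # dag task in any workflow referencing it
  - name: gen
    templateRef:
      name: shared-templates
      template: gen-artifact
    arguments:
      parameters:
      - name: message
        value: "{{item}}"
    withItems: ["1", "2", "3"]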


jixinchi commented Nov 4, 2021

Sorry for replying to you with my company account. "jixinchi" is my company account and "clumsy456" is my personal account. I should have used my company account to raise this issue.


alexec commented Nov 10, 2021

You are correct, it is in the template:

spec:
  entrypoint: dag
  templates:
  - name: dag
    dag:
      tasks:
      # generate output artifacts of parallel pods
      - name: gen
        template: gen-artifact
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: ["1", "2", "3"]
      # aggregate
      - name: aggregate
        template: aggregate

  - name: gen-artifact
    inputs:
        ...

    outputs:
      artifacts:
      - name: output
        path: /output
        s3:
          key: "{{workflow.name}}/{{item}}"
        ...
  - name: aggregate
    inputs:
      artifacts:
      - name: input
        path: /inputs
        s3:
          key: "{{workflow.name}}"

@jixinchi

I'm not sure if the {{workflow.name}} and {{item}} tags work when using templateRef. I'll try it later.

What we discussed above is only about the "aggregate" case. Do you have any comments on the "separately process" case?


alexec commented Nov 17, 2021

I think most patterns should be supported using key-only artifacts.
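
For reference, a minimal sketch of a key-only artifact (only the key is set; the bucket, endpoint, and credentials come from the configured artifact repository — the image and paths here are illustrative):

  - name: generate
    container:
      image: argoproj/argosay:v2
      args: [echo, hello, /mnt/file]
    outputs:
      artifacts:
      # only s3.key is specified; everything else comes from the artifact repository config
      - name: file
        path: /mnt/file
        s3:
          key: my-file
  - name: consume
    inputs:
      artifacts:
      - name: file
        path: /tmp/file
        s3:
          key: my-file
    container:
      image: argoproj/argosay:v2
      args: [cat, /tmp/file]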

@jixinchi

> You are correct, it is in the template: [workflow quoted from the previous comment]

I've tried this workflow. The {{item}} tag does not work outside the dag task: gen-artifact outputs a file literally named "{{item}}".

Key-only artifacts only support cases where you already know, before running the workflow, how many artifacts will be generated and how to use them. They are not suitable for withParam and withItems cases.


alexec commented Nov 18, 2021

Do you want to set up a 30m chat?

https://bit.ly/book-30m-with-argo-team

@jixinchi

I am from China, and it's 00:10 here now...
Also, I'm afraid my English is not good enough to discuss technical problems by voice. Text communication may be more effective.

@jixinchi

My email is jixinchi@bytedance.com.
Or you can find me on slack, named Xinchi Ji.


alexec commented Nov 23, 2021

What about using a data template with S3? That allows you to drop a number of artifacts into a bucket, then use withItems to fan out and process them.

https://argoproj.github.io/argo-workflows/data-sourcing-and-transformation/
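
For example, a minimal sketch of that pattern (the bucket name and key layout are hypothetical, and I'm assuming the listed keys are exposed via outputs.result for the withParam fan-out):

  - name: dag
    dag:
      tasks:
      - name: list-artifacts
        template: list-keys
      # fan out: one pod per listed key
      - name: process
        template: process-one
        depends: list-artifacts
        arguments:
          parameters:
          - name: key
            value: "{{item}}"
        withParam: "{{tasks.list-artifacts.outputs.result}}"
  # data template: lists the keys under an artifact location
  - name: list-keys
    data:
      source:
        artifactPaths:
          name: generated-files
          s3:
            bucket: my-bucket
            key: "{{workflow.name}}"
      transformation:
      - expression: "filter(data, {# endsWith \".json\"})"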

@jixinchi

The data template is really a good method for solving similar problems. However, its expressions are too limited for our business. We need to run Python or R scripts in a container to process the artifacts, not just do simple text processing.


alexec commented Nov 23, 2021

I think that is not what the data template does. It basically allows you to list artifacts, and then start a new process for each artifact.

@jixinchi

As shown in the doc you provided, the data template can only process each artifact using the methods available in the expression. Therefore, the data template cannot support our business.

@alexec alexec added the area/controller Controller issues, panics label Feb 7, 2022

chenbodeng719 commented Jun 29, 2022

@clumsy456 Hi, I used the approach you mentioned, but I sometimes get the error below.

[screenshot of the error]

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-passing-
spec:
  entrypoint: artifact-example
  templates:
  - name: artifact-example
    steps:
    - - name: generate-artifact
        template: whalesay
    - - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
      volumeMounts:
        - name: out
          mountPath: /tmp
    volumes:
      - name: out
        emptyDir: { }
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]


clumsy456 commented Jun 29, 2022

The above is just my proposal; it has not been implemented...


davidghiurco commented Jul 29, 2022

Is there a workaround for this problem?

The issue summary indicates:
"If a dag/task includes withItems/withParam parameter, we are not able to resolve its output artifacts"

Does that mean dag/tasks don't work with withItems / withParam, but steps do?

I see #6899 has been closed without merging, so I'm wondering what I can do to get withItems / withParam loops to work with artifact passing.

@agilgur5 agilgur5 added the area/artifacts S3/GCP/OSS/Git/HDFS etc label Aug 18, 2024
@agilgur5 agilgur5 added the area/looping `withParams`, `withItems`, and `withSequence` label Oct 16, 2024