Skip to content

Conversation

zazulam
Copy link
Collaborator

@zazulam zazulam commented Mar 5, 2025

Description of your changes:
Jumping off from #11627, these changes resolve #10050 in regards to the using dsl.Collected for both parameters and artifacts in pipelines. Currently for artifacts, the executor needs to be updated in the sdk and have a release prior to tests being enabled. As for parameters that should work out of the box with solely backend changes.

This PR introduces a few new methods to help ease with detecting and resolving collected inputs as well as cleaning up some of the shared logic between resolveUpstreamParameters & resolveUpstreamArtifacts.

A few things to keep in mind:
A parallelFor creates x+1 dags, where x is the number of iterations. The extra dag is something I like to refer to as the parallelFor Head dag, being that it only contains the iteration dags within it.

With these changes, the keys within the task map returned from getDAGTasks will now contain the parent dag id associated with the task. This is to help maintain uniqueness, also when detecting a parallelFor iteration task/dag, the index will be added to the task name to further prevent any potential collisions and map the appropriate tasks within the same iteration for input/output resolution.

When resolving the inputs, if the current Task is a DAG the driver now first checks if the it is a parallelFor by inspecting if there exists an iteration_count custom property, as only parallelFor heads have that property. If found, the iteration dag names will be added to a queue that the CollectInputs will use to start the resolution search.

New methods:

  • CollectInputs - Performs a BFS on the tasks passed to reach the final producer task iterations and collects the values into the appropriate structure for either Artifacts or Parameters
  • CollectContainerOutput - Helper function for processing the output of a containerExecution, currently only used in the CollectInputs.
    * GetProducerTask - Performs the check on a task for the potential new producer task and output parameter/artifact key and returns the updated value if found.
  • InferIndexedTaskName - Used to update the producer task with the appropriate index if the current dag context is a parallelFor iteration dag i.e. resolving task inputs within the loop.
  • getParallelForIterationCount - Helper to get the number of iterations / determine if a parallelFor head dag.
  • GetParallelForTaskName - Used to update the task name with the iteration value supplied.
  • GetTaskNameWithDagID - Used to update the task name with the it's parent's dag id.

Checklist:

@google-oss-prow google-oss-prow bot requested review from Ark-kun, DharmitD and mprahl March 5, 2025 01:41
@zazulam zazulam force-pushed the collected branch 2 times, most recently from c6c8142 to 4146148 Compare March 5, 2025 01:49
@zazulam
Copy link
Collaborator Author

zazulam commented Mar 5, 2025

Providing some more context, this is the error that occurs for artifacts when a user attempts use either List[Artifact]:

main     executor = component_executor.Executor(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 52, in __init__
main     self.assign_input_and_output_artifacts()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 73, in assign_input_and_output_artifacts
main     self.input_artifacts[name] = [
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 74, in <listcomp>
main     self.make_artifact(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 118, in make_artifact
main     return create_artifact_instance(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 378, in create_artifact_instance
main     ) if hasattr(artifact_cls, '_from_executor_fields') else artifact_cls(
main   File "/usr/local/lib/python3.9/typing.py", line 685, in __call__
main     raise TypeError(f"Type {self._name} cannot be instantiated; "
main TypeError: Type List cannot be instantiated; use list() instead

or list[Artifact]

main     return _run_code(code, main_globals, None,
main   File "/usr/local/lib/python3.9/runpy.py", line 87, in _run_code
main     exec(code, run_globals)
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor_main.py", line 109, in <module>
main     executor_main()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor_main.py", line 98, in executor_main
main     executor = component_executor.Executor(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 52, in __init__
main     self.assign_input_and_output_artifacts()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 73, in assign_input_and_output_artifacts
main     self.input_artifacts[name] = [
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 74, in <listcomp>
main     self.make_artifact(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 118, in make_artifact
main     return create_artifact_instance(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 378, in create_artifact_instance
main     ) if hasattr(artifact_cls, '_from_executor_fields') else artifact_cls(
main TypeError: list() takes no keyword arguments

So we use the get_args & get_origin to determine the actual artifact cls type used within the collection.

I had made a custom image to test with the sdk installed with the executor changes, this is what it looks like:
image

@zazulam zazulam force-pushed the collected branch 2 times, most recently from b82df21 to e9cf281 Compare March 23, 2025 15:53
@zazulam zazulam requested a review from chensun March 23, 2025 15:54
@zazulam
Copy link
Collaborator Author

zazulam commented Mar 23, 2025

/retest

@zazulam
Copy link
Collaborator Author

zazulam commented Mar 23, 2025

After rebasing CI started to fail.

@zazulam
Copy link
Collaborator Author

zazulam commented Mar 24, 2025

/retest

@zazulam zazulam force-pushed the collected branch 2 times, most recently from 4898385 to 45abf43 Compare March 24, 2025 18:39
@zazulam
Copy link
Collaborator Author

zazulam commented Mar 24, 2025

Tests are finally passing after rebasing again 🙏

@zazulam zazulam changed the title feat(backend): enable dsl.Collected for parameters & artifacts feat(backend/sdk): enable dsl.Collected for parameters & artifacts Mar 24, 2025
@zazulam
Copy link
Collaborator Author

zazulam commented Mar 24, 2025

Here's visuals of the sample test supplied with the updates, along with the logs of the main container from the launcher pod of the read-files component, the components here are using a custom image I built that installs the sdk with the changes from this PR, along with some additional print statements to verify the types used when parsing the artifacts from the artifact list.

image
image

Logs:

time="2025-03-24T20:12:24.247Z" level=info msg="capturing logs" argo=true
I0324 20:12:24.265991      21 main.go:58] Setting log level to: '1'
I0324 20:12:24.267121      21 cache.go:117] Connecting to cache endpoint 10.96.185.120:8887
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: 
https://pip.pypa.io/warnings/venv
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Looking for component `read_files` in --component_module_path `/tmp/tmp.SbP6qw5nZb/ephemeral_component.py`
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Loading KFP component "read_files" from /tmp/tmp.SbP6qw5nZb/ephemeral_component.py (directory "/tmp/tmp.SbP6qw5nZb" and module name "ephemeral_component")
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Got executor_input:
{
    "inputs": {
        "artifacts": {
            "files": {
                "artifacts": [
                    {
                        "name": "440",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    },
                    {
                        "name": "441",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    },
                    {
                        "name": "442",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    }
                ]
            }
        }
    },
    "outputs": {
        "parameters": {
            "Output": {
                "outputFile": "/tmp/kfp/outputs/Output"
            }
        },
        "outputFile": "/tmp/kfp_outputs/output_metadata.json"
    }
}
inner_annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '440', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '441', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '442', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
type_annotations.is_list_of_artifacts(v): typing.List[kfp.dsl.types.artifact_types.Artifact]
Reading artifact 440 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file
s1
Reading artifact 441 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file
s2
Reading artifact 442 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file
s3
[KFP Executor 2025-03-24 20:12:24,615 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0324 20:12:24.620573      21 launcher_v2.go:806] ExecutorOutput: {
  "parameterValues": {
    "Output": "files read"
  }
}
I0324 20:12:24.633672      21 launcher_v2.go:188] publish success.
I0324 20:12:24.657377      21 client.go:724] Attempting to update DAG state
time="2025-03-24T20:12:25.248Z" level=info msg="sub-process exited" argo=true error="<nil>"

@zazulam
Copy link
Collaborator Author

zazulam commented Mar 24, 2025

I think there should be some consensus on how the UI should depict the fan-in of artifacts since there is a dag UI element associated with them, but we can save that for another PR 😃.

@zazulam zazulam force-pushed the collected branch 3 times, most recently from 42c03a7 to 065dede Compare March 31, 2025 23:18
@zazulam zazulam force-pushed the collected branch 4 times, most recently from 18ae546 to 635321d Compare April 7, 2025 19:20
@mprahl
Copy link
Collaborator

mprahl commented Apr 8, 2025

/rerun-workflow "KFP e2e tests"

Copy link
Collaborator

@droctothorpe droctothorpe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for addressing the barrage of feedback, @zazulam. This PR is a legit exception to this principle:
image

Copy link
Collaborator

@mprahl mprahl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm
/approve

Awesome work!

zazulam added 2 commits May 5, 2025 17:47
Signed-off-by: zazulam <m.zazula@gmail.com>
Signed-off-by: zazulam <m.zazula@gmail.com>

To enable users to use loops similar to subdags, the initial collecting
implementation went only 1 layer deep of loops/subdags. This
implementation serves to handle multifacted approaches of pipelines that
users can generate.
@HumairAK
Copy link
Collaborator

HumairAK commented May 5, 2025

/lgtm
/approve

amazing work all around folks!!

@zazulam ++
@droctothorpe ++
@mprahl ++

Copy link

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: HumairAK, mprahl

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit ed828b5 into kubeflow:master May 5, 2025
64 checks passed
@github-project-automation github-project-automation bot moved this from In Review to Done in KFP Project Tracker May 5, 2025
alyssacgoins added a commit to alyssacgoins/data-science-pipelines that referenced this pull request May 7, 2025
Signed-off-by: agoins <alyssacgoins@gmail.com>

Unit test to validate 2-template creation

Signed-off-by: agoins <alyssacgoins@gmail.com>

Integration test case to validate set_retry all args

Signed-off-by: agoins <alyssacgoins@gmail.com>

Integration test to validate failure with no retry

Signed-off-by: agoins <alyssacgoins@gmail.com>

pipeline failure w/out retry - raise exception

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update integration tests

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update integration test

Signed-off-by: agoins <alyssacgoins@gmail.com>

retryStrategy helper method return empty slice not nil

Signed-off-by: agoins <alyssacgoins@gmail.com>

refactor setting parameters

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update test master file

Signed-off-by: agoins <alyssacgoins@gmail.com>

Remove tests not using currently

Signed-off-by: agoins <alyssacgoins@gmail.com>

Refactor input parameters

Signed-off-by: agoins <alyssacgoins@gmail.com>

Revert unnecessary syntax changes.

Signed-off-by: agoins <alyssacgoins@gmail.com>

Remove unnecessary syntax changes

Signed-off-by: agoins <alyssacgoins@gmail.com>

Refactor getTaskRetryParameters

Signed-off-by: agoins <alyssacgoins@gmail.com>

getTaskRetryParameters()

Signed-off-by: agoins <alyssacgoins@gmail.com>

Remove unnecessary string formatting

Signed-off-by: agoins <alyssacgoins@gmail.com>

Add pointers in helper method getTaskRetryParameters()

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update test case with default behavior

Signed-off-by: agoins <alyssacgoins@gmail.com>

rename templateParameterInstructions

Signed-off-by: agoins <alyssacgoins@gmail.com>

Add comments

Signed-off-by: agoins <alyssacgoins@gmail.com>

unit test for 2-template template case

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update integration tests for setRetry

Signed-off-by: agoins <alyssacgoins@gmail.com>

reformat backoff duration

Signed-off-by: agoins <alyssacgoins@gmail.com>

add comments to templateParameterInstructions

Signed-off-by: agoins <alyssacgoins@gmail.com>

Split getTaskRetryParameters to handle set value separately

Signed-off-by: agoins <alyssacgoins@gmail.com>

Address PR comments

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update samples/v2/sample_test.py

Co-authored-by: Matt Prahl <mprahl@users.noreply.github.com>
Signed-off-by: Alyssa Goins <80764587+alyssacgoins@users.noreply.github.com>

Update sample.py pipeline retry tests

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update backend retry integration test expected behavior

Signed-off-by: agoins <alyssacgoins@gmail.com>

add name field to pipeline annotations

Signed-off-by: agoins <alyssacgoins@gmail.com>

Refactor getTaskRetryParameters() logic.

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update samples test formatting.

Signed-off-by: agoins <alyssacgoins@gmail.com>

fix pipeline annotation

Signed-off-by: agoins <alyssacgoins@gmail.com>

reformat sample_test retry tests

Signed-off-by: agoins <alyssacgoins@gmail.com>

refactor sample test retry files

Signed-off-by: agoins <alyssacgoins@gmail.com>

refactor sample test retry files

Signed-off-by: agoins <alyssacgoins@gmail.com>

update input from int to string

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update sample test retry units.

Signed-off-by: agoins <alyssacgoins@gmail.com>

Appropriate retry duration units

Signed-off-by: agoins <alyssacgoins@gmail.com>

Update PR comments.

Signed-off-by: agoins <alyssacgoins@gmail.com>

feat(backend/sdk): enable dsl.Collected for parameters & artifacts (kubeflow#11725)

* feat(backend/sdk): enable dsl.Collected for params & artifacts

Signed-off-by: zazulam <m.zazula@gmail.com>

* feat(backend): collect through loops & dags

Signed-off-by: zazulam <m.zazula@gmail.com>

To enable users to use loops similar to subdags, the initial collecting
implementation went only 1 layer deep of loops/subdags. This
implementation serves to handle multifacted approaches of pipelines that
users can generate.

---------

Signed-off-by: zazulam <m.zazula@gmail.com>

fix(sdk): fix pip install for dev (kubeflow#11891)

Signed-off-by: Daniel Dowler <12484302+dandawg@users.noreply.github.com>

chore: Adding Adopters file for CNCF graduation (kubeflow#11894)

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
zazulam added a commit to zazulam/website that referenced this pull request Jul 11, 2025
Current implemented features on master for pipelines that will be available for 2.6:
loop parallelism - kubeflow/pipelines#10798
oneOf - kubeflow/pipelines#11196
dsl.Collected - kubeflow/pipelines#11725
PipelineTaskFinalStatus - kubeflow/pipelines#11953

Signed-off-by: Michael <m.zazula@gmail.com>
google-oss-prow bot pushed a commit to kubeflow/website that referenced this pull request Jul 11, 2025
Current implemented features on master for pipelines that will be available for 2.6:
loop parallelism - kubeflow/pipelines#10798
oneOf - kubeflow/pipelines#11196
dsl.Collected - kubeflow/pipelines#11725
PipelineTaskFinalStatus - kubeflow/pipelines#11953

Signed-off-by: Michael <m.zazula@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Development

Successfully merging this pull request may close these issues.

[backend/sdk] Support dsl.collected() in KFP
6 participants