Skip to content

Commit

Permalink
feat(backend): support pvc creation, deletion and mount; fixes #8708 (#…
Browse files Browse the repository at this point in the history
…9106)

* support pvc creation, deletion and mount

* replace image address for unit tests

* go mod tidy

* address comments

* Update go.mod

nit: fix format

* go mod tidy

* fix merge error

* update driver image and nit
  • Loading branch information
Linchin authored Apr 11, 2023
1 parent ce096dd commit 761a592
Show file tree
Hide file tree
Showing 12 changed files with 593 additions and 73 deletions.
4 changes: 2 additions & 2 deletions backend/src/apiserver/template/template_test.go

Large diffs are not rendered by default.

42 changes: 26 additions & 16 deletions backend/src/v2/cmd/driver/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Kubeflow Authors
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"

"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -40,14 +41,14 @@ const (

var (
// inputs
driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName = flag.String("pipeline_name", "", "pipeline context name")
runID = flag.String("run_id", "", "pipeline run uid")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson = flag.String("task", "", "task spec")
runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
iterationIndex = flag.Int("iteration_index", -1, "iteration index, -1 means not an interation")
kubernetesConfig = flag.String("kubernetes_config", "", "Kuberntes specific configurations")
driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName = flag.String("pipeline_name", "", "pipeline context name")
runID = flag.String("run_id", "", "pipeline run uid")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson = flag.String("task", "", "task spec")
runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
iterationIndex = flag.Int("iteration_index", -1, "iteration index, -1 means not an interation")
kubernetesConfigJson = flag.String("kubernetes_config", "", "Kuberntes specific configurations")

// container inputs
dagExecutionID = flag.Int64("dag_execution_id", 0, "DAG execution ID")
Expand Down Expand Up @@ -127,6 +128,14 @@ func drive() (err error) {
return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
}
}
// Parse the optional kubernetes_config flag into its proto message.
// kubernetesConfig stays nil when the flag is empty.
var kubernetesConfig *kubernetesplatform.KubernetesExecutorConfig
if *kubernetesConfigJson != "" {
	glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*kubernetesConfigJson))
	kubernetesConfig = &kubernetesplatform.KubernetesExecutorConfig{}
	if err := jsonpb.UnmarshalString(*kubernetesConfigJson, kubernetesConfig); err != nil {
		// kubernetesConfigJson is the *string returned by flag.String; dereference it so
		// the error shows the offending JSON instead of a pointer address.
		return fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, *kubernetesConfigJson)
	}
}
namespace, err := config.InPodNamespace()
if err != nil {
return err
Expand All @@ -140,13 +149,13 @@ func drive() (err error) {
return err
}
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
PipelineName: *pipelineName,
RunID: *runID,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
}
var execution *driver.Execution
var driverErr error
Expand All @@ -158,6 +167,7 @@ func drive() (err error) {
execution, driverErr = driver.DAG(ctx, options, client)
case "CONTAINER":
options.Container = containerSpec
options.KubernetesConfig = kubernetesConfig
execution, driverErr = driver.Container(ctx, options, client, cacheClient)
default:
err = fmt.Errorf("unknown driverType %s", *driverType)
Expand Down
11 changes: 10 additions & 1 deletion backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
wf: wf,
templates: make(map[string]*wfapi.Template),
// TODO(chensun): release process and update the images.
driverImage: "gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc",
driverImage: "gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d",
launcherImage: "gcr.io/ml-pipeline-test/kfp-launcher-v2@sha256:2b29da85580823f524a349d2e94db3822be00bd49afa82c9f4a718d51a3b7c06",
job: job,
spec: spec,
Expand Down Expand Up @@ -326,3 +326,12 @@ var launcherResources = k8score.ResourceRequirements{
const (
// tmplEntrypoint holds the literal name "entrypoint".
// NOTE(review): presumably the name of the workflow's entrypoint template — confirm against its usages, which are not visible here.
tmplEntrypoint = "entrypoint"
)

// dummyImages is the collection of all special dummy images that the backend recognizes.
// Users need to avoid these image names for their self-defined components.
// These values are kept in sync with the values in the SDK to form a contract between the backend (BE) and the SDK.
// TODO(lingqinggan): clarify these in documentation for KFP V2.
var dummyImages = map[string]bool{
"argostub/createpvc": true,
"argostub/deletepvc": true,
}
11 changes: 8 additions & 3 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
// Copyright 2021 The Kubeflow Authors
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package argocompiler

import (
Expand Down Expand Up @@ -222,6 +221,12 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends_exit_handler(task.GetDependentTasks())
}
// A dummy image marks a task that exists only to apply Kubernetes configs.
// Such a task needs no executor (launcher): return the driver task alone, named after the task itself.
if dummyImages[e.Container.GetImage()] {
driver.Name = name
return []wfapi.DAGTask{*driver}, nil
}
executor := c.containerExecutorTask(name, containerExecutorInputs{
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d
name: ""
resources:
limits:
Expand Down Expand Up @@ -230,18 +230,8 @@ spec:
value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-createpvc}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
name: createpvc-driver
template: system-container-driver
- arguments:
parameters:
- name: pod-spec-patch
value: '{{tasks.createpvc-driver.outputs.parameters.pod-spec-patch}}'
- default: "false"
name: cached-decision
value: '{{tasks.createpvc-driver.outputs.parameters.cached-decision}}'
depends: createpvc-driver.Succeeded
name: createpvc
template: system-container-executor
template: system-container-driver
- arguments:
parameters:
- name: component
Expand All @@ -253,18 +243,8 @@ spec:
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
depends: comp-2.Succeeded && createpvc.Succeeded
name: deletepvc-driver
template: system-container-driver
- arguments:
parameters:
- name: pod-spec-patch
value: '{{tasks.deletepvc-driver.outputs.parameters.pod-spec-patch}}'
- default: "false"
name: cached-decision
value: '{{tasks.deletepvc-driver.outputs.parameters.cached-decision}}'
depends: deletepvc-driver.Succeeded
name: deletepvc
template: system-container-executor
template: system-container-driver
inputs:
parameters:
- name: parent-dag-id
Expand Down Expand Up @@ -297,7 +277,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d
name: ""
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d
name: ""
resources:
limits:
Expand Down Expand Up @@ -207,7 +207,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d
name: ""
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:fd53656e109ff9269d9041a49aade07fd9fa5b0a00e6e0c76440b7ce0cf449fc
image: gcr.io/ml-pipeline-test/kfp-driver@sha256:e5140c17627920db911b2d7adbad53ff08d2a80ed4c3e2093b62c4304324035d
name: ""
resources:
limits:
Expand Down
13 changes: 13 additions & 0 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package component

import (
Expand Down
Loading

0 comments on commit 761a592

Please sign in to comment.