feat(v2): V2 create run api #6689
Conversation
/test kubeflow-pipeline-e2e-test
/test kubeflow-pipeline-upgrade-test
@Bobgy This is ready for early review. I need the SDK change before I enable v2 IR spec support in the backend.
/test kubeflow-pipeline-e2e-test
/test kubeflow-pipeline-upgrade-test
Looks great, left mostly nitpicking comments!
templateType := util.InferTemplateFormat(manifestBytes)

if templateType == util.Unknown {
    return nil, util.NewInternalServerError(fmt.Errorf("failed to infer template type from manifest bytes"), "")
nit: why is there an empty string here?
util.NewInternalServerError accepts an error and a string as arguments.
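For illustration only (a sketch, not the PR's code; the helper's signature is assumed from the answer above to take a wrapped error plus an external message string), the call shape looks roughly like this:

return nil, util.NewInternalServerError(
    fmt.Errorf("failed to infer template type from manifest bytes"),
    "Failed to validate the pipeline manifest") // a non-empty external message instead of ""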
if apiRun.GetPipelineSpec().GetRuntimeConfig().GetPipelineRoot() != "" {
    job.RuntimeConfig.GcsOutputDirectory = apiRun.GetPipelineSpec().GetRuntimeConfig().GetPipelineRoot()
}
wf, err := compiler.Compile(job, nil)
Hi, we have a quick question: how can other pipeline runtimes fit in here? Right now the compiler.Compile function converts to Argo. Will there be a common interface we can use to plug in other pipeline runtimes like Tekton?
There isn't an existing interface for compiler.Compile, but considering the return value is different, do we need an interface?
JFYI, you can reuse the Visitor interface for the implementation of a Tekton v2 compiler.
pipelines/v2/compiler/visitor.go
Line 40 in 3a2ef14
func Accept(job *pipelinespec.PipelineJob, v Visitor) error {
@Bobgy thanks for the info, and yes, I have currently implemented a compiler for Tekton by implementing the Visitor interface. However, that only covers the compiler's implementation. To integrate different compilers with the pipeline run API here and pass the returned artifact to the underlying engine, I think a generic data structure is still needed. For example, JSON data representing Argo's Workflow or Tekton's PipelineRun could be returned and then passed to the pipeline engine. Any other approach to supporting different engines would be great too.
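A hypothetical sketch of such an integration point is below (the interface name is invented for illustration and the import path is assumed; this is not existing KFP code): each engine-specific compiler returns a serialized manifest, for example an Argo Workflow or a Tekton PipelineRun, which the run API then hands to the matching engine.

package compiler

import "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"

// EngineCompiler is a hypothetical interface used only for this sketch.
type EngineCompiler interface {
    // Compile turns a v2 PipelineJob into an engine-specific manifest,
    // e.g. JSON/YAML for an Argo Workflow or a Tekton PipelineRun.
    Compile(job *pipelinespec.PipelineJob) (manifest []byte, err error)
}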
Good point, the part you mentioned is not designed yet. We want to start by still special-casing Argo. Any concerns?
Apologies for the late response. No concerns, as long as we agree to have an interface in the compiler and the pipeline run to integrate different pipeline engines. Should I create an issue to make sure we address this later on?
Yes, please create an issue. I cannot make a promise though; it will depend on the effort and cost.
}

// Value is the value of the field.
message Value {
Should we use protobuf.Value instead?
The v2 launcher does not support protobuf.Value yet. Will change it to protobuf.Value once the v2 launcher supports it.
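For context, here is a minimal, self-contained sketch of using google.protobuf.Value from Go via the structpb package; it only illustrates the suggested replacement and is not KFP code (the values are made up):

package main

import (
    "fmt"

    "google.golang.org/protobuf/types/known/structpb"
)

func main() {
    // structpb.NewValue wraps strings, numbers, bools, lists, and maps in
    // the single well-known google.protobuf.Value type.
    v, err := structpb.NewValue(map[string]interface{}{
        "pipeline_root": "gs://example-bucket/outputs", // illustrative value
        "retries":       3,
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(v.GetStructValue().Fields["retries"].GetNumberValue()) // prints 3
}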
@@ -578,7 +522,7 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error {
 }

 if runDetail.WorkflowRuntimeManifest == "" {
-    return util.NewBadRequestError(errors.New("workflow cannot be retried"), "Workflow must be Failed/Error to retry")
+    return util.NewBadRequestError(errors.New("workflow cannot be retried"), "Workflow must be Failed/Error to retry or run is with v2 mode")
nit: I think we know when it's v2 mode. Only returning this error message when in v2 mode would make it clearer to understand.
Looks like this error message can also happen in v1 mode
I meant that the root cause is different for v1 and v2, so it will be easier for users to understand when they hit the error if we return accurate error messages (instead of "it can be reason A or reason B").
For v1, we can say the workflow must be in some state.
For v2, we can say the feature is basically not implemented.
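A minimal sketch of that suggestion (not the actual change in this PR; isV2Run is a hypothetical flag used only for illustration):

if runDetail.WorkflowRuntimeManifest == "" {
    if isV2Run { // hypothetical flag for illustration
        return util.NewBadRequestError(errors.New("workflow cannot be retried"),
            "Retrying runs is not yet supported in v2 mode")
    }
    return util.NewBadRequestError(errors.New("workflow cannot be retried"),
        "Workflow must be Failed/Error to retry")
}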
done
workflow.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)

swfGeneratedName, err := toSWFCRDResourceGeneratedName(apiJob.Name)
scheduledWorkflow, err := tmpl.ScheduledWorkflow(apiJob)
I was thinking about waiting for #6207, so that the scheduled workflow can be unaware of the v1/v2 difference.
What do you think?
Good catch! I think that's not necessary; I just commented on #6207 (comment).
Also note the swf controller implementation -- when a pipeline version ID or pipeline ID is available, we never go to the workflow spec branch.
pipelines/backend/src/crd/controller/scheduledworkflow/controller.go
Lines 528 to 565 in f129bc7
if swf.Spec.PipelineVersionID != nil {
    if _, err := c.runServiceClient.CreateRun(ctx,
        // These are not created under the correct experiment atm
        // That is the issue that make the integration tests fail.
        // This should not be to hard to fix, but needs to be handled.
        &api.CreateRunRequest{Run: &api.Run{
            Name: workflowName,
            ResourceReferences: append(refs, &api.ResourceReference{
                Key: &api.ResourceKey{
                    Type: api.ResourceType_PIPELINE_VERSION,
                    Id:   *swf.Spec.PipelineVersionID},
                Relationship: api.Relationship_CREATOR}),
            PipelineSpec: &api.PipelineSpec{
                Parameters: swf.GetParameters()},
            Owner: &api.Owner{
                Name: swf.GetName(),
                Id:   string(swf.GetUID())}}}); err != nil {
        return false, "", err
    }
    return true, workflowName, nil
}
if swf.Spec.PipelineID != nil {
    if _, err := c.runServiceClient.CreateRun(ctx,
        &api.CreateRunRequest{Run: &api.Run{
            Name: workflowName,
            PipelineSpec: &api.PipelineSpec{
                Parameters: swf.GetParameters(),
                PipelineId: *swf.Spec.PipelineID},
            Owner: &api.Owner{
                Name: swf.GetName(),
                Id:   string(swf.GetUID())},
            ResourceReferences: refs,
        }}); err != nil {
        return false, "", err
    }
    return true, workflowName, nil
}
GetTemplateType() TemplateType

//Get workflow
RunWorkflow(apiRun *api.Run, options RunWorkflowOptions) (*Workflow, error)
With these new methods, this file immediately gets too large. Do you think it would be helpful, in this PR, to move the interface to the package kubeflow/pipelines/backend/src/template and move the implementations to kubeflow/pipelines/backend/src/template/argo and kubeflow/pipelines/backend/src/template/v2?
(Paths are just examples.)
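A rough sketch of that layout, using the example paths above (the annotations are illustrative only):

backend/src/template/        # Template interface and shared helpers
backend/src/template/argo/   # Argo (v1) implementation of the interface
backend/src/template/v2/     # v2 IR implementation of the interface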
done
LGTM
Only some nitpickings; still reading the full PR.
ParametersJSON() (string, error)
// Get bytes content.
Bytes() []byte
GetTemplateType() TemplateType
nit: I would probably prefer using language features to achieve this,
tmpl := template.New(bytes)
argo, ok := tmpl.(*template.Argo)
if ok {
    // tmpl is Argo template
}
v2, ok := tmpl.(*template.V2)
if ok {
    // tmpl is V2 template
}
refer to https://stackoverflow.com/a/50940347
Is there any advantage to using a language feature here? I think using an interface method is clearer.
Because adding this method to the interface implies users of the interface need to know which types exist in the first place -- it leaks implementation details to the interface's users.
If someone wants to introduce a new type of template, they cannot just implement this interface; they also need to check all usages of the type and make sure they are updated as well.
But I don't think it's urgent to resolve this issue now; let's get this in.
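For reference, the two assertions in the snippet above can also be written as a single Go type switch (a sketch only, using the same example type names):

switch t := tmpl.(type) {
case *template.Argo:
    // tmpl is an Argo template; use t as *template.Argo
    _ = t
case *template.V2:
    // tmpl is a v2 template; use t as *template.V2
    _ = t
default:
    // unknown template type
}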
I found that refactoring the create run UT takes a long time. Will add a to-do item to refactor it later.
Because there were many file moves, let's get this in quickly and continue to improve.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: Bobgy. The full list of commands accepted by this bot can be found here. The pull request process is described here.
* added draft of create v2 pipeline run
* fixed broken UT and added UT for parsing template
* modified run apis to support v2 IR spec
* remove temporary patch
* fixed dependency
* fixed build failure
* finished draft
* finished create job and run
* refactor template and fixed broken UT
* updated go license
* fixed build failure
* fixed build
* added UT
* modified UT
* fixed build failure
* fixed license
Description of your changes:
Modified the run API to support v2.
Checklist: