-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(v2): Fetch cache result from MLMD and put cache result into argo output artifacts/parameters #5957
Conversation
/test kubeflow-pipelines-v2-go-test |
@capri-xiyue can you fix the bug in a separate PR? |
/test kubeflow-pipeline-backend-test |
@Bobgy It's ready for early review. |
"github.com/kubeflow/pipelines/backend/src/v2/common" | ||
"github.com/kubeflow/pipelines/backend/src/v2/common/mlmd" | ||
pb "github.com/kubeflow/pipelines/backend/src/v2/third_party/pipeline_spec" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
backend/src/v2
uses out-dated pipelinespec. The fix of v2 package takes time. As a workaround, I temporarily pasted out-dated pipeline spec that v2 is using.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the catch! I will update pipeline spec in a separate PR.
@Bobgy , this PR is ready for review. I disable caching in this PR. Will enabling caching in the PR of caching e2e test. |
/test kubeflow-pipelines-samples-v2 |
1 similar comment
/test kubeflow-pipelines-samples-v2 |
The sample-v2 test failure is not because of this PR. |
@@ -412,7 +412,11 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) { | |||
expectedRuntimeWorkflow.Annotations = map[string]string{util.AnnotationKeyRunName: "run1"} | |||
expectedRuntimeWorkflow.Spec.Arguments.Parameters = []v1alpha1.Parameter{{Name: "param1", Value: v1alpha1.AnyStringPtr("world")}} | |||
expectedRuntimeWorkflow.Spec.ServiceAccountName = defaultPipelineRunnerServiceAccount | |||
|
|||
expectedRuntimeWorkflow.Spec.PodMetadata = &v1alpha1.Metadata{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding this to every test makes tests harder to read, shall we add the label consistently in AddMetadata method on 410 line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or another approach is changing the AddMetadata method to remove runtime metadata. In all tests, we remove test unrelated metadata from generated workflow before comparison with expectation
} | ||
|
||
func (c *Client) CreateExecutionCache(ctx context.Context, task *api.Task) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be in cache package? I think putting task records is theoretically unrelated to cache, just that cache uses these records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache package has the Client(Currently the client is only used for storing cache entries). Therefore, I put it in the cache package. If later, we decide to put task records for every task(not just for tasks where cache is enabled), I think it makes sense to move it to other package like kfp
package.
} | ||
|
||
func GetOutputParamsFromCachedExecution(cachedExecution *ml_metadata.Execution) (map[string]string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider avoiding stutter in naming?
GetOutputParams(cachedExecution ...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
MLMDOutputArtifactByName := make(map[string]*pb.Artifact) | ||
for _, artifact := range MLMDOutputArtifacts { | ||
name := extractNameFromURI(*artifact.Uri) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name should be taken from the event that connects artifact and the original execution.
See event.path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I printed the event.path. It records the id of the artifact which will change every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@capri-xiyue strange, did you check event.artifactId or event.path?
Refer to code
pipelines/v2/metadata/client.go
Line 236 in ad419cd
Path: eventPath(oa.Name), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I printed the wrong one. Fixed it in the refactor PR.
/retest |
/test kubeflow-pipelines-samples-v2 |
I will submit another PR to resolve the comments. |
[APPROVALNOTIFIER] This PR is APPROVED Approval requirements bypassed by manually added approval. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Description of your changes:
Fixed #5819
Fetch cache result from MLMD and put cache result into argo output artifacts/parameters
Checklist: