Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline version api methods #2338

Merged
merged 18 commits into from
Oct 17, 2019
40 changes: 38 additions & 2 deletions backend/api/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,31 @@ service PipelineService {
get: "/apis/v1beta1/pipelines/{id}/templates"
};
}

rpc CreatePipelineVersion(CreatePipelineVersionRequest) returns (PipelineVersion) {
option (google.api.http) = {
post: "/apis/v1beta1/pipeline_versions"
body: "version"
};
}

rpc GetPipelineVersion(GetPipelineVersionRequest) returns (PipelineVersion) {
option (google.api.http) = {
get: "/apis/v1beta1/pipeline_versions/{version_id}"
};
}

rpc ListPipelineVersions(ListPipelineVersionsRequest) returns (ListPipelineVersionsResponse) {
option (google.api.http) = {
get: "/apis/v1beta1/pipeline_versions/pipeline/{resource_key.id}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this URL looks not right. as reference can u check URL to list run by experiment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed api path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another related api path question. If we merge this PR, users can call api server to directly manipulate (create, delete, get, list) versions in spite that they can't do that from FE. Would that be ok, say, considered as a too-early or premature exposure?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could, in implementation, return not implemented.

Copy link
Contributor Author

@jingzhang36 jingzhang36 Oct 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I commented out the http point in pipeline.proto, so i can leave the method implementation in its expected final state. BTW, by doing the comment-out, the curl returns "not found".

};
}

rpc DeletePipelineVersion(DeletePipelineVersionRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/apis/v1beta1/pipeline_versions/{version_id}"
};
}
}

message Url {
Expand Down Expand Up @@ -157,11 +182,23 @@ message ListPipelineVersionsRequest {
ResourceKey resource_key = 1;
int32 page_size = 2;
string page_token = 3;

// Can be format of "field_name", "field_name asc" or "field_name des"
// Ascending by default.
string sort_by = 4;
// A base-64 encoded, JSON-serialized Filter protocol buffer (see
// filter.proto).
string filter = 5;
}

message ListPipelineVersionsResponse {
repeated PipelineVersion versions = 1;
string next_page_token = 2;
int32 total_size = 3;
}

message DeletePipelineVersionRequest {
string version_id = 1;
}

message Pipeline {
Expand Down Expand Up @@ -227,5 +264,4 @@ message PipelineVersion {
// Input. Required. E.g., specify which pipeline this pipeline version belongs
// to.
repeated ResourceReference resource_references = 7;
}

}
1 change: 0 additions & 1 deletion backend/src/apiserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_jinzhu_gorm//:go_default_library",
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
Expand Down
9 changes: 2 additions & 7 deletions backend/src/apiserver/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ go_library(

go_test(
name = "go_default_test",
srcs = [
"pipeline_version_test.go",
],
srcs = ["pipeline_version_test.go"],
embed = [":go_default_library"],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
embed = [":go_default_library"],
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/filter:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
Expand Down
3 changes: 2 additions & 1 deletion backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

const (
DefaultFakeUUID = "123e4567-e89b-12d3-a456-426655440000"
fakeUUIDOne = "123e4567-e89b-12d3-a456-426655440001"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upper case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)

type FakeClientManager struct {
Expand All @@ -45,7 +46,7 @@ type FakeClientManager struct {
}

func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterface) (
*FakeClientManager, error) {
*FakeClientManager, error) {

if time == nil {
glog.Fatalf("The time parameter must not be null.") // Must never happen
Expand Down
98 changes: 98 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,3 +814,101 @@ func (r *ResourceManager) MarkSampleLoaded() error {
func (r *ResourceManager) getDefaultSA() string {
return common.GetStringConfigWithDefault(defaultPipelineRunnerServiceAccountEnvVar, defaultPipelineRunnerServiceAccount)
}

func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte) (*model.PipelineVersion, error) {
// Extract the parameter from the pipeline
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parameter -> parameters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

params, err := util.GetParameters(pipelineFile)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// Extract pipeline id
var pipelineId = ""
for _, resourceReference := range apiVersion.ResourceReferences {
if resourceReference.Key.Type == api.ResourceType_PIPELINE &&
resourceReference.Relationship == api.Relationship_OWNER {
pipelineId = resourceReference.Key.Id
}
}
if len(pipelineId) == 0 {
return nil, util.Wrap(
err, "Create pipeline version failed due to missing pipeline id")
}

// Construct model.PipelineVersion
version := &model.PipelineVersion{
Name: apiVersion.Name,
PipelineId: pipelineId,
Status: model.PipelineVersionCreating,
Parameters: params,
CodeSourceUrl: apiVersion.CodeSourceUrl,
}
version, err = r.pipelineStore.CreatePipelineVersion(version)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// Store the pipeline file
err = r.objectStore.AddFile(
pipelineFile, storage.CreatePipelinePath(fmt.Sprint(version.UUID)))
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// After pipeline version being created in DB and pipeline file being
// saved in minio server, set this pieline version to status ready.
version.Status = model.PipelineVersionReady
err = r.pipelineStore.UpdatePipelineVersionStatus(
version.UUID, version.Status)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

return version, nil
}

func (r *ResourceManager) GetPipelineVersion(versionId string) (*model.PipelineVersion, error) {
return r.pipelineStore.GetPipelineVersion(versionId)
}

func (r *ResourceManager) ListPipelineVersions(pipelineId string, opts *list.Options) (pipelines []*model.PipelineVersion, total_size int, nextPageToken string, err error) {
return r.pipelineStore.ListPipelineVersions(pipelineId, opts)
}

func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error {
_, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
if err != nil {
return util.Wrap(err, "Delete pipeline version failed")
}

// Mark pipeline as deleting so it's not visible to user.
err = r.pipelineStore.UpdatePipelineVersionStatus(
pipelineVersionId, model.PipelineVersionDeleting)
if err != nil {
return util.Wrap(err, "Delete pipeline version failed")
}

err = r.objectStore.DeleteFile(storage.CreatePipelinePath(
fmt.Sprint(pipelineVersionId)))
if err != nil {
glog.Errorf(
"%v",
errors.Wrapf(
err,
"Failed to delete pipeline file for pipeline version %v",
pipelineVersionId))
return util.Wrap(err, "Delete pipeline version failed")
}
err = r.pipelineStore.DeletePipelineVersion(pipelineVersionId)
if err != nil {
glog.Errorf(
"%v",
errors.Wrapf(
err,
"Failed to delete pipeline DB entry for pipeline %v",
pipelineVersionId))
return util.Wrap(err, "Delete pipeline version failed")
}

return nil
}
Loading