Skip to content

Commit

Permalink
Add pipeline version api methods (#2338)
Browse files Browse the repository at this point in the history
* add version api

* unit tests

* remove debug fmt

* remove unused func

* remove another unused method

* formatting

* remove unused consts

* some comments

* build

* unit tests

* unit tests

* formatting

* unit tests

* address comments

* address comment

* address comment

* add comment
  • Loading branch information
jingzhang36 authored and k8s-ci-robot committed Oct 17, 2019
1 parent 9ddf4c4 commit f66af1f
Show file tree
Hide file tree
Showing 12 changed files with 1,690 additions and 23 deletions.
44 changes: 42 additions & 2 deletions backend/api/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,35 @@ service PipelineService {
get: "/apis/v1beta1/pipelines/{id}/templates"
};
}

rpc CreatePipelineVersion(CreatePipelineVersionRequest) returns (PipelineVersion) {
// TODO(jingzhang36): uncomment when exposing this API method.
// option (google.api.http) = {
// post: "/apis/v1beta1/pipeline_versions"
// body: "version"
// };
}

rpc GetPipelineVersion(GetPipelineVersionRequest) returns (PipelineVersion) {
// TODO(jingzhang36): uncomment when exposing this API method.
// option (google.api.http) = {
// get: "/apis/v1beta1/pipeline_versions/{version_id}"
// };
}

rpc ListPipelineVersions(ListPipelineVersionsRequest) returns (ListPipelineVersionsResponse) {
// TODO(jingzhang36): uncomment when exposing this API method.
// option (google.api.http) = {
// get: "/apis/v1beta1/pipeline_versions"
// };
}

rpc DeletePipelineVersion(DeletePipelineVersionRequest) returns (google.protobuf.Empty) {
// TODO(jingzhang36): uncomment when exposing this API method.
// option (google.api.http) = {
// delete: "/apis/v1beta1/pipeline_versions/{version_id}"
// };
}
}

message Url {
Expand Down Expand Up @@ -157,11 +186,23 @@ message ListPipelineVersionsRequest {
ResourceKey resource_key = 1;
int32 page_size = 2;
string page_token = 3;

// Can be format of "field_name", "field_name asc" or "field_name des"
// Ascending by default.
string sort_by = 4;
// A base-64 encoded, JSON-serialized Filter protocol buffer (see
// filter.proto).
string filter = 5;
}

message ListPipelineVersionsResponse {
repeated PipelineVersion versions = 1;
string next_page_token = 2;
int32 total_size = 3;
}

message DeletePipelineVersionRequest {
string version_id = 1;
}

message Pipeline {
Expand Down Expand Up @@ -227,5 +268,4 @@ message PipelineVersion {
// Input. Required. E.g., specify which pipeline this pipeline version belongs
// to.
repeated ResourceReference resource_references = 7;
}

}
1 change: 0 additions & 1 deletion backend/src/apiserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_jinzhu_gorm//:go_default_library",
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
Expand Down
9 changes: 2 additions & 7 deletions backend/src/apiserver/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ go_library(

go_test(
name = "go_default_test",
srcs = [
"pipeline_version_test.go",
],
srcs = ["pipeline_version_test.go"],
embed = [":go_default_library"],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
embed = [":go_default_library"],
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/filter:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
Expand Down
3 changes: 2 additions & 1 deletion backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

const (
DefaultFakeUUID = "123e4567-e89b-12d3-a456-426655440000"
FakeUUIDOne = "123e4567-e89b-12d3-a456-426655440001"
)

type FakeClientManager struct {
Expand All @@ -45,7 +46,7 @@ type FakeClientManager struct {
}

func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterface) (
*FakeClientManager, error) {
*FakeClientManager, error) {

if time == nil {
glog.Fatalf("The time parameter must not be null.") // Must never happen
Expand Down
82 changes: 82 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,3 +814,85 @@ func (r *ResourceManager) MarkSampleLoaded() error {
func (r *ResourceManager) getDefaultSA() string {
return common.GetStringConfigWithDefault(defaultPipelineRunnerServiceAccountEnvVar, defaultPipelineRunnerServiceAccount)
}

func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte) (*model.PipelineVersion, error) {
// Extract the parameters from the pipeline
params, err := util.GetParameters(pipelineFile)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// Extract pipeline id
var pipelineId = ""
for _, resourceReference := range apiVersion.ResourceReferences {
if resourceReference.Key.Type == api.ResourceType_PIPELINE && resourceReference.Relationship == api.Relationship_OWNER {
pipelineId = resourceReference.Key.Id
}
}
if len(pipelineId) == 0 {
return nil, util.Wrap(err, "Create pipeline version failed due to missing pipeline id")
}

// Construct model.PipelineVersion
version := &model.PipelineVersion{
Name: apiVersion.Name,
PipelineId: pipelineId,
Status: model.PipelineVersionCreating,
Parameters: params,
CodeSourceUrl: apiVersion.CodeSourceUrl,
}
version, err = r.pipelineStore.CreatePipelineVersion(version)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// Store the pipeline file
err = r.objectStore.AddFile(pipelineFile, storage.CreatePipelinePath(fmt.Sprint(version.UUID)))
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

// After pipeline version being created in DB and pipeline file being
// saved in minio server, set this pieline version to status ready.
version.Status = model.PipelineVersionReady
err = r.pipelineStore.UpdatePipelineVersionStatus(version.UUID, version.Status)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}

return version, nil
}

func (r *ResourceManager) GetPipelineVersion(versionId string) (*model.PipelineVersion, error) {
return r.pipelineStore.GetPipelineVersion(versionId)
}

func (r *ResourceManager) ListPipelineVersions(pipelineId string, opts *list.Options) (pipelines []*model.PipelineVersion, total_size int, nextPageToken string, err error) {
return r.pipelineStore.ListPipelineVersions(pipelineId, opts)
}

func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error {
_, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
if err != nil {
return util.Wrap(err, "Delete pipeline version failed")
}

// Mark pipeline as deleting so it's not visible to user.
err = r.pipelineStore.UpdatePipelineVersionStatus(pipelineVersionId, model.PipelineVersionDeleting)
if err != nil {
return util.Wrap(err, "Delete pipeline version failed")
}

err = r.objectStore.DeleteFile(storage.CreatePipelinePath(fmt.Sprint(pipelineVersionId)))
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline file for pipeline version %v", pipelineVersionId))
return util.Wrap(err, "Delete pipeline version failed")
}
err = r.pipelineStore.DeletePipelineVersion(pipelineVersionId)
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline DB entry for pipeline %v", pipelineVersionId))
return util.Wrap(err, "Delete pipeline version failed")
}

return nil
}
Loading

0 comments on commit f66af1f

Please sign in to comment.