Add necessary data types to api and database to support pipeline version. #1873

Merged Sep 26, 2019 (43 commits)

Commits
bd122ec
Add necessary data types/tables for pipeline version. Mostly based
jingzhang36 Aug 17, 2019
dfe6588
Modified comment
jingzhang36 Aug 17, 2019
14cd856
Modify api converter according with new pipeline (version) definition
jingzhang36 Aug 19, 2019
e980617
Change pipeline_store for DefaultVersionId field
jingzhang36 Aug 20, 2019
586f5ce
Add pipeline spec to pipeline version
jingzhang36 Aug 20, 2019
8205623
Merge branch 'master' into cicd_storage
jingzhang36 Aug 20, 2019
30d9616
fix model converter
jingzhang36 Aug 20, 2019
bc47bed
fix a comment
jingzhang36 Aug 20, 2019
fc0c1a8
Add foreign key, pagination of list request, refactor code source
jingzhang36 Aug 26, 2019
b835319
Refactor code source
jingzhang36 Aug 28, 2019
0e17a3d
Foreign key
jingzhang36 Sep 6, 2019
899d42b
Merge branch 'master' into cicd_storage
jingzhang36 Sep 9, 2019
509a8db
Change code source and package source type
jingzhang36 Sep 10, 2019
fc60a57
Fix ; separator
jingzhang36 Sep 10, 2019
3d47da3
Add versions table and modify existing pipeline apis
jingzhang36 Sep 16, 2019
fea90d4
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 16, 2019
4db4be7
Remove api pipeline defintiion change and leave it for later PR
jingzhang36 Sep 17, 2019
69539e0
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 17, 2019
d28e656
Add comment
jingzhang36 Sep 17, 2019
5bc2086
Make schema changing and data backfilling a single transaction
jingzhang36 Sep 17, 2019
840a7bc
Tolerate null default version id in code
jingzhang36 Sep 17, 2019
90322b9
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 17, 2019
e32752d
fix status
jingzhang36 Sep 18, 2019
40c9bac
Revise delete pipeline func
jingzhang36 Sep 18, 2019
f55e230
Use raw query to migrate data
jingzhang36 Sep 18, 2019
bf94cb9
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 18, 2019
b3165f9
No need to update versions status
jingzhang36 Sep 18, 2019
83b9bc9
rename and minor changes
jingzhang36 Sep 19, 2019
058199d
accidentally removed a where clause
jingzhang36 Sep 19, 2019
569a506
Fix a model name prefix
jingzhang36 Sep 19, 2019
6141be3
Refine comments
jingzhang36 Sep 19, 2019
a3fe5ed
Revise if condition
jingzhang36 Sep 19, 2019
1171e09
Address comments
jingzhang36 Sep 22, 2019
b95eed2
address more comments
jingzhang36 Sep 23, 2019
a004d96
Rearrange pipeline and version related parts inside CreatePipeline, t…
jingzhang36 Sep 23, 2019
05a7601
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 23, 2019
cd2d678
Add package url to pipeline version. Required when calling CreatePipe…
jingzhang36 Sep 24, 2019
7f90853
Single code source url; remove pipeline id as sorting field; reformat
jingzhang36 Sep 25, 2019
3ea7beb
Merge remote-tracking branch 'origin/master' into cicd_storage
jingzhang36 Sep 25, 2019
0198ab0
Merge remote-tracking branch 'origin/cicd_storage' into cicd_storage
jingzhang36 Sep 25, 2019
4a29213
resolve remote branch and local branch diff
jingzhang36 Sep 25, 2019
5e69126
remove unused func
jingzhang36 Sep 25, 2019
0b5eb15
Remove an empty line
jingzhang36 Sep 25, 2019
69 changes: 68 additions & 1 deletion backend/api/pipeline.proto
@@ -20,8 +20,10 @@ package api;
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "backend/api/parameter.proto";
import "backend/api/error.proto";
import "backend/api/parameter.proto";
import "backend/api/pipeline_spec.proto";
import "backend/api/resource_reference.proto";
import "protoc-gen-swagger/options/annotations.proto";

option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
@@ -136,6 +138,32 @@ message GetTemplateResponse {
string template = 1;
}

message GetPipelineVersionTemplateRequest{
Member:
Is it intended to add the API definition in a separate change?

We should use a resource reference for a flexible pipeline <-> version mapping:
https://github.com/kubeflow/pipelines/blob/master/backend/api/resource_reference.proto

Please check experiment <-> job and experiment <-> run as examples of how this is used.

Contributor Author:
The API PipelineVersion definition will use a resource reference to point to its Pipeline. The model/storage PipelineVersion will use a foreign key on the pipeline ID.

string version_id = 1;
}

message CreatePipelineVersionRequest {
// ResourceReference inside PipelineVersion specifies the pipeline that this
// version belongs to.
PipelineVersion version = 1;
}

message GetPipelineVersionRequest {
string version_id = 1;
}

message ListPipelineVersionsRequest {
Member:
We should consider adding pagination when listing versions.

Contributor Author:
Done.

// ResourceKey specifies the pipeline whose versions are to be listed.
ResourceKey resource_key = 1;
int32 page_size = 2;
string page_token = 3;
}

message ListPipelineVersionsResponse {
repeated PipelineVersion versions = 1;
string next_page_token = 2;
}

message Pipeline {
// Output. Unique pipeline ID. Generated by API server.
string id = 1;
@@ -151,14 +179,53 @@ message Pipeline {
string description = 4;

// Output. The input parameters for this pipeline.
// TODO(jingzhang36): replace this parameters field with the parameters field
// inside PipelineVersion when all usage of the former has been changed to use
// the latter.
repeated Parameter parameters = 5;

// The URL to the source of the pipeline. This is required when creating the
// pipeline through CreatePipeline API.
// TODO(jingzhang36): replace this url field with the code_source_urls field
// inside PipelineVersion when all usage of the former has been changed to use
// the latter.
Url url = 7;

// In case any error happens retrieving a pipeline field, only pipeline ID
// and the error message is returned. Client has the flexibility of choosing
// how to handle error. This is especially useful during listing call.
string error = 6;

// Output only. The default version of the pipeline. As of now, the latest
// version is used as default. (In the future, if desired by customers, we
// can allow them to set default version.)
// TODO(jingzhang36): expose this in API pipeline definition with FE changes.
// PipelineVersion default_version = 8;
}

message PipelineVersion {
// Output. Unique version ID. Generated by API server.
string id = 1;

// Optional input field. Version name provided by user.
string name = 2;

// Output. The time this pipeline version is created.
google.protobuf.Timestamp created_at = 3;

// Output. The input parameters for this pipeline.
repeated Parameter parameters = 4;

// Input. Optional. Pipeline version code source.
string code_source_url = 5;

// Input. Required. Pipeline version package url.
// When calling the CreatePipelineVersion API method, one package file
// location must be provided.
Url package_url = 6;

// Input. Required. E.g., specify which pipeline this pipeline version belongs
// to.
repeated ResourceReference resource_references = 7;
}
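
The page_size, page_token and next_page_token fields added to the list request and response above follow the usual token-based pagination shape: the caller passes back the token from the previous response until it comes back empty. A minimal sketch of that client loop is below. It does not call the real API; `listPage`, `listAll` and `fakeVersions` are hypothetical names, and encoding the token as a plain offset is an assumption made only for this example (the server-side token in list.go carries the sort key and filter instead).

```go
package main

import (
	"fmt"
	"strconv"
)

// fakeVersions stands in for the stored versions of one pipeline (made-up data).
var fakeVersions = []string{"v1", "v2", "v3", "v4", "v5"}

// listPage mimics the request/response shape of a paginated list call:
// it takes a page size and a page token and returns one page plus the
// token for the next page. An empty next token means there are no more pages.
// Assumption for this sketch: the token is simply the offset of the next item.
func listPage(pageSize int, pageToken string) ([]string, string, error) {
	if pageSize <= 0 {
		return nil, "", fmt.Errorf("page size must be positive, got %d", pageSize)
	}
	start := 0
	if pageToken != "" {
		n, err := strconv.Atoi(pageToken)
		if err != nil || n < 0 || n > len(fakeVersions) {
			return nil, "", fmt.Errorf("invalid page token %q", pageToken)
		}
		start = n
	}
	end := start + pageSize
	if end >= len(fakeVersions) {
		return fakeVersions[start:], "", nil
	}
	return fakeVersions[start:end], strconv.Itoa(end), nil
}

// listAll keeps requesting pages until the next page token is empty.
func listAll(pageSize int) ([]string, error) {
	var all []string
	token := ""
	for {
		page, next, err := listPage(pageSize, token)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

func main() {
	all, err := listAll(2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(all)
}
```

Treating the token as opaque on the client side keeps the loop unchanged if the server later changes what the token encodes.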

1 change: 1 addition & 0 deletions backend/api/resource_reference.proto
@@ -21,6 +21,7 @@ enum ResourceType {
UNKNOWN_RESOURCE_TYPE = 0;
EXPERIMENT = 1;
JOB = 2;
PIPELINE = 3;
}

enum Relationship {
1 change: 1 addition & 0 deletions backend/src/apiserver/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_jinzhu_gorm//:go_default_library",
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
56 changes: 56 additions & 0 deletions backend/src/apiserver/client_manager.go
@@ -176,11 +176,24 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
db, err := gorm.Open(driverName, arg)
util.TerminateIfError(err)

// If pipeline_versions table is introduced into DB for the first time,
// it needs initialization or data backfill.
var tableNames []string
var initializePipelineVersions = true
db.Raw(`show tables`).Pluck("Tables_in_mlpipeline", &tableNames)
for _, tableName := range tableNames {
if tableName == "pipeline_versions" {
initializePipelineVersions = false
break
}
}

// Create table
response := db.AutoMigrate(
&model.Experiment{},
&model.Job{},
&model.Pipeline{},
&model.PipelineVersion{},
&model.ResourceReference{},
&model.RunDetail{},
&model.RunMetric{},
@@ -201,6 +214,18 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
if response.Error != nil {
glog.Fatalf("Failed to create a foreign key for RunID in run_metrics table. Error: %s", response.Error)
}
response = db.Model(&model.PipelineVersion{}).
AddForeignKey("PipelineId", "pipelines(UUID)", "CASCADE" /* onDelete */, "CASCADE" /* update */)
if response.Error != nil {
glog.Fatalf("Failed to create a foreign key for PipelineId in pipeline_versions table. Error: %s", response.Error)
}

// Data backfill for pipeline_versions if this is the first time for
// pipeline_versions to enter mlpipeline DB.
if initializePipelineVersions {
initPipelineVersionsFromPipelines(db)
}

return storage.NewDB(db.DB(), storage.NewMySQLDialect())
}

@@ -288,3 +313,34 @@ func newClientManager() ClientManager {

return clientManager
}

// Data migration in 2 steps to introduce pipeline_versions table. This
// migration shall be called only once when pipeline_versions table is created
// for the first time in DB.
func initPipelineVersionsFromPipelines(db *gorm.DB) {
tx := db.Begin()

// Step 1: duplicate pipelines to pipeline versions.
// The pipeline versions created here do not go through the KFP pipeline
// version API; they exist only for the legacy pipelines that were created
// before the pipeline version API was introduced.
// Those legacy pipelines had no versions before, so we create one implicit
// version for each of them. Given a legacy pipeline, the implicit version
// created here is assigned the same ID as the pipeline. This way we don't
// need to move the minio file of the pipeline package around, since the
// minio file's path is based on the pipeline ID (and now on the implicit
// version ID too). Meanwhile, IDs are only required to be unique within the
// same resource type, so a pipeline and a pipeline version, as two different
// resources, can share the same ID.
// On the other hand, a pipeline and its pipeline versions created after the
// pipeline version API is introduced will have different IDs, and the minio
// file will be put directly into the directories for pipeline versions.
tx.Exec(`INSERT INTO
pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId)
Member:
Do you mind adding a comment here that briefly explains why the pipeline ID is used as the version ID?

Contributor Author:
Done.

SELECT UUID, Name, CreatedAtInSec, Parameters, Status, UUID FROM pipelines;`)

// Step 2: modify pipelines table after pipeline_versions are populated.
tx.Exec("update pipelines set DefaultVersionId=UUID;")

tx.Commit()
}
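
The two-step backfill above runs inside one transaction, but the diff as shown does not inspect the result of either Exec call before committing. One way to make a failed step abort the whole backfill is sketched below. This is only an illustration under stated assumptions: the `txn` interface and `fakeTxn` type are hypothetical stand-ins and are not gorm's API, and the step strings are placeholders rather than the real SQL.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is a hypothetical, minimal view of a database transaction handle.
// It is NOT the gorm API; it only names the three operations the sketch needs.
type txn interface {
	Exec(query string) error
	Commit() error
	Rollback() error
}

// runBackfill executes the steps in order. On the first failure it rolls the
// transaction back and returns the error; it commits only if every step passed.
func runBackfill(tx txn, steps []string) error {
	for _, q := range steps {
		if err := tx.Exec(q); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return fmt.Errorf("step failed: %v; rollback also failed: %v", err, rbErr)
			}
			return fmt.Errorf("backfill step failed, rolled back: %w", err)
		}
	}
	return tx.Commit()
}

// fakeTxn records what happened so the control flow can be observed without a database.
type fakeTxn struct {
	failOn     string // query that should fail, if any
	executed   []string
	committed  bool
	rolledBack bool
}

func (f *fakeTxn) Exec(query string) error {
	if query == f.failOn {
		return errors.New("simulated failure")
	}
	f.executed = append(f.executed, query)
	return nil
}

func (f *fakeTxn) Commit() error   { f.committed = true; return nil }
func (f *fakeTxn) Rollback() error { f.rolledBack = true; return nil }

func main() {
	steps := []string{"step 1: copy pipelines", "step 2: set default version"}

	good := &fakeTxn{}
	fmt.Println(runBackfill(good, steps), good.committed, good.rolledBack)

	bad := &fakeTxn{failOn: steps[1]}
	fmt.Println(runBackfill(bad, steps), bad.committed, bad.rolledBack)
}
```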
29 changes: 22 additions & 7 deletions backend/src/apiserver/list/list.go
@@ -51,6 +51,8 @@ type token struct {
KeyFieldValue interface{}
// IsDesc is true if the sorting order should be descending.
IsDesc bool
// ModelName is the table where ***FieldName belongs to.
ModelName string
// Filter represents the filtering that should be applied in the query.
Filter *filter.Filter
}
@@ -125,7 +127,9 @@ func NewOptions(listable Listable, pageSize int, sortBy string, filterProto *api
return nil, err
}

token := &token{KeyFieldName: listable.PrimaryKeyColumnName()}
token := &token{
KeyFieldName: listable.PrimaryKeyColumnName(),
ModelName: listable.GetModelName()}

// Ignore the case of the letter. Split query string by space.
queryList := strings.Fields(strings.ToLower(sortBy))
@@ -177,15 +181,23 @@ func (o *Options) AddPaginationToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBu
// containing these.
func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuilder {
// If next row's value is specified, set those values in the clause.
var modelNamePrefix string
if len(o.ModelName) == 0 {
modelNamePrefix = ""
} else {
modelNamePrefix = o.ModelName + "."
}
if o.SortByFieldValue != nil && o.KeyFieldValue != nil {
if o.IsDesc {
sqlBuilder = sqlBuilder.
Where(sq.Or{sq.Lt{o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.LtOrEq{o.KeyFieldName: o.KeyFieldValue}}})
Where(sq.Or{sq.Lt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.LtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}})
} else {
sqlBuilder = sqlBuilder.
Where(sq.Or{sq.Gt{o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.GtOrEq{o.KeyFieldName: o.KeyFieldValue}}})
Where(sq.Or{sq.Gt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.GtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}})
}
}

@@ -194,8 +206,8 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
order = "DESC"
}
sqlBuilder = sqlBuilder.
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldName, order))
OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.SortByFieldName, order)).
Member:
You might need to run gofmt on the code to get the extra space around "+". The same goes for all the other changed files.

Contributor Author:
Done.

Member:
Hmm, I didn't see the change. Could you double-check?

Contributor Author:
I manually inserted the spaces, but on save they are removed again. Let me check my editor extensions.

Contributor Author:
It seems that gofmt is the formatter I am using, and it insists on removing the spaces around "+".

Contributor Author:
Wait, lines 198, 199 and 200 are actually formatted with spaces. I'm not sure why lines 209 and 210 can't be.

Contributor Author:
Given that the other lines are fine, these two lines are probably formatted this way for some other reason. I'd say let's leave it as it is for now.


OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.KeyFieldName, order))

return sqlBuilder
}
@@ -258,6 +270,8 @@ type Listable interface {
// APIToModelFieldMap returns a map from field names in the API representation
// of the model to its corresponding field name in the model itself.
APIToModelFieldMap() map[string]string
// GetModelName returns table name used as sort field prefix.
GetModelName() string
}

// NextPageToken returns a string that can be used to fetch the subsequent set
@@ -292,6 +306,7 @@ func (o *Options) nextPageToken(listable Listable) (*token, error) {
KeyFieldValue: keyField.Interface(),
IsDesc: o.IsDesc,
Filter: o.Filter,
ModelName: o.ModelName,
}, nil
}

4 changes: 4 additions & 0 deletions backend/src/apiserver/list/list_test.go
@@ -39,6 +39,10 @@ func (f *fakeListable) APIToModelFieldMap() map[string]string {
return fakeAPIToModelMap
}

func (f *fakeListable) GetModelName() string {
return ""
}

func TestNextPageToken_ValidTokens(t *testing.T) {
l := &fakeListable{PrimaryKey: "uuid123", FakeName: "Fake", CreatedTimestamp: 1234}

22 changes: 21 additions & 1 deletion backend/src/apiserver/model/BUILD.bazel
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
@@ -10,10 +10,30 @@ go_library(
"listable_model.go",
"pipeline.go",
"pipeline_spec.go",
"pipeline_version.go",
"resource_reference.go",
"run.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
deps = ["//backend/src/apiserver/common:go_default_library"],
)

go_test(
name = "go_default_test",
srcs = [
"pipeline_version_test.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
embed = [":go_default_library"],
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/filter:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
5 changes: 5 additions & 0 deletions backend/src/apiserver/model/experiment.go
@@ -37,3 +37,8 @@ var experimentAPIToModelFieldMap = map[string]string{
func (e *Experiment) APIToModelFieldMap() map[string]string {
return experimentAPIToModelFieldMap
}

// GetModelName returns table name used as sort field prefix
func (e *Experiment) GetModelName() string {
return "experiments"
}
5 changes: 5 additions & 0 deletions backend/src/apiserver/model/job.go
@@ -97,3 +97,8 @@ var jobAPIToModelFieldMap = map[string]string{
func (k *Job) APIToModelFieldMap() map[string]string {
return jobAPIToModelFieldMap
}

// GetModelName returns table name used as sort field prefix
func (j *Job) GetModelName() string {
return "jobs"
}
12 changes: 12 additions & 0 deletions backend/src/apiserver/model/pipeline.go
@@ -31,9 +31,14 @@ type Pipeline struct {
CreatedAtInSec int64 `gorm:"column:CreatedAtInSec; not null"`
Name string `gorm:"column:Name; not null; unique"`
Description string `gorm:"column:Description; not null"`
// TODO(jingzhang36): remove Parameters when no code is accessing this
// field. Should use PipelineVersion.Parameters instead.
/* Set size to 65535 so it will be stored as longtext. https://dev.mysql.com/doc/refman/8.0/en/column-count-limit.html */
Parameters string `gorm:"column:Parameters; not null; size:65535"`
Status PipelineStatus `gorm:"column:Status; not null"`
// Default version of this pipeline. It could be null.
DefaultVersionId string `gorm:"column:DefaultVersionId;"`
DefaultVersion *PipelineVersion `gorm:"-"`
}

func (p Pipeline) GetValueOfPrimaryKey() string {
@@ -59,10 +64,17 @@ var pipelineAPIToModelFieldMap = map[string]string{
"name": "Name",
"created_at": "CreatedAtInSec",
"description": "Description",
// TODO(jingzhang36): uncomment this field when we expose it to API
// "default_version_id": "DefaultVersionId",
}

// APIToModelFieldMap returns a map from API names to field names for model
// Pipeline.
func (p *Pipeline) APIToModelFieldMap() map[string]string {
return pipelineAPIToModelFieldMap
}

// GetModelName returns table name used as sort field prefix
func (p *Pipeline) GetModelName() string {
return "pipelines"
}
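
How the table name returned by GetModelName feeds into the sorting and paging clauses in list.go can be sketched without squirrel. The example below builds the same kind of keyset condition as AddSortingToSelect (rows strictly after the sort value, or equal on the sort value and at or after the key) as a plain SQL fragment with "?" placeholders. The helper names `qualify` and `keysetClause` and the small `modelNamer` interface are hypothetical; qualifying columns with the table name presumably avoids ambiguity once the query joins other tables, which is an inference and not something the diff states.

```go
package main

import "fmt"

// modelNamer is a cut-down, hypothetical version of the Listable interface:
// only the method that supplies the table name used as a column prefix.
type modelNamer interface {
	GetModelName() string
}

// pipeline mirrors the Pipeline model above, whose table name is "pipelines".
type pipeline struct{}

func (pipeline) GetModelName() string { return "pipelines" }

// unnamed mirrors the test fake, which returns an empty model name.
type unnamed struct{}

func (unnamed) GetModelName() string { return "" }

// qualify prefixes the column with the table name unless the name is empty.
func qualify(m modelNamer, column string) string {
	if name := m.GetModelName(); name != "" {
		return name + "." + column
	}
	return column
}

// keysetClause returns a WHERE fragment and an ORDER BY fragment for fetching
// the page that starts at a given (sort value, key value) pair.
func keysetClause(m modelNamer, sortCol, keyCol string, desc bool) (string, string) {
	cmp, order := ">", "ASC"
	if desc {
		cmp, order = "<", "DESC"
	}
	s, k := qualify(m, sortCol), qualify(m, keyCol)
	where := fmt.Sprintf("(%s %s ? OR (%s = ? AND %s %s= ?))", s, cmp, s, k, cmp)
	orderBy := fmt.Sprintf("%s %s, %s %s", s, order, k, order)
	return where, orderBy
}

func main() {
	where, orderBy := keysetClause(pipeline{}, "CreatedAtInSec", "UUID", false)
	fmt.Println("WHERE", where)
	fmt.Println("ORDER BY", orderBy)
}
```

Ordering by the key column as a tiebreaker keeps pages stable when several rows share the same sort value.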