-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add necessary data types to api and database to support pipeline version. #1873
Changes from all commits
bd122ec
dfe6588
14cd856
e980617
586f5ce
8205623
30d9616
bc47bed
fc0c1a8
b835319
0e17a3d
899d42b
509a8db
fc60a57
3d47da3
fea90d4
4db4be7
69539e0
d28e656
5bc2086
840a7bc
90322b9
e32752d
40c9bac
f55e230
bf94cb9
b3165f9
83b9bc9
058199d
569a506
6141be3
a3fe5ed
1171e09
b95eed2
a004d96
05a7601
cd2d678
7f90853
3ea7beb
0198ab0
4a29213
5e69126
0b5eb15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,10 @@ package api; | |
import "google/api/annotations.proto"; | ||
import "google/protobuf/timestamp.proto"; | ||
import "google/protobuf/empty.proto"; | ||
import "backend/api/parameter.proto"; | ||
import "backend/api/error.proto"; | ||
import "backend/api/parameter.proto"; | ||
import "backend/api/pipeline_spec.proto"; | ||
import "backend/api/resource_reference.proto"; | ||
import "protoc-gen-swagger/options/annotations.proto"; | ||
|
||
option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = { | ||
|
@@ -136,6 +138,32 @@ message GetTemplateResponse { | |
string template = 1; | ||
} | ||
|
||
message GetPipelineVersionTemplateRequest{ | ||
string version_id = 1; | ||
} | ||
|
||
message CreatePipelineVersionRequest { | ||
// ResourceReference inside PipelineVersion specifies the pipeline that this | ||
// version belongs to. | ||
PipelineVersion version = 1; | ||
} | ||
|
||
message GetPipelineVersionRequest { | ||
string version_id = 1; | ||
} | ||
|
||
message ListPipelineVersionsRequest { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should consider adding pagination for list version. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
// ResourceKey specifies the pipeline whose versions are to be listed. | ||
ResourceKey resource_key = 1; | ||
int32 page_size = 2; | ||
string page_token = 3; | ||
} | ||
|
||
message ListPipelineVersionsResponse { | ||
repeated PipelineVersion versions = 1; | ||
string next_page_token = 2; | ||
} | ||
|
||
message Pipeline { | ||
// Output. Unique pipeline ID. Generated by API server. | ||
string id = 1; | ||
|
@@ -151,14 +179,53 @@ message Pipeline { | |
string description = 4; | ||
|
||
// Output. The input parameters for this pipeline. | ||
// TODO(jingzhang36): replace this parameters field with the parameters field | ||
// inside PipelineVersion when all usage of the former has been changed to use | ||
// the latter. | ||
repeated Parameter parameters = 5; | ||
|
||
// The URL to the source of the pipeline. This is required when creating the | ||
// pipeline through CreatePipeline API. | ||
// TODO(jingzhang36): replace this url field with the code_source_urls field | ||
// inside PipelineVersion when all usage of the former has been changed to use | ||
// the latter. | ||
Url url = 7; | ||
|
||
// In case any error happens retrieving a pipeline field, only pipeline ID | ||
// and the error message is returned. Client has the flexibility of choosing | ||
// how to handle error. This is especially useful during listing call. | ||
string error = 6; | ||
|
||
// Output only. The default version of the pipeline. As of now, the latest | ||
// version is used as default. (In the future, if desired by customers, we | ||
// can allow them to set default version.) | ||
// TODO(jingzhang36): expose this in API pipeline definition with FE changes. | ||
// PipelineVersion default_version = 8; | ||
} | ||
|
||
message PipelineVersion { | ||
// Output. Unique version ID. Generated by API server. | ||
string id = 1; | ||
|
||
// Optional input field. Version name provided by user. | ||
string name = 2; | ||
|
||
// Output. The time this pipeline version is created. | ||
google.protobuf.Timestamp created_at = 3; | ||
|
||
// Output. The input parameters for this pipeline. | ||
repeated Parameter parameters = 4; | ||
|
||
// Input. Optional. Pipeline version code source. | ||
string code_source_url = 5; | ||
|
||
// Input. Required. Pipeline version package url. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be optional. Same as the Url in here |
||
// When calling CreatePipelineVersion API method, need to provide one package | ||
// file location. | ||
Url package_url = 6; | ||
|
||
// Input. Required. E.g., specify which pipeline this pipeline version belongs | ||
// to. | ||
repeated ResourceReference resource_references = 7; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -176,11 +176,24 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB { | |
db, err := gorm.Open(driverName, arg) | ||
util.TerminateIfError(err) | ||
|
||
// If pipeline_versions table is introduced into DB for the first time, | ||
// it needs initialization or data backfill. | ||
var tableNames []string | ||
var initializePipelineVersions = true | ||
db.Raw(`show tables`).Pluck("Tables_in_mlpipeline", &tableNames) | ||
for _, tableName := range tableNames { | ||
if tableName == "pipeline_versions" { | ||
initializePipelineVersions = false | ||
break | ||
} | ||
} | ||
|
||
// Create table | ||
response := db.AutoMigrate( | ||
&model.Experiment{}, | ||
&model.Job{}, | ||
&model.Pipeline{}, | ||
&model.PipelineVersion{}, | ||
&model.ResourceReference{}, | ||
&model.RunDetail{}, | ||
&model.RunMetric{}, | ||
|
@@ -201,6 +214,18 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB { | |
if response.Error != nil { | ||
glog.Fatalf("Failed to create a foreign key for RunID in run_metrics table. Error: %s", response.Error) | ||
} | ||
response = db.Model(&model.PipelineVersion{}). | ||
AddForeignKey("PipelineId", "pipelines(UUID)", "CASCADE" /* onDelete */, "CASCADE" /* update */) | ||
if response.Error != nil { | ||
glog.Fatalf("Failed to create a foreign key for PipelineId in pipeline_versions table. Error: %s", response.Error) | ||
} | ||
|
||
// Data backfill for pipeline_versions if this is the first time for | ||
// pipeline_versions to enter mlpipeline DB. | ||
if initializePipelineVersions { | ||
initPipelineVersionsFromPipelines(db) | ||
} | ||
|
||
return storage.NewDB(db.DB(), storage.NewMySQLDialect()) | ||
} | ||
|
||
|
@@ -288,3 +313,34 @@ func newClientManager() ClientManager { | |
|
||
return clientManager | ||
} | ||
|
||
// Data migration in 2 steps to introduce pipeline_versions table. This | ||
// migration shall be called only once when pipeline_versions table is created | ||
// for the first time in DB. | ||
func initPipelineVersionsFromPipelines(db *gorm.DB) { | ||
tx := db.Begin() | ||
|
||
// Step 1: duplicate pipelines to pipeline versions. | ||
// The pipeline versions created here are not through KFP pipeline version | ||
// API, and are only for the legacy pipelines that are created | ||
// before pipeline version API is introduced. | ||
// For those legacy pipelines, which had no versions before, we create one | ||
// implicit version for each of them. Given a legacy pipeline, the implicit | ||
// version created here is assigned an ID the same as the pipeline ID. This | ||
// way we don't need to move the minio file of pipeline package around, | ||
// since the minio file's path is based on the pipeline ID (and now on the | ||
// implicit version ID too). Meanwhile, IDs are required to be unique inside | ||
// the same resource type, so pipeline and pipeline version as two different | ||
// resources using the same ID is OK. | ||
// On the other hand, pipeline and its pipeline versions created after | ||
// pipeline version API is introduced will have different IDs; and the minio | ||
// file will be put directly into the directories for pipeline versions. | ||
tx.Exec(`INSERT INTO | ||
pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do u mind add a comment here briefly mention why use pipelineid for versionid There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
SELECT UUID, Name, CreatedAtInSec, Parameters, Status, UUID FROM pipelines;`) | ||
|
||
// Step 2: modify pipelines table after pipeline_versions are populated. | ||
tx.Exec("update pipelines set DefaultVersionId=UUID;") | ||
|
||
tx.Commit() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,8 @@ type token struct { | |
KeyFieldValue interface{} | ||
// IsDesc is true if the sorting order should be descending. | ||
IsDesc bool | ||
// ModelName is the table that SortByFieldName and KeyFieldName belong to. | ||
ModelName string | ||
// Filter represents the filtering that should be applied in the query. | ||
Filter *filter.Filter | ||
} | ||
|
@@ -125,7 +127,9 @@ func NewOptions(listable Listable, pageSize int, sortBy string, filterProto *api | |
return nil, err | ||
} | ||
|
||
token := &token{KeyFieldName: listable.PrimaryKeyColumnName()} | ||
token := &token{ | ||
KeyFieldName: listable.PrimaryKeyColumnName(), | ||
ModelName: listable.GetModelName()} | ||
|
||
// Ignore the case of the letter. Split query string by space. | ||
queryList := strings.Fields(strings.ToLower(sortBy)) | ||
|
@@ -177,15 +181,23 @@ func (o *Options) AddPaginationToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBu | |
// containing these. | ||
func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuilder { | ||
// If next row's value is specified, set those values in the clause. | ||
var modelNamePrefix string | ||
if len(o.ModelName) == 0 { | ||
modelNamePrefix = "" | ||
} else { | ||
modelNamePrefix = o.ModelName + "." | ||
} | ||
if o.SortByFieldValue != nil && o.KeyFieldValue != nil { | ||
if o.IsDesc { | ||
sqlBuilder = sqlBuilder. | ||
Where(sq.Or{sq.Lt{o.SortByFieldName: o.SortByFieldValue}, | ||
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.LtOrEq{o.KeyFieldName: o.KeyFieldValue}}}) | ||
Where(sq.Or{sq.Lt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, | ||
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, | ||
sq.LtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}}) | ||
} else { | ||
sqlBuilder = sqlBuilder. | ||
Where(sq.Or{sq.Gt{o.SortByFieldName: o.SortByFieldValue}, | ||
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.GtOrEq{o.KeyFieldName: o.KeyFieldValue}}}) | ||
Where(sq.Or{sq.Gt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, | ||
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, | ||
sq.GtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}}) | ||
} | ||
} | ||
|
||
|
@@ -194,8 +206,8 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild | |
order = "DESC" | ||
} | ||
sqlBuilder = sqlBuilder. | ||
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldName, order)). | ||
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldName, order)) | ||
OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.SortByFieldName, order)). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you might need to gofmt the code for extra space around +. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. umm didn't see the change. could you double check? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ur, I manually inserted space...and on save, it goes back to this...let me check extensions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems that gofmt is the one I used and it insists on remove the space around "+" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wait. line 198, 199, 200 are actually properly formatted with space. Not sure why line 209 and line 210 can't... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given that other lines are ok, these two lines might be for some other reasons formatted this way. I'd say let's leave it as it is for now. |
||
OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.KeyFieldName, order)) | ||
|
||
return sqlBuilder | ||
} | ||
|
@@ -258,6 +270,8 @@ type Listable interface { | |
// APIToModelFieldMap returns a map from field names in the API representation | ||
// of the model to its corresponding field name in the model itself. | ||
APIToModelFieldMap() map[string]string | ||
// GetModelName returns table name used as sort field prefix. | ||
GetModelName() string | ||
} | ||
|
||
// NextPageToken returns a string that can be used to fetch the subsequent set | ||
|
@@ -292,6 +306,7 @@ func (o *Options) nextPageToken(listable Listable) (*token, error) { | |
KeyFieldValue: keyField.Interface(), | ||
IsDesc: o.IsDesc, | ||
Filter: o.Filter, | ||
ModelName: o.ModelName, | ||
}, nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intended to add API definition in a separate change?
We should use resource reference for flexible pipeline <-> version mapping
https://github.com/kubeflow/pipelines/blob/master/backend/api/resource_reference.proto
Please check experiment <-> job and experiment <-> run as examples to see how this is being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Api PipelineVersion definition will use resource reference to point to Pipeline. Model/Storage PipelineVersion will use foreign key of pipeline id.