Skip to content

Commit

Permalink
Include experiment ID as part of run table (kubeflow#2929)
Browse files Browse the repository at this point in the history
* Include and index experiment ID as part of run table

* * Add auto backfill

* remove unnecessary binary file

* attempt to fix bazel build

* address comments

* update new index on both ExperimentUUID + CreatedAtInSec instead of only ExperimentUUID

* add more make commands

* update make commands

* revert some changes for online build
  • Loading branch information
frozeNinK authored and Jeffwan committed Dec 9, 2020
1 parent c6fabc4 commit be9a466
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 12 deletions.
39 changes: 37 additions & 2 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (

"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"

"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -216,6 +215,11 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
glog.Fatalf("Failed to update the resource reference payload type. Error: %s", response.Error)
}

response = db.Model(&model.RunDetail{}).AddIndex("experimentuuid_createatinsec", "ExperimentUUID", "CreatedAtInSec")
if response.Error != nil {
glog.Fatalf("Failed to create index experimentuuid_createatinsec on run_details. Error: %s", response.Error)
}

response = db.Model(&model.RunMetric{}).
AddForeignKey("RunUUID", "run_details(UUID)", "CASCADE" /* onDelete */, "CASCADE" /* update */)
if response.Error != nil {
Expand All @@ -232,6 +236,10 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
if initializePipelineVersions {
initPipelineVersionsFromPipelines(db)
}
err = backfillExperimentIDToRunTable(db)
if err != nil {
glog.Fatalf("Failed to backfill experiment UUID in run_details table: %s", err)
}

response = db.Model(&model.Pipeline{}).ModifyColumn("Description", "longtext not null")
if response.Error != nil {
Expand Down Expand Up @@ -374,3 +382,30 @@ func initPipelineVersionsFromPipelines(db *gorm.DB) {

tx.Commit()
}

func backfillExperimentIDToRunTable(db *gorm.DB) (retError error) {
// check if there is any row in the run table has experiment ID being empty
rows, err := db.CommonDB().Query(`SELECT ExperimentUUID FROM run_details WHERE ExperimentUUID = '' LIMIT 1`)
if err != nil {
return err
}
defer rows.Close()

// no row in run_details table has empty ExperimentUUID
if !rows.Next() {
return nil
}

_, err = db.CommonDB().Exec(`
UPDATE
run_details, resource_references
SET
run_details.ExperimentUUID = resource_references.ReferenceUUID
WHERE
run_details.UUID = resource_references.ResourceUUID
AND resource_references.ResourceType = 'Run'
AND resource_references.ReferenceType = 'Experiment'
AND run_details.ExperimentUUID = ''
`)
return err
}
18 changes: 18 additions & 0 deletions backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,24 @@ func FilterOnResourceReference(tableName string, columns []string, resourceType
return selectBuilder, nil
}

// FilterOnExperiment filters the given table by rows based on provided experiment ID,
// and returns the rebuilt SelectBuilder
func FilterRunOnExperiment(
tableName string,
columns []string,
selectCount bool,
experimentID string,
) (sq.SelectBuilder, error) {
selectBuilder := sq.Select(columns...)
if selectCount {
selectBuilder = sq.Select("count(*)")
}
selectBuilder = selectBuilder.From(tableName).Where(
sq.Eq{"ExperimentUUID": experimentID},
)
return selectBuilder, nil
}

// Scans the one given row into a number, and returns the number
func ScanRowToTotalSize(rows *sql.Rows) (int, error) {
var total_size int
Expand Down
1 change: 1 addition & 0 deletions backend/src/apiserver/model/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model

type Run struct {
UUID string `gorm:"column:UUID; not null; primary_key"`
ExperimentUUID string `gorm:"column:ExperimentUUID; not null;"`
DisplayName string `gorm:"column:DisplayName; not null;"` /* The name that user provides. Can contain special characters*/
Name string `gorm:"column:Name; not null;"` /* The name of the K8s resource. Follow regex '[a-z0-9]([-a-z0-9]*[a-z0-9])?'*/
StorageState string `gorm:"column:StorageState; not null;"`
Expand Down
23 changes: 22 additions & 1 deletion backend/src/apiserver/resource/model_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@ func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow
}
}

experimentUUID, err := r.getOwningExperimentUUID(run.ResourceReferences)
if err != nil {
return nil, util.Wrap(err, "Error getting the experiment UUID")
}

return &model.RunDetail{
Run: model.Run{
UUID: runId,
ExperimentUUID: experimentUUID,
DisplayName: run.Name,
Name: workflow.Name,
Namespace: workflow.Namespace,
Expand Down Expand Up @@ -199,7 +205,7 @@ func (r *ResourceManager) toModelResourceReferences(
if err != nil {
return nil, util.Wrap(err, "Failed to find the referred resource")
}

//TODO(gaoning777) further investigation: Is the plain namespace a good option? maybe uuid for distinctness even with namespace deletion/recreation.
modelRef := &model.ResourceReference{
ResourceUUID: resourceId,
Expand Down Expand Up @@ -252,3 +258,18 @@ func (r *ResourceManager) getResourceName(resourceType common.ResourceType, reso
return "", util.NewInvalidInputError("Unsupported resource type: %s", string(resourceType))
}
}

func (r *ResourceManager) getOwningExperimentUUID(references []*api.ResourceReference) (string, error) {
var experimentUUID string
for _, ref := range references {
if ref.Key.Type == api.ResourceType_EXPERIMENT && ref.Relationship == api.Relationship_OWNER {
experimentUUID = ref.Key.Id
break
}
}

if experimentUUID == "" {
return "", util.NewInternalServerError(nil, "Missing owning experiment UUID")
}
return experimentUUID, nil
}
11 changes: 6 additions & 5 deletions backend/src/apiserver/resource/model_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ func TestToModelRunDetail(t *testing.T) {

expectedModelRunDetail := &model.RunDetail{
Run: model.Run{
UUID: "123",
DisplayName: "name1",
Name: "workflow-name",
Conditions: "running",
Description: "this is a run",
UUID: "123",
ExperimentUUID: experiment.UUID,
DisplayName: "name1",
Name: "workflow-name",
Conditions: "running",
Description: "this is a run",
PipelineSpec: model.PipelineSpec{
WorkflowSpecManifest: "workflow spec",
Parameters: `[{"name":"param2","value":"world"}]`,
Expand Down
6 changes: 6 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
expectedRunDetail := &model.RunDetail{
Run: model.Run{
UUID: "123e4567-e89b-12d3-a456-426655440000",
ExperimentUUID: experiment.UUID,
DisplayName: "run1",
Name: "workflow-name",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
Expand Down Expand Up @@ -338,6 +339,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {

func TestCreateRun_ThroughWorkflowSpec(t *testing.T) {
store, manager, runDetail := initWithOneTimeRun(t)
expectedExperimentUUID := runDetail.ExperimentUUID
expectedRuntimeWorkflow := testWorkflow.DeepCopy()
expectedRuntimeWorkflow.Spec.Arguments.Parameters = []v1alpha1.Parameter{
{Name: "param1", Value: util.StringPointer("world")}}
Expand All @@ -347,6 +349,7 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) {
expectedRunDetail := &model.RunDetail{
Run: model.Run{
UUID: "123e4567-e89b-12d3-a456-426655440000",
ExperimentUUID: expectedExperimentUUID,
DisplayName: "run1",
Name: "workflow-name",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
Expand Down Expand Up @@ -430,6 +433,7 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) {
expectedRunDetail := &model.RunDetail{
Run: model.Run{
UUID: "123e4567-e89b-12d3-a456-426655440000",
ExperimentUUID: experiment.UUID,
DisplayName: "run1",
Name: "workflow-name",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
Expand Down Expand Up @@ -1104,6 +1108,7 @@ func TestDeleteJob_DbFailure(t *testing.T) {

func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
store, manager, run := initWithOneTimeRun(t)
expectedExperimentUUID := run.ExperimentUUID
defer store.Close()
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
Expand All @@ -1119,6 +1124,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
assert.Nil(t, err)
expectedRun := model.Run{
UUID: "123e4567-e89b-12d3-a456-426655440000",
ExperimentUUID: expectedExperimentUUID,
DisplayName: "run1",
Name: "workflow-name",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
Expand Down
24 changes: 20 additions & 4 deletions backend/src/apiserver/storage/run_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"database/sql"
"fmt"

"github.com/pkg/errors"

sq "github.com/Masterminds/squirrel"
Expand All @@ -31,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/json"
)

var runColumns = []string{"UUID", "DisplayName", "Name", "StorageState", "Namespace", "Description",
var runColumns = []string{"UUID", "ExperimentUUID", "DisplayName", "Name", "StorageState", "Namespace", "Description",
"CreatedAtInSec", "ScheduledAtInSec", "FinishedAtInSec", "Conditions", "PipelineId", "PipelineName", "PipelineSpecManifest",
"WorkflowSpecManifest", "Parameters", "pipelineRuntimeManifest", "WorkflowRuntimeManifest",
}
Expand Down Expand Up @@ -143,8 +144,20 @@ func (s *RunStore) ListRuns(

func (s *RunStore) buildSelectRunsQuery(selectCount bool, opts *list.Options,
filterContext *common.FilterContext) (string, []interface{}, error) {
filteredSelectBuilder, err := list.FilterOnResourceReference("run_details", runColumns,
common.Run, selectCount, filterContext)

var filteredSelectBuilder sq.SelectBuilder
var err error

refKey := filterContext.ReferenceKey
if refKey != nil && refKey.Type == "ExperimentUUID" {
// for performance reasons need to special treat experiment ID filter on runs
// currently only the run table have experiment UUID column
filteredSelectBuilder, err = list.FilterRunOnExperiment("run_details", runColumns,
selectCount, refKey.ID)
} else {
filteredSelectBuilder, err = list.FilterOnResourceReference("run_details", runColumns,
common.Run, selectCount, filterContext)
}
if err != nil {
return "", nil, util.NewInternalServerError(err, "Failed to list runs: %v", err)
}
Expand Down Expand Up @@ -217,12 +230,13 @@ func (s *RunStore) addMetricsAndResourceReferences(filteredSelectBuilder sq.Sele
func (s *RunStore) scanRowsToRunDetails(rows *sql.Rows) ([]*model.RunDetail, error) {
var runs []*model.RunDetail
for rows.Next() {
var uuid, displayName, name, storageState, namespace, description, pipelineId, pipelineName, pipelineSpecManifest,
var uuid, experimentUUID, displayName, name, storageState, namespace, description, pipelineId, pipelineName, pipelineSpecManifest,
workflowSpecManifest, parameters, conditions, pipelineRuntimeManifest, workflowRuntimeManifest string
var createdAtInSec, scheduledAtInSec, finishedAtInSec int64
var metricsInString, resourceReferencesInString sql.NullString
err := rows.Scan(
&uuid,
&experimentUUID,
&displayName,
&name,
&storageState,
Expand Down Expand Up @@ -260,6 +274,7 @@ func (s *RunStore) scanRowsToRunDetails(rows *sql.Rows) ([]*model.RunDetail, err
}
runs = append(runs, &model.RunDetail{Run: model.Run{
UUID: uuid,
ExperimentUUID: experimentUUID,
DisplayName: displayName,
Name: name,
StorageState: storageState,
Expand Down Expand Up @@ -320,6 +335,7 @@ func (s *RunStore) CreateRun(r *model.RunDetail) (*model.RunDetail, error) {
Insert("run_details").
SetMap(sq.Eq{
"UUID": r.UUID,
"ExperimentUUID": r.ExperimentUUID,
"DisplayName": r.DisplayName,
"Name": r.Name,
"StorageState": r.StorageState,
Expand Down

0 comments on commit be9a466

Please sign in to comment.