Skip to content

Commit

Permalink
fix(backend): Fix performance issue within a mysql request
Browse files Browse the repository at this point in the history
Replace the existing MySQL request that uses a nested select with one that uses an inner join, for better performance.
The fix leverages the 'SQLDialect' interface, because the new request is not supported by SQLite (used for testing).
This interface bridges the difference between MySQL (production) and SQLite (test).
Issue: kubeflow#6845

Signed-off-by: diana <difince@gmail.com>
  • Loading branch information
difince committed Jun 23, 2023
1 parent 536d93a commit 567dc4d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
19 changes: 19 additions & 0 deletions backend/src/apiserver/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/VividCortex/mysqlerr"
"github.com/go-sql-driver/mysql"
sqlite3 "github.com/mattn/go-sqlite3"

"github.com/kubeflow/pipelines/backend/src/apiserver/model"
)

// DB a struct wrapping plain sql library with SQL dialect, to solve any feature
Expand Down Expand Up @@ -59,6 +61,9 @@ type SQLDialect interface {

// Inserts new rows and updates duplicates based on the key column.
Upsert(query string, key string, overwrite bool, columns ...string) string

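// UpdateRunDetailsStorageState builds the UPDATE statement, and the arguments for
// its placeholders, that sets run_details rows of the given experiment to the
// archived storage state, using a join (production) or a nested select (test).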
UpdateRunDetailsStorageState(expId string) (string, []interface{})
}

// MySQLDialect implements SQLDialect with mysql dialect implementation.
Expand Down Expand Up @@ -88,6 +93,13 @@ func (d MySQLDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY
}

// UpdateRunDetailsStorageState returns the MySQL UPDATE statement, together
// with the positional arguments for its placeholders, that sets StorageState to
// the archived state on every run_details row joined to a resource_references
// entry of run resource type whose reference is the experiment expId.
//
// TODO(gkcalat): deprecate resource_references table once migration to v2beta1 is available.
func (d MySQLDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) {
	const query = "UPDATE run_details as runs JOIN resource_references as rf ON runs.UUID = rf.ResourceUUID SET StorageState = ? WHERE rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?"

	// Values appear in the same order as the placeholders in the query:
	// StorageState, rf.ResourceType, rf.ReferenceUUID, rf.ReferenceType.
	params := []interface{}{
		model.StorageStateArchived.ToString(),
		model.RunResourceType,
		expId,
		model.ExperimentResourceType,
	}
	return query, params
}

// SQLiteDialect implements SQLDialect with sqlite dialect implementation.
type SQLiteDialect struct{}

Expand Down Expand Up @@ -131,6 +143,13 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Code == sqlite3.ErrConstraint
}

// UpdateRunDetailsStorageState returns the SQLite UPDATE statement, together
// with the positional arguments for its placeholders, that sets StorageState to
// the archived state on run_details rows belonging to the experiment expId.
// Runs are selected through a nested select on resource_references or through
// a direct ExperimentUUID match.
//
// Note that AND binds tighter than OR in SQL, so the "StorageState <> ?" filter
// applies only to the nested-select branch; rows matched by ExperimentUUID are
// updated regardless of their current StorageState.
//
// TODO(gkcalat): deprecate resource_references table once migration to v2beta1 is available.
func (d SQLiteDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) {
	const query = "UPDATE run_details SET StorageState = ? WHERE StorageState <> ? AND UUID in (SELECT ResourceUUID FROM resource_references as rf WHERE (rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?)) OR ExperimentUUID = ?"

	archived := model.StorageStateArchived.ToString()

	// Values appear in the same order as the placeholders in the query: the new
	// state, the state to exclude, the three resource_references filters, and
	// finally the experiment ID for the direct ExperimentUUID match.
	params := []interface{}{
		archived,
		archived,
		model.RunResourceType,
		expId,
		model.ExperimentResourceType,
		expId,
	}
	return query, params
}

// NewMySQLDialect returns an empty MySQLDialect value.
func NewMySQLDialect() MySQLDialect {
	dialect := MySQLDialect{}
	return dialect
}
Expand Down
29 changes: 2 additions & 27 deletions backend/src/apiserver/storage/experiment_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,32 +309,6 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to create query to archive experiment %s. error: '%v'", expId, err.Error())
}

// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
// TODO(jingzhang36): use inner join to replace nested query for better performance.
filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
sq.Eq{"rf.ResourceType": model.RunResourceType},
sq.Eq{"rf.ReferenceUUID": expId},
sq.Eq{"rf.ReferenceType": model.ExperimentResourceType},
}).ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error())
}
updateRunsSql, updateRunsArgs, err := sq.
Update("run_details").
SetMap(sq.Eq{
"StorageState": model.StorageStateArchived.ToString(),
}).
Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}).
Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...).
ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error())
}

updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq.
Update("run_details").
SetMap(sq.Eq{
Expand Down Expand Up @@ -388,7 +362,8 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to archive experiment %s. error: '%v'", expId, err.Error())
}

_, err = tx.Exec(updateRunsSql, updateRunsArgs...)
query, arguments := s.db.UpdateRunDetailsStorageState(expId)
_, err = tx.Exec(query, arguments...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err,
Expand Down

0 comments on commit 567dc4d

Please sign in to comment.