From 567dc4dcb2db9efc51edc4dd8117fbf3b794a7ba Mon Sep 17 00:00:00 2001 From: diana Date: Fri, 23 Jun 2023 11:27:48 +0300 Subject: [PATCH] fix(backend): Fix performance issue within a mysql request Replace the existing mysql request that uses a nested select with an inner join for better performance. The fix leverages the 'SQLDialect' interface, because the new request is not supported by sqlite (used for testing). This interface bridges the difference between mysql (production) and sqlite (test). Issue: https://github.com/kubeflow/pipelines/issues/6845 Signed-off-by: diana --- backend/src/apiserver/storage/db.go | 19 ++++++++++++ .../src/apiserver/storage/experiment_store.go | 29 ++----------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index d15d66c72dc0..467ddf3171ed 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -23,6 +23,8 @@ import ( "github.com/VividCortex/mysqlerr" "github.com/go-sql-driver/mysql" sqlite3 "github.com/mattn/go-sqlite3" + + "github.com/kubeflow/pipelines/backend/src/apiserver/model" ) // DB a struct wrapping plain sql library with SQL dialect, to solve any feature @@ -59,6 +61,9 @@ type SQLDialect interface { // Inserts new rows and updates duplicates based on the key column. Upsert(query string, key string, overwrite bool, columns ...string) string + + // Updates a run_details based on join (production) or inner select (test). + UpdateRunDetailsStorageState(expId string) (string, []interface{}) } // MySQLDialect implements SQLDialect with mysql dialect implementation. @@ -88,6 +93,13 @@ func (d MySQLDialect) IsDuplicateError(err error) bool { return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY } +// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. 
+func (d MySQLDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { + var args []interface{} + args = append(args, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) + return "UPDATE run_details as runs JOIN resource_references as rf ON runs.UUID = rf.ResourceUUID SET StorageState = ? WHERE rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?", args +} + // SQLiteDialect implements SQLDialect with sqlite dialect implementation. type SQLiteDialect struct{} @@ -131,6 +143,13 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool { return ok && sqlError.Code == sqlite3.ErrConstraint } +// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. +func (d SQLiteDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { + var args []interface{} + args = append(args, model.StorageStateArchived.ToString(), model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType, expId) + return "UPDATE run_details SET StorageState = ? WHERE StorageState <> ? AND UUID in (SELECT ResourceUUID FROM resource_references as rf WHERE (rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?)) OR ExperimentUUID = ?", args +} + func NewMySQLDialect() MySQLDialect { return MySQLDialect{} } diff --git a/backend/src/apiserver/storage/experiment_store.go b/backend/src/apiserver/storage/experiment_store.go index d254537a28d2..49a00b56da58 100644 --- a/backend/src/apiserver/storage/experiment_store.go +++ b/backend/src/apiserver/storage/experiment_store.go @@ -309,32 +309,6 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive experiment %s. error: '%v'", expId, err.Error()) } - // TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. 
- // TODO(jingzhang36): use inner join to replace nested query for better performance. - filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID"). - From("resource_references as rf"). - Where(sq.And{ - sq.Eq{"rf.ResourceType": model.RunResourceType}, - sq.Eq{"rf.ReferenceUUID": expId}, - sq.Eq{"rf.ReferenceType": model.ExperimentResourceType}, - }).ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error()) - } - updateRunsSql, updateRunsArgs, err := sq. - Update("run_details"). - SetMap(sq.Eq{ - "StorageState": model.StorageStateArchived.ToString(), - }). - Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}). - Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...). - ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error()) - } - updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq. Update("run_details"). SetMap(sq.Eq{ @@ -388,7 +362,8 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive experiment %s. error: '%v'", expId, err.Error()) } - _, err = tx.Exec(updateRunsSql, updateRunsArgs...) + query, arguments := s.db.UpdateRunDetailsStorageState(expId) + _, err = tx.Exec(query, arguments...) if err != nil { tx.Rollback() return util.NewInternalServerError(err,