Skip to content

Commit

Permalink
Clear default exp table on delete and create default exp on run creat…
Browse files Browse the repository at this point in the history
…e if none exists (#1199)

* Clear default exp table on delete and create default exp on run create
if no default exists

With this change, if the delete experiment API is called on the default
experiment, then the ID will also be removed from the default_experiments
table.

Additionally, if the default experiment doesn't exist and a new run is
created without an experiment, a new default experiment will be created,
and the run will be placed within this experiment.

* Adds integration test for creating a run without an experiment

* Fixes failure to close database connection and adds tests for recreating and deleting default experiment

* Rename function

* Revert some row.Close() calls
  • Loading branch information
rileyjbauer authored and k8s-ci-robot committed Apr 29, 2019
1 parent c77554a commit d88ba38
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 64 deletions.
36 changes: 1 addition & 35 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/golang/glog"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/apiserver/server"
"google.golang.org/grpc"
Expand Down Expand Up @@ -61,7 +60,7 @@ func main() {
glog.Fatalf("Failed to load samples. Err: %v", err)
}

err = createDefaultExperiment(resourceManager)
_, err = resourceManager.CreateDefaultExperiment()
if err != nil {
glog.Fatalf("Failed to create default experiment. Err: %v", err)
}
Expand Down Expand Up @@ -135,39 +134,6 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se
}
}

// Used to initialize the Experiment database with a default to be used for runs
func createDefaultExperiment(resourceManager *resource.ResourceManager) error {
// First check that we don't already have a default experiment ID in the DB.
defaultExperimentId, err := resourceManager.GetDefaultExperimentId()
if err != nil {
return fmt.Errorf("Failed to check if default experiment exists. Err: %v", err)
}
// If default experiment ID is already present, don't fail, simply return.
if defaultExperimentId != "" {
glog.Info("Default experiment already exists! ID: %v", defaultExperimentId)
return nil
}

// Create default experiment
defaultExperiment := &model.Experiment{
Name: "Default",
Description: "All runs created without specifying an experiment will be grouped here.",
}
experiment, err := resourceManager.CreateExperiment(defaultExperiment)
if err != nil {
return fmt.Errorf("Failed to create default experiment. Err: %v", err)
}

// Set default experiment ID in the DB
err = resourceManager.SetDefaultExperimentId(experiment.UUID)
if err != nil {
return fmt.Errorf("Failed to set default experiment ID. Err: %v", err)
}

glog.Info("Default experiment is set. ID is: %v", experiment.UUID)
return nil
}

// Preload a bunch of pipeline samples
// Samples are only loaded once when the pipeline system is initially installed.
// They won't be loaded when upgrade or pod restart, to prevent them reappear if user explicitly
Expand Down
41 changes: 39 additions & 2 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,40 @@ func (r *ResourceManager) getWorkflowSpecBytes(spec *api.PipelineSpec) ([]byte,
return nil, util.NewInvalidInputError("Please provide a valid pipeline spec")
}

// setDefaultExperimentIfNotPresent If the provided run does not include a reference to a containing
// Used to initialize the Experiment database with a default to be used for runs
func (r *ResourceManager) CreateDefaultExperiment() (string, error) {
// First check that we don't already have a default experiment ID in the DB.
defaultExperimentId, err := r.GetDefaultExperimentId()
if err != nil {
return "", fmt.Errorf("Failed to check if default experiment exists. Err: %v", err)
}
// If default experiment ID is already present, don't fail, simply return.
if defaultExperimentId != "" {
glog.Info("Default experiment already exists! ID: %v", defaultExperimentId)
return "", nil
}

// Create default experiment
defaultExperiment := &model.Experiment{
Name: "Default",
Description: "All runs created without specifying an experiment will be grouped here.",
}
experiment, err := r.CreateExperiment(defaultExperiment)
if err != nil {
return "", fmt.Errorf("Failed to create default experiment. Err: %v", err)
}

// Set default experiment ID in the DB
err = r.SetDefaultExperimentId(experiment.UUID)
if err != nil {
return "", fmt.Errorf("Failed to set default experiment ID. Err: %v", err)
}

glog.Info("Default experiment is set. ID is: %v", experiment.UUID)
return experiment.UUID, nil
}

// setExperimentIfNotPresent If the provided run does not include a reference to a containing
// experiment, then we fetch the default experiment's ID and create a reference to that.
func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error {
// First check if there is already a referenced experiment
Expand All @@ -522,7 +555,11 @@ func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error {
return util.NewInternalServerError(err, "Failed to retrieve default experiment")
}
if defaultExperimentId == "" {
return util.NewInternalServerError(errors.New("Default experiment ID was empty"), "Default experiment was not set")
glog.Info("No default experiment was found. Creating a new default experiment")
defaultExperimentId, err = r.CreateDefaultExperiment()
if defaultExperimentId == "" || err != nil {
return util.NewInternalServerError(err, "Failed to create new default experiment")
}
}
defaultExperimentRef := &api.ResourceReference{
Key: &api.ResourceKey{
Expand Down
93 changes: 93 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,36 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) {
assert.Equal(t, expectedRunDetail, runDetail, "CreateRun stored invalid data in database")
}

func TestCreateRun_NoExperiment(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
manager := NewResourceManager(store)
apiRun := &api.Run{
Name: "No experiment",
PipelineSpec: &api.PipelineSpec{
WorkflowManifest: testWorkflow.ToStringForStore(),
Parameters: []*api.Parameter{
{Name: "param1", Value: "world"},
},
},
// No experiment
ResourceReferences: []*api.ResourceReference{},
}
runDetail, err := manager.CreateRun(apiRun)
assert.Nil(t, err)
expectedRunDetail := []*model.ResourceReference{{
ResourceUUID: "workflow1",
ResourceType: common.Run,
// Experiment is now set
ReferenceUUID: DefaultFakeUUID,
ReferenceType: common.Experiment,
Relationship: common.Owner,
}}
assert.Equal(t, expectedRunDetail, runDetail.Run.ResourceReferences, "The CreateRun return has unexpected value.")
runDetail, err = manager.GetRun(runDetail.UUID)
assert.Nil(t, err)
assert.Equal(t, expectedRunDetail, runDetail.Run.ResourceReferences, "CreateRun stored invalid data in database")
}

func TestCreateRun_EmptyPipelineSpec(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
Expand Down Expand Up @@ -448,6 +478,69 @@ func TestDeleteRun_DbFailure(t *testing.T) {
assert.Contains(t, err.Error(), "database is closed")
}

func TestDeleteExperiment(t *testing.T) {
store, manager, experiment := initWithExperiment(t)
defer store.Close()
err := manager.DeleteExperiment(experiment.UUID)
assert.Nil(t, err)

_, err = manager.GetExperiment(experiment.UUID)
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "not found")
}

func TestDeleteExperiment_ClearsDefaultExperiment(t *testing.T) {
store, manager, experiment := initWithExperiment(t)
defer store.Close()
// Set default experiment ID. This is not normally done manually
err := manager.SetDefaultExperimentId(experiment.UUID)
assert.Nil(t, err)
// Verify that default experiment ID is set
defaultExperimentId, err := manager.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, experiment.UUID, defaultExperimentId)

err = manager.DeleteExperiment(experiment.UUID)
assert.Nil(t, err)

_, err = manager.GetExperiment(experiment.UUID)
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "not found")

// Verify that default experiment ID has been cleared
defaultExperimentId, err = manager.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, "", defaultExperimentId)
}

func TestDeleteExperiment_ExperimentNotExist(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store)
err := manager.DeleteExperiment("1")
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "not found")
}

func TestDeleteExperiment_CrdFailure(t *testing.T) {
store, manager, experiment := initWithExperiment(t)
defer store.Close()

manager.workflowClient = &FakeBadWorkflowClient{}
err := manager.DeleteExperiment(experiment.UUID)
assert.Nil(t, err)
}

func TestDeleteExperiment_DbFailure(t *testing.T) {
store, manager, experiment := initWithExperiment(t)
defer store.Close()

store.DB().Close()
err := manager.DeleteExperiment(experiment.UUID)
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "database is closed")
}

func TestTerminateRun(t *testing.T) {
store, manager, runDetail := initWithOneTimeRun(t)
defer store.Close()
Expand Down
2 changes: 2 additions & 0 deletions backend/src/apiserver/storage/db_status_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (s *DBStatusStore) InitializeDBStatusTable() error {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to load database status.")
}
defer rows.Close()

// The table is not initialized
if !rows.Next() {
Expand Down Expand Up @@ -83,6 +84,7 @@ func (s *DBStatusStore) HaveSamplesLoaded() (bool, error) {
if err != nil {
return false, util.NewInternalServerError(err, "Error when getting load sample status")
}
defer rows.Close()
if rows.Next() {
err = rows.Scan(&haveSamplesLoaded)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions backend/src/apiserver/storage/default_experiment_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package storage

import (
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -46,6 +48,7 @@ func (s *DefaultExperimentStore) initializeDefaultExperimentTable() error {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to get default experiment.")
}
defer rows.Close()

// If the table is not initialized, then set the default value.
if !rows.Next() {
Expand Down Expand Up @@ -77,6 +80,7 @@ func (s *DefaultExperimentStore) SetDefaultExperimentId(id string) error {
sql, args, err := sq.
Update("default_experiments").
SetMap(sq.Eq{"DefaultExperimentId": id}).
Where(sq.Eq{"DefaultExperimentId": ""}).
ToSql()
if err != nil {
return util.NewInternalServerError(err, "Error creating query to set default experiment ID.")
Expand All @@ -94,10 +98,13 @@ func (s *DefaultExperimentStore) GetDefaultExperimentId() (string, error) {
if err != nil {
return "", util.NewInternalServerError(err, "Error creating query to get default experiment ID.")
}

rows, err := s.db.Query(sql, args...)
if err != nil {
return "", util.NewInternalServerError(err, "Error when getting default experiment ID")
}
defer rows.Close()

if rows.Next() {
err = rows.Scan(&defaultExperimentId)
if err != nil {
Expand All @@ -108,6 +115,26 @@ func (s *DefaultExperimentStore) GetDefaultExperimentId() (string, error) {
return "", nil
}

// Sets the default experiment ID stored in the DB to the empty string. This needs to happen if the
// experiment is deleted via the normal delete experiment API so that the server knows to create a
// new default.
// This is always done alongside the deletion of the actual experiment itself, so a transaction is
// needed as input.
// Update is used instead of delete so that we don't need to first check that the experiment ID is
// there.
func (s *DefaultExperimentStore) UnsetDefaultExperimentIdIfIdMatches(tx *sql.Tx, id string) error {
sql, args, err := sq.
Update("default_experiments").
SetMap(sq.Eq{"DefaultExperimentId": ""}).
Where(sq.Eq{"DefaultExperimentId": id}).
ToSql()
_, err = tx.Exec(sql, args...)
if err != nil {
return util.NewInternalServerError(err, "Failed to clear default experiment with ID: %s", id)
}
return nil
}

// factory function for creating default experiment store
func NewDefaultExperimentStore(db *DB) *DefaultExperimentStore {
s := &DefaultExperimentStore{db: db}
Expand Down
35 changes: 31 additions & 4 deletions backend/src/apiserver/storage/default_experiment_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestInitializeDefaultExperimentTable(t *testing.T) {
// Default experiment ID is empty after table initialization
defaultExperimentId, err := defaultExperimentStore.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, defaultExperimentId, "")
assert.Equal(t, "", defaultExperimentId)

// Initializing the table with an invalid DB is an error
db.Close()
Expand All @@ -54,10 +54,13 @@ func TestGetAndSetDefaultExperimentId(t *testing.T) {
// Get the default experiment ID
defaultExperimentId, err := defaultExperimentStore.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, defaultExperimentId, "test-ID")
// Trying to set the default experiment ID again is an error
assert.Equal(t, "test-ID", defaultExperimentId)
// Trying to set the default experiment ID again is not an error, but the ID is not changed
err = defaultExperimentStore.SetDefaultExperimentId("a-different-ID")
assert.NotNil(t, err)
assert.Nil(t, err)
defaultExperimentId, err = defaultExperimentStore.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, "test-ID", defaultExperimentId)

// Setting or getting the default experiment ID with an invalid DB is an error
db.Close()
Expand All @@ -66,3 +69,27 @@ func TestGetAndSetDefaultExperimentId(t *testing.T) {
_, err = defaultExperimentStore.GetDefaultExperimentId()
assert.NotNil(t, err)
}

func TestUnsetDefaultExperimentIdIfIdMatches(t *testing.T) {
db := NewFakeDbOrFatal()
defaultExperimentStore := NewDefaultExperimentStore(db)

// Initialize for the first time
err := defaultExperimentStore.initializeDefaultExperimentTable()
assert.Nil(t, err)
// Set the default experiment ID
err = defaultExperimentStore.SetDefaultExperimentId("test-ID")
assert.Nil(t, err)
// Clear the default experiment ID. This requires a transaction.
tx, _ := db.Begin()
err = defaultExperimentStore.UnsetDefaultExperimentIdIfIdMatches(tx, "test-ID")
assert.Nil(t, err)
err = tx.Commit()
assert.Nil(t, err)
// Get the default experiment ID
defaultExperimentId, err := defaultExperimentStore.GetDefaultExperimentId()
assert.Nil(t, err)
assert.Equal(t, "", defaultExperimentId)

db.Close()
}
16 changes: 14 additions & 2 deletions backend/src/apiserver/storage/experiment_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ExperimentStore struct {
time util.TimeInterface
uuid util.UUIDGeneratorInterface
resourceReferenceStore *ResourceReferenceStore
defaultExperimentStore *DefaultExperimentStore
}

// Runs two SQL queries in a transaction to return a list of matching experiments, as well as their
Expand Down Expand Up @@ -178,7 +179,7 @@ func (s *ExperimentStore) DeleteExperiment(id string) error {
return util.NewInternalServerError(err,
"Failed to create query to delete experiment: %s", id)
}
// Use a transaction to make sure both experiment and its resource references are stored.
// Use a transaction to make sure both experiment and its resource references are deleted.
tx, err := s.db.Begin()
if err != nil {
return util.NewInternalServerError(err, "Failed to create a new transaction to delete experiment.")
Expand All @@ -188,6 +189,11 @@ func (s *ExperimentStore) DeleteExperiment(id string) error {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to delete experiment %s from table", id)
}
err = s.defaultExperimentStore.UnsetDefaultExperimentIdIfIdMatches(tx, id)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to clear default experiment ID for experiment %v ", id)
}
err = s.resourceReferenceStore.DeleteResourceReferences(tx, id, common.Run)
if err != nil {
tx.Rollback()
Expand All @@ -202,5 +208,11 @@ func (s *ExperimentStore) DeleteExperiment(id string) error {

// factory function for experiment store
func NewExperimentStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *ExperimentStore {
return &ExperimentStore{db: db, time: time, uuid: uuid, resourceReferenceStore: NewResourceReferenceStore(db)}
return &ExperimentStore{
db: db,
time: time,
uuid: uuid,
resourceReferenceStore: NewResourceReferenceStore(db),
defaultExperimentStore: NewDefaultExperimentStore(db),
}
}
Loading

0 comments on commit d88ba38

Please sign in to comment.