diff --git a/backend/src/apiserver/common/config.go b/backend/src/apiserver/common/config.go index c79d24d9e59..38018c03298 100644 --- a/backend/src/apiserver/common/config.go +++ b/backend/src/apiserver/common/config.go @@ -30,8 +30,13 @@ const ( DefaultPipelineRunnerServiceAccount string = "DefaultPipelineRunnerServiceAccount" KubeflowUserIDHeader string = "KUBEFLOW_USERID_HEADER" KubeflowUserIDPrefix string = "KUBEFLOW_USERID_PREFIX" + UpdatePipelineVersionByDefault string = "AUTO_UPDATE_PIPELINE_DEFAULT_VERSION" ) +func IsPipelineVersionUpdatedByDefault() bool { + return GetBoolConfigWithDefault(UpdatePipelineVersionByDefault, true) +} + func GetStringConfig(configName string) string { if !viper.IsSet(configName) { glog.Fatalf("Please specify flag %s", configName) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index ec3d0311c13..ccd8cc814ab 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1019,7 +1019,7 @@ func (r *ResourceManager) getDefaultSA() string { return common.GetStringConfigWithDefault(common.DefaultPipelineRunnerServiceAccount, defaultPipelineRunnerServiceAccount) } -func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte) (*model.PipelineVersion, error) { +func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte, updateDefaultVersion bool) (*model.PipelineVersion, error) { // Extract the parameters from the pipeline params, err := util.GetParameters(pipelineFile) if err != nil { @@ -1045,7 +1045,7 @@ func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, Parameters: params, CodeSourceUrl: apiVersion.CodeSourceUrl, } - version, err = r.pipelineStore.CreatePipelineVersion(version) + version, err = r.pipelineStore.CreatePipelineVersion(version, updateDefaultVersion) if err != nil { return nil, util.Wrap(err, "Create pipeline version failed") } diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 9f7fb78b53d..7d364aa622e 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -326,7 +326,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) // The pipeline specified via pipeline id will be converted to this @@ -516,7 +516,7 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) apiRun := &api.Run{ @@ -990,7 +990,7 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) // The pipeline specified via pipeline id will be converted to this @@ -1053,7 +1053,7 @@ func TestCreateJob_ThroughPipelineVersion(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) job := &api.Job{ @@ -2183,7 +2183,7 @@ func TestCreatePipelineVersion(t *testing.T) { }, }, }, - []byte(testWorkflow.ToStringForStore())) + []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) defer store.Close() @@ -2224,7 +2224,7 @@ func TestCreatePipelineVersion_ComplexPipelineVersion(t *testing.T) { }, }, }, - []byte(strings.TrimSpace(complexPipeline))) + []byte(strings.TrimSpace(complexPipeline)), true) assert.Nil(t, err) _, err = manager.GetPipeline(createdPipeline.UUID) @@ -2263,7 +2263,7 @@ func TestCreatePipelineVersion_CreatePipelineVersionFileError(t *testing.T) { }, }, }, - []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true) assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode()) assert.Contains(t, err.Error(), "bad object store") @@ -2299,7 +2299,7 @@ func TestCreatePipelineVersion_GetParametersError(t *testing.T) { }, }, }, - []byte("I am invalid yaml")) + []byte("I am invalid yaml"), true) assert.Equal(t, codes.InvalidArgument, err.(*util.UserError).ExternalStatusCode()) assert.Contains(t, err.Error(), "Failed to parse the parameter") } @@ -2338,7 +2338,7 @@ func TestCreatePipelineVersion_StorePipelineVersionMetadataError(t *testing.T) { }, }, }, - []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true) assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode()) assert.Contains(t, err.Error(), "database is closed") } @@ -2369,7 +2369,7 @@ func TestDeletePipelineVersion(t *testing.T) { }, }, }, - []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true) // Delete the above pipeline_version. err = manager.DeletePipelineVersion(FakeUUIDOne) @@ -2406,7 +2406,7 @@ func TestDeletePipelineVersion_FileError(t *testing.T) { }, }, }, - []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true) // Switch to a bad object store manager.objectStore = &FakeBadObjectStore{} diff --git a/backend/src/apiserver/resource/resource_manager_util_test.go b/backend/src/apiserver/resource/resource_manager_util_test.go index 335b4667ac4..5e34ac4bdf5 100644 --- a/backend/src/apiserver/resource/resource_manager_util_test.go +++ b/backend/src/apiserver/resource/resource_manager_util_test.go @@ -342,7 +342,7 @@ func TestConvertPipelineIdToDefaultPipelineVersion(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) // Create a run of the latest pipeline version, but by specifying the pipeline id. @@ -400,7 +400,7 @@ func TestConvertPipelineIdToDefaultPipelineVersion_NoOp(t *testing.T) { Relationship: api.Relationship_OWNER, }, }, - }, []byte(testWorkflow.ToStringForStore())) + }, []byte(testWorkflow.ToStringForStore()), true) assert.Nil(t, err) // FakeUUID is the new default version's id. assert.NotEqual(t, oldVersionId, FakeUUIDOne) diff --git a/backend/src/apiserver/server/pipeline_server.go b/backend/src/apiserver/server/pipeline_server.go index 54e7e97139f..efbea696ba1 100644 --- a/backend/src/apiserver/server/pipeline_server.go +++ b/backend/src/apiserver/server/pipeline_server.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" @@ -242,7 +243,7 @@ func (s *PipelineServer) CreatePipelineVersion(ctx context.Context, request *api return nil, util.Wrap(err, "The URL is valid but pipeline system failed to read the file.") } - version, err := s.resourceManager.CreatePipelineVersion(request.Version, pipelineFile) + version, err := s.resourceManager.CreatePipelineVersion(request.Version, pipelineFile, common.IsPipelineVersionUpdatedByDefault()) if err != nil { return nil, util.Wrap(err, "Failed to create a version.") } diff --git a/backend/src/apiserver/server/pipeline_server_test.go b/backend/src/apiserver/server/pipeline_server_test.go index 8c2f39c2f41..248842739c6 100644 --- a/backend/src/apiserver/server/pipeline_server_test.go +++ b/backend/src/apiserver/server/pipeline_server_test.go @@ -10,8 +10,10 @@ import ( "testing" api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" ) @@ -261,6 +263,51 @@ func TestListPipelineVersion_NoResourceKey(t *testing.T) { assert.Equal(t, "Invalid input error: ResourceKey must be set in the input", err.Error()) } +func TestCreatePipelineVersionDontUpdateDefault(t *testing.T) { + viper.Set(common.UpdatePipelineVersionByDefault, "false") + defer viper.Set(common.UpdatePipelineVersionByDefault, "true") + httpServer := getMockServer(t) + // Close the server when test finishes + defer httpServer.Close() + clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + resourceManager := resource.NewResourceManager(clientManager) + + pipelineServer := PipelineServer{resourceManager: resourceManager, httpClient: httpServer.Client(), options: &PipelineServerOptions{CollectMetrics: false}} + pipeline, err := pipelineServer.CreatePipeline(context.Background(), &api.CreatePipelineRequest{ + Pipeline: &api.Pipeline{ + Url: &api.Url{PipelineUrl: httpServer.URL + "/arguments_tarball/arguments.tar.gz"}, + Name: "argument-parameters", + Description: "pipeline description", + }}) + + assert.Nil(t, err) + assert.NotNil(t, pipeline) + + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal("123e4567-e89b-12d3-a456-526655440001", nil)) + resourceManager = resource.NewResourceManager(clientManager) + + pipelineServer = PipelineServer{resourceManager: resourceManager, httpClient: httpServer.Client(), options: &PipelineServerOptions{CollectMetrics: false}} + pipelineVersion, err := pipelineServer.CreatePipelineVersion( + context.Background(), &api.CreatePipelineVersionRequest{ + Version: &api.PipelineVersion{ + PackageUrl: &api.Url{ + PipelineUrl: httpServer.URL + "/arguments-parameters.yaml"}, + Name: "argument-parameters-update", + ResourceReferences: []*api.ResourceReference{ + &api.ResourceReference{ + Key: &api.ResourceKey{ + Type: api.ResourceType_PIPELINE, + Id: pipeline.Id, + }, + Relationship: api.Relationship_OWNER, + }}}}) + assert.Nil(t, err) + + pipelines, err := pipelineServer.GetPipeline(context.Background(), &api.GetPipelineRequest{Id: pipeline.Id}) + assert.NotNil(t, pipelineVersion.Id) + assert.NotEqual(t, pipelines.DefaultVersion.Id, pipelineVersion.Id) +} + func getMockServer(t *testing.T) *httptest.Server { httpServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Send response to be tested diff --git a/backend/src/apiserver/server/pipeline_upload_server.go b/backend/src/apiserver/server/pipeline_upload_server.go index 0e585167b0e..17eb0e1ad63 100644 --- a/backend/src/apiserver/server/pipeline_upload_server.go +++ b/backend/src/apiserver/server/pipeline_upload_server.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" "github.com/golang/protobuf/jsonpb" api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" @@ -166,7 +167,7 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h Relationship: api.Relationship_OWNER, }, }, - }, pipelineFile) + }, pipelineFile, common.IsPipelineVersionUpdatedByDefault()) if err != nil { s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version")) return diff --git a/backend/src/apiserver/server/pipeline_upload_server_test.go b/backend/src/apiserver/server/pipeline_upload_server_test.go index d5c2011fa9c..10b8340d5ab 100644 --- a/backend/src/apiserver/server/pipeline_upload_server_test.go +++ b/backend/src/apiserver/server/pipeline_upload_server_test.go @@ -26,10 +26,12 @@ import ( "os" "testing" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/list" "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" ) @@ -459,3 +461,95 @@ func TestUploadPipelineVersion_FileNameTooLong(t *testing.T) { assert.Equal(t, 400, rr.Code) assert.Contains(t, string(rr.Body.Bytes()), "Pipeline name too long") } + +func TestDefaultNotUpdatedPipelineVersion(t *testing.T) { + viper.Set(common.UpdatePipelineVersionByDefault, "false") + defer viper.Set(common.UpdatePipelineVersionByDefault, "true") + + clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + resourceManager := resource.NewResourceManager(clientManager) + server := PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}} + b := &bytes.Buffer{} + w := multipart.NewWriter(b) + part, _ := w.CreateFormFile("uploadfile", "arguments.tar.gz") + fileReader, _ := os.Open("test/arguments_tarball/arguments.tar.gz") + io.Copy(part, fileReader) + w.Close() + req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + + rr := httptest.NewRecorder() + handler := http.HandlerFunc(server.UploadPipeline) + handler.ServeHTTP(rr, req) + + pipelineVersion, err := clientManager.PipelineStore().GetPipelineVersion(resource.DefaultFakeUUID) + assert.Nil(t, err) + assert.Equal(t, pipelineVersion.PipelineId, resource.DefaultFakeUUID) + + // Upload a new version under this pipeline and check that the default version is not updated + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}} + b = &bytes.Buffer{} + w = multipart.NewWriter(b) + part, _ = w.CreateFormFile("uploadfile", "arguments-version.tar.gz") + fileReader, _ = os.Open("test/arguments_tarball/arguments-version.tar.gz") + io.Copy(part, fileReader) + w.Close() + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + + pipeline, err := clientManager.PipelineStore().GetPipeline(resource.DefaultFakeUUID) + assert.Nil(t, err) + assert.Equal(t, pipeline.DefaultVersionId, resource.DefaultFakeUUID) + assert.NotEqual(t, pipeline.DefaultVersionId, fakeVersionUUID) +} + +func TestDefaultUpdatedPipelineVersion(t *testing.T) { + clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + resourceManager := resource.NewResourceManager(clientManager) + server := PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}} + b := &bytes.Buffer{} + w := multipart.NewWriter(b) + part, _ := w.CreateFormFile("uploadfile", "arguments.tar.gz") + fileReader, _ := os.Open("test/arguments_tarball/arguments.tar.gz") + io.Copy(part, fileReader) + w.Close() + req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + + rr := httptest.NewRecorder() + handler := http.HandlerFunc(server.UploadPipeline) + handler.ServeHTTP(rr, req) + + pipelineVersion, err := clientManager.PipelineStore().GetPipelineVersion(resource.DefaultFakeUUID) + assert.Nil(t, err) + assert.Equal(t, pipelineVersion.PipelineId, resource.DefaultFakeUUID) + + // Upload a new version under this pipeline and check that the default version is not updated + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}} + b = &bytes.Buffer{} + w = multipart.NewWriter(b) + part, _ = w.CreateFormFile("uploadfile", "arguments-version.tar.gz") + fileReader, _ = os.Open("test/arguments_tarball/arguments-version.tar.gz") + io.Copy(part, fileReader) + w.Close() + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + + pipeline, err := clientManager.PipelineStore().GetPipeline(resource.DefaultFakeUUID) + assert.Nil(t, err) + assert.Equal(t, pipeline.DefaultVersionId, fakeVersionUUID) +} diff --git a/backend/src/apiserver/server/test_util.go b/backend/src/apiserver/server/test_util.go index da44ab92b0f..75769dc43b7 100644 --- a/backend/src/apiserver/server/test_util.go +++ b/backend/src/apiserver/server/test_util.go @@ -146,7 +146,7 @@ func initWithExperimentAndPipelineVersion(t *testing.T) (*resource.FakeClientMan }, }, }, - []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true) return clientManager, resourceManager, experiment } diff --git a/backend/src/apiserver/storage/pipeline_store.go b/backend/src/apiserver/storage/pipeline_store.go index 6839887efaa..d9184ab86a1 100644 --- a/backend/src/apiserver/storage/pipeline_store.go +++ b/backend/src/apiserver/storage/pipeline_store.go @@ -62,7 +62,7 @@ type PipelineStoreInterface interface { UpdatePipelineStatus(string, model.PipelineStatus) error UpdatePipelineDefaultVersion(string, string) error - CreatePipelineVersion(*model.PipelineVersion) (*model.PipelineVersion, error) + CreatePipelineVersion(*model.PipelineVersion, bool) (*model.PipelineVersion, error) GetPipelineVersion(versionId string) (*model.PipelineVersion, error) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error) @@ -447,7 +447,7 @@ func NewPipelineStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorIn return &PipelineStore{db: db, time: time, uuid: uuid} } -func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion) (*model.PipelineVersion, error) { +func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion, updatePipelineDefaultVersion bool) (*model.PipelineVersion, error) { newPipelineVersion := *v newPipelineVersion.CreatedAtInSec = s.time.Now().Unix() id, err := s.uuid.NewRandom() @@ -506,12 +506,16 @@ func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion) (*model. return nil, util.NewInternalServerError(err, "Failed to add version to pipeline version table: %v", err.Error()) } - _, err = tx.Exec(pipelineSql, pipelineArgs...) - if err != nil { - tx.Rollback() - return nil, util.NewInternalServerError(err, "Failed to update pipeline default version id: %v", - err.Error()) + + if updatePipelineDefaultVersion { + _, err = tx.Exec(pipelineSql, pipelineArgs...) + if err != nil { + tx.Rollback() + return nil, util.NewInternalServerError(err, "Failed to update pipeline default version id: %v", + err.Error()) + } } + if err := tx.Commit(); err != nil { return nil, util.NewInternalServerError(err, "Failed to create new pipeline version: %v", err.Error()) diff --git a/backend/src/apiserver/storage/pipeline_store_test.go b/backend/src/apiserver/storage/pipeline_store_test.go index 5a22b5786ff..f892271136b 100644 --- a/backend/src/apiserver/storage/pipeline_store_test.go +++ b/backend/src/apiserver/storage/pipeline_store_test.go @@ -580,7 +580,7 @@ func TestCreatePipelineVersion(t *testing.T) { CodeSourceUrl: "code_source_url", } pipelineVersionCreated, err := pipelineStore.CreatePipelineVersion( - pipelineVersion) + pipelineVersion, true) // Check whether created pipeline version is as expected. pipelineVersionExpected := model.PipelineVersion{ @@ -605,6 +605,59 @@ func TestCreatePipelineVersion(t *testing.T) { assert.Equal(t, pipeline.DefaultVersionId, fakeUUIDTwo, "Got unexpected default version id.") } +func TestCreatePipelineVersionNotUpdateDefaultVersion(t *testing.T) { + db := NewFakeDbOrFatal() + defer db.Close() + pipelineStore := NewPipelineStore( + db, + util.NewFakeTimeForEpoch(), + util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) + + // Create a pipeline first. + pipelineStore.CreatePipeline( + &model.Pipeline{ + Name: "pipeline_1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + }) + + // Create a version under the above pipeline. + pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDTwo, nil) + pipelineVersion := &model.PipelineVersion{ + Name: "pipeline_version_1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionCreating, + CodeSourceUrl: "code_source_url", + } + pipelineVersionCreated, err := pipelineStore.CreatePipelineVersion( + pipelineVersion, false) + + // Check whether created pipeline version is as expected. + pipelineVersionExpected := model.PipelineVersion{ + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline_version_1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineVersionCreating, + PipelineId: fakeUUID, + CodeSourceUrl: "code_source_url", + } + assert.Nil(t, err) + assert.Equal( + t, + pipelineVersionExpected, + *pipelineVersionCreated, + "Got unexpected pipeline.") + + // Check whether pipeline has updated default version id. + pipeline, err := pipelineStore.GetPipeline(fakeUUID) + assert.Nil(t, err) + assert.NotEqual(t, pipeline.DefaultVersionId, fakeUUIDTwo, "Got unexpected default version id.") + assert.Equal(t, pipeline.DefaultVersionId, fakeUUID, "Got unexpected default version id.") + +} + func TestCreatePipelineVersion_DuplicateKey(t *testing.T) { db := NewFakeDbOrFatal() defer db.Close() @@ -629,7 +682,7 @@ func TestCreatePipelineVersion_DuplicateKey(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionCreating, - }) + }, true) // Create another new version with same name. _, err := pipelineStore.CreatePipelineVersion( @@ -638,7 +691,7 @@ func TestCreatePipelineVersion_DuplicateKey(t *testing.T) { Parameters: `[{"Name": "param2"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionCreating, - }) + }, true) assert.NotNil(t, err) assert.Contains(t, err.Error(), "The name pipeline_version_1 already exist") } @@ -658,7 +711,7 @@ func TestCreatePipelineVersion_InternalServerError_DBClosed(t *testing.T) { Name: "pipeline_version_1", Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, - }) + }, true) assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode(), "Expected create pipeline version to return error") } @@ -687,7 +740,7 @@ func TestDeletePipelineVersion(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create a second version, which will become the default version. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) @@ -697,7 +750,7 @@ func TestDeletePipelineVersion(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Delete version with id being fakeUUIDThree. err := pipelineStore.DeletePipelineVersion(fakeUUIDThree) @@ -737,7 +790,7 @@ func TestDeletePipelineVersionError(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) db.Close() // On closed db, create pipeline version ends in internal error. @@ -769,7 +822,7 @@ func TestGetPipelineVersion(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Get pipeline version. pipelineVersion, err := pipelineStore.GetPipelineVersion(fakeUUIDTwo) @@ -826,7 +879,7 @@ func TestGetPipelineVersion_NotFound_VersionStatusCreating(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionCreating, - }) + }, true) _, err := pipelineStore.GetPipelineVersion(fakeUUIDTwo) assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode(), @@ -870,7 +923,7 @@ func TestListPipelineVersion_FilterOutNotReady(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create a second version with status ready. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) @@ -880,7 +933,7 @@ func TestListPipelineVersion_FilterOutNotReady(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create a third version with status creating. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFour, nil) @@ -890,7 +943,7 @@ func TestListPipelineVersion_FilterOutNotReady(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionCreating, - }) + }, true) pipelineVersionsExpected := []*model.PipelineVersion{ &model.PipelineVersion{ @@ -944,7 +997,7 @@ func TestListPipelineVersions_Pagination(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_3" with fakeUUIDThree. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) @@ -954,7 +1007,7 @@ func TestListPipelineVersions_Pagination(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_2" with fakeUUIDFour. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFour, nil) @@ -964,7 +1017,7 @@ func TestListPipelineVersions_Pagination(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_4" with fakeUUIDFive. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFive, nil) @@ -974,7 +1027,7 @@ func TestListPipelineVersions_Pagination(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // List results in 2 pages: first page containing version_1 and version_2; // and second page containing verion_3 and version_4. @@ -1059,7 +1112,7 @@ func TestListPipelineVersions_Pagination_Descend(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_3" with fakeUUIDThree. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) @@ -1069,7 +1122,7 @@ func TestListPipelineVersions_Pagination_Descend(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_2" with fakeUUIDFour. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFour, nil) @@ -1079,7 +1132,7 @@ func TestListPipelineVersions_Pagination_Descend(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_4" with fakeUUIDFive. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFive, nil) @@ -1089,7 +1142,7 @@ func TestListPipelineVersions_Pagination_Descend(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // List result in 2 pages: first page "version_4" and "version_3"; second // page "version_2" and "version_1". @@ -1174,7 +1227,7 @@ func TestListPipelineVersions_Pagination_LessThanPageSize(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) opts, err := list.NewOptions(&model.PipelineVersion{}, 2, "", nil) assert.Nil(t, err) @@ -1219,7 +1272,7 @@ func TestListPipelineVersions_WithFilter(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Create "version_2" with fakeUUIDThree. pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) @@ -1229,7 +1282,7 @@ func TestListPipelineVersions_WithFilter(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Filter for name being equal to pipeline_version_1 equalFilterProto := &api.Filter{ @@ -1318,7 +1371,7 @@ func TestUpdatePipelineVersionStatus(t *testing.T) { Parameters: `[{"Name": "param1"}]`, PipelineId: fakeUUID, Status: model.PipelineVersionReady, - }) + }, true) // Change version to deleting status err := pipelineStore.UpdatePipelineVersionStatus( diff --git a/manifests/kustomize/base/params.env b/manifests/kustomize/base/params.env index 51d09d077c7..ad498476406 100644 --- a/manifests/kustomize/base/params.env +++ b/manifests/kustomize/base/params.env @@ -13,3 +13,8 @@ bucketName=mlpipeline ## artifacts, container lifecycles, etc.. ## Doc: https://github.com/argoproj/argo/blob/master/docs/workflow-executors.md containerRuntimeExecutor=docker + +## autoUpdatePipelineDefaultVersion: States if the pipeline version +## should be updated by defult for a versioned pipeline or not when a new +## version is uploaded. This sets the deployment wide definition. +autoUpdatePipelineDefaultVersion=true diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml index f9e23fbe792..6cf514968a9 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml @@ -15,6 +15,11 @@ spec: spec: containers: - env: + - name: AUTO_UPDATE_PIPELINE_DEFAULT_VERSION + valueFrom: + configMapKeyRef: + name: pipeline-install-config + key: autoUpdatePipelineDefaultVersion - name: POD_NAMESPACE valueFrom: fieldRef: