Skip to content

Commit

Permalink
feat(backend): allow configuring if default version should be updated…
Browse files Browse the repository at this point in the history
… when uploading new pipeline version. Fixes #4049 (#4476)

* update to fetch remote

* missed to add the description

* fixed merge conflict

* initial work

* fixed test and bug

* updated python client

* clean up

* clean up

* added config default

* fixed bug in API

* moved config  value

* reverted to load from config

* clean up

* Update _client.py

* removed unecessary function and updated after feedback

* missed to save pipeline.proto

* updated the last parts after feedback

* reverted back to use string and env variable

* updated typo

* fix typo in path

* clean up

* removed option in api

* clean up python part

* typo, cant run test locally

* clean up, problems with local env

* clean up missing differences

* reverted proto files

* further clean up

* clean up

* updated after feedback

* Added tests

* error in my defer statement

* Updated the test
  • Loading branch information
NikeNano committed Oct 19, 2020
1 parent 8699a05 commit 2317015
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 49 deletions.
5 changes: 5 additions & 0 deletions backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ const (
DefaultPipelineRunnerServiceAccount string = "DefaultPipelineRunnerServiceAccount"
KubeflowUserIDHeader string = "KUBEFLOW_USERID_HEADER"
KubeflowUserIDPrefix string = "KUBEFLOW_USERID_PREFIX"
UpdatePipelineVersionByDefault string = "AUTO_UPDATE_PIPELINE_DEFAULT_VERSION"
)

func IsPipelineVersionUpdatedByDefault() bool {
return GetBoolConfigWithDefault(UpdatePipelineVersionByDefault, true)
}

func GetStringConfig(configName string) string {
if !viper.IsSet(configName) {
glog.Fatalf("Please specify flag %s", configName)
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func (r *ResourceManager) getDefaultSA() string {
return common.GetStringConfigWithDefault(common.DefaultPipelineRunnerServiceAccount, defaultPipelineRunnerServiceAccount)
}

func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte) (*model.PipelineVersion, error) {
func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion, pipelineFile []byte, updateDefaultVersion bool) (*model.PipelineVersion, error) {
// Extract the parameters from the pipeline
params, err := util.GetParameters(pipelineFile)
if err != nil {
Expand All @@ -1045,7 +1045,7 @@ func (r *ResourceManager) CreatePipelineVersion(apiVersion *api.PipelineVersion,
Parameters: params,
CodeSourceUrl: apiVersion.CodeSourceUrl,
}
version, err = r.pipelineStore.CreatePipelineVersion(version)
version, err = r.pipelineStore.CreatePipelineVersion(version, updateDefaultVersion)
if err != nil {
return nil, util.Wrap(err, "Create pipeline version failed")
}
Expand Down
22 changes: 11 additions & 11 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

// The pipeline specified via pipeline id will be converted to this
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

apiRun := &api.Run{
Expand Down Expand Up @@ -990,7 +990,7 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

// The pipeline specified via pipeline id will be converted to this
Expand Down Expand Up @@ -1053,7 +1053,7 @@ func TestCreateJob_ThroughPipelineVersion(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

job := &api.Job{
Expand Down Expand Up @@ -2183,7 +2183,7 @@ func TestCreatePipelineVersion(t *testing.T) {
},
},
},
[]byte(testWorkflow.ToStringForStore()))
[]byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

defer store.Close()
Expand Down Expand Up @@ -2224,7 +2224,7 @@ func TestCreatePipelineVersion_ComplexPipelineVersion(t *testing.T) {
},
},
},
[]byte(strings.TrimSpace(complexPipeline)))
[]byte(strings.TrimSpace(complexPipeline)), true)
assert.Nil(t, err)

_, err = manager.GetPipeline(createdPipeline.UUID)
Expand Down Expand Up @@ -2263,7 +2263,7 @@ func TestCreatePipelineVersion_CreatePipelineVersionFileError(t *testing.T) {
},
},
},
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true)
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "bad object store")

Expand Down Expand Up @@ -2299,7 +2299,7 @@ func TestCreatePipelineVersion_GetParametersError(t *testing.T) {
},
},
},
[]byte("I am invalid yaml"))
[]byte("I am invalid yaml"), true)
assert.Equal(t, codes.InvalidArgument, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "Failed to parse the parameter")
}
Expand Down Expand Up @@ -2338,7 +2338,7 @@ func TestCreatePipelineVersion_StorePipelineVersionMetadataError(t *testing.T) {
},
},
},
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true)
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "database is closed")
}
Expand Down Expand Up @@ -2369,7 +2369,7 @@ func TestDeletePipelineVersion(t *testing.T) {
},
},
},
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true)

// Delete the above pipeline_version.
err = manager.DeletePipelineVersion(FakeUUIDOne)
Expand Down Expand Up @@ -2406,7 +2406,7 @@ func TestDeletePipelineVersion_FileError(t *testing.T) {
},
},
},
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true)

// Switch to a bad object store
manager.objectStore = &FakeBadObjectStore{}
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/resource/resource_manager_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestConvertPipelineIdToDefaultPipelineVersion(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)

// Create a run of the latest pipeline version, but by specifying the pipeline id.
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestConvertPipelineIdToDefaultPipelineVersion_NoOp(t *testing.T) {
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
}, []byte(testWorkflow.ToStringForStore()), true)
assert.Nil(t, err)
// FakeUUID is the new default version's id.
assert.NotEqual(t, oldVersionId, FakeUUIDOne)
Expand Down
3 changes: 2 additions & 1 deletion backend/src/apiserver/server/pipeline_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/protobuf/ptypes/empty"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -242,7 +243,7 @@ func (s *PipelineServer) CreatePipelineVersion(ctx context.Context, request *api
return nil, util.Wrap(err, "The URL is valid but pipeline system failed to read the file.")
}

version, err := s.resourceManager.CreatePipelineVersion(request.Version, pipelineFile)
version, err := s.resourceManager.CreatePipelineVersion(request.Version, pipelineFile, common.IsPipelineVersionUpdatedByDefault())
if err != nil {
return nil, util.Wrap(err, "Failed to create a version.")
}
Expand Down
47 changes: 47 additions & 0 deletions backend/src/apiserver/server/pipeline_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"testing"

api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
)
Expand Down Expand Up @@ -261,6 +263,51 @@ func TestListPipelineVersion_NoResourceKey(t *testing.T) {
assert.Equal(t, "Invalid input error: ResourceKey must be set in the input", err.Error())
}

func TestCreatePipelineVersionDontUpdateDefault(t *testing.T) {
viper.Set(common.UpdatePipelineVersionByDefault, "false")
defer viper.Set(common.UpdatePipelineVersionByDefault, "true")
httpServer := getMockServer(t)
// Close the server when test finishes
defer httpServer.Close()
clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
resourceManager := resource.NewResourceManager(clientManager)

pipelineServer := PipelineServer{resourceManager: resourceManager, httpClient: httpServer.Client(), options: &PipelineServerOptions{CollectMetrics: false}}
pipeline, err := pipelineServer.CreatePipeline(context.Background(), &api.CreatePipelineRequest{
Pipeline: &api.Pipeline{
Url: &api.Url{PipelineUrl: httpServer.URL + "/arguments_tarball/arguments.tar.gz"},
Name: "argument-parameters",
Description: "pipeline description",
}})

assert.Nil(t, err)
assert.NotNil(t, pipeline)

clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal("123e4567-e89b-12d3-a456-526655440001", nil))
resourceManager = resource.NewResourceManager(clientManager)

pipelineServer = PipelineServer{resourceManager: resourceManager, httpClient: httpServer.Client(), options: &PipelineServerOptions{CollectMetrics: false}}
pipelineVersion, err := pipelineServer.CreatePipelineVersion(
context.Background(), &api.CreatePipelineVersionRequest{
Version: &api.PipelineVersion{
PackageUrl: &api.Url{
PipelineUrl: httpServer.URL + "/arguments-parameters.yaml"},
Name: "argument-parameters-update",
ResourceReferences: []*api.ResourceReference{
&api.ResourceReference{
Key: &api.ResourceKey{
Type: api.ResourceType_PIPELINE,
Id: pipeline.Id,
},
Relationship: api.Relationship_OWNER,
}}}})
assert.Nil(t, err)

pipelines, err := pipelineServer.GetPipeline(context.Background(), &api.GetPipelineRequest{Id: pipeline.Id})
assert.NotNil(t, pipelineVersion.Id)
assert.NotEqual(t, pipelines.DefaultVersion.Id, pipelineVersion.Id)
}

func getMockServer(t *testing.T) *httptest.Server {
httpServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Send response to be tested
Expand Down
3 changes: 2 additions & 1 deletion backend/src/apiserver/server/pipeline_upload_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/pkg/errors"
Expand Down Expand Up @@ -166,7 +167,7 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h
Relationship: api.Relationship_OWNER,
},
},
}, pipelineFile)
}, pipelineFile, common.IsPipelineVersionUpdatedByDefault())
if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version"))
return
Expand Down
94 changes: 94 additions & 0 deletions backend/src/apiserver/server/pipeline_upload_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"os"
"testing"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/list"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -459,3 +461,95 @@ func TestUploadPipelineVersion_FileNameTooLong(t *testing.T) {
assert.Equal(t, 400, rr.Code)
assert.Contains(t, string(rr.Body.Bytes()), "Pipeline name too long")
}

func TestDefaultNotUpdatedPipelineVersion(t *testing.T) {
viper.Set(common.UpdatePipelineVersionByDefault, "false")
defer viper.Set(common.UpdatePipelineVersionByDefault, "true")

clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
resourceManager := resource.NewResourceManager(clientManager)
server := PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}}
b := &bytes.Buffer{}
w := multipart.NewWriter(b)
part, _ := w.CreateFormFile("uploadfile", "arguments.tar.gz")
fileReader, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
io.Copy(part, fileReader)
w.Close()
req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes()))
req.Header.Set("Content-Type", w.FormDataContentType())

rr := httptest.NewRecorder()
handler := http.HandlerFunc(server.UploadPipeline)
handler.ServeHTTP(rr, req)

pipelineVersion, err := clientManager.PipelineStore().GetPipelineVersion(resource.DefaultFakeUUID)
assert.Nil(t, err)
assert.Equal(t, pipelineVersion.PipelineId, resource.DefaultFakeUUID)

// Upload a new version under this pipeline and check that the default version is not updated

// Set the fake uuid generator with a new uuid to avoid generate a same uuid as above.
clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil))
resourceManager = resource.NewResourceManager(clientManager)
server = PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}}
b = &bytes.Buffer{}
w = multipart.NewWriter(b)
part, _ = w.CreateFormFile("uploadfile", "arguments-version.tar.gz")
fileReader, _ = os.Open("test/arguments_tarball/arguments-version.tar.gz")
io.Copy(part, fileReader)
w.Close()
req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes()))
req.Header.Set("Content-Type", w.FormDataContentType())
rr = httptest.NewRecorder()
handler = http.HandlerFunc(server.UploadPipelineVersion)
handler.ServeHTTP(rr, req)

pipeline, err := clientManager.PipelineStore().GetPipeline(resource.DefaultFakeUUID)
assert.Nil(t, err)
assert.Equal(t, pipeline.DefaultVersionId, resource.DefaultFakeUUID)
assert.NotEqual(t, pipeline.DefaultVersionId, fakeVersionUUID)
}

func TestDefaultUpdatedPipelineVersion(t *testing.T) {
clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
resourceManager := resource.NewResourceManager(clientManager)
server := PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}}
b := &bytes.Buffer{}
w := multipart.NewWriter(b)
part, _ := w.CreateFormFile("uploadfile", "arguments.tar.gz")
fileReader, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
io.Copy(part, fileReader)
w.Close()
req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes()))
req.Header.Set("Content-Type", w.FormDataContentType())

rr := httptest.NewRecorder()
handler := http.HandlerFunc(server.UploadPipeline)
handler.ServeHTTP(rr, req)

pipelineVersion, err := clientManager.PipelineStore().GetPipelineVersion(resource.DefaultFakeUUID)
assert.Nil(t, err)
assert.Equal(t, pipelineVersion.PipelineId, resource.DefaultFakeUUID)

// Upload a new version under this pipeline and check that the default version is not updated

// Set the fake uuid generator with a new uuid to avoid generate a same uuid as above.
clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil))
resourceManager = resource.NewResourceManager(clientManager)
server = PipelineUploadServer{resourceManager: resourceManager, options: &PipelineUploadServerOptions{CollectMetrics: false}}
b = &bytes.Buffer{}
w = multipart.NewWriter(b)
part, _ = w.CreateFormFile("uploadfile", "arguments-version.tar.gz")
fileReader, _ = os.Open("test/arguments_tarball/arguments-version.tar.gz")
io.Copy(part, fileReader)
w.Close()
req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes()))
req.Header.Set("Content-Type", w.FormDataContentType())
rr = httptest.NewRecorder()
handler = http.HandlerFunc(server.UploadPipelineVersion)
handler.ServeHTTP(rr, req)

pipeline, err := clientManager.PipelineStore().GetPipeline(resource.DefaultFakeUUID)
assert.Nil(t, err)
assert.Equal(t, pipeline.DefaultVersionId, fakeVersionUUID)
}
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func initWithExperimentAndPipelineVersion(t *testing.T) (*resource.FakeClientMan
},
},
},
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
[]byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"), true)

return clientManager, resourceManager, experiment
}
Expand Down
Loading

0 comments on commit 2317015

Please sign in to comment.