From 23993486c5c6c13a238587f7af48f5f73c9919f7 Mon Sep 17 00:00:00 2001 From: dushyanthsc <43390008+dushyanthsc@users.noreply.github.com> Date: Wed, 21 Aug 2019 13:44:31 -0700 Subject: [PATCH] apiserver: Remove TFX output artifact recording to metadatastore (#1904) --- BUILD.bazel | 3 - WORKSPACE | 178 +++++++-- backend/src/apiserver/BUILD.bazel | 5 - backend/src/apiserver/client_manager.go | 37 +- backend/src/apiserver/main.go | 4 - backend/src/apiserver/metadata/BUILD.bazel | 29 -- .../src/apiserver/metadata/metadata_store.go | 135 ------- .../apiserver/metadata/metadata_store_test.go | 357 ------------------ backend/src/apiserver/resource/BUILD.bazel | 3 - .../apiserver/resource/client_manager_fake.go | 18 +- backend/src/apiserver/storage/BUILD.bazel | 4 - backend/src/apiserver/storage/run_store.go | 35 +- .../src/apiserver/storage/run_store_test.go | 2 +- go.mod | 1 - go.sum | 4 + 15 files changed, 156 insertions(+), 659 deletions(-) delete mode 100644 backend/src/apiserver/metadata/BUILD.bazel delete mode 100644 backend/src/apiserver/metadata/metadata_store.go delete mode 100644 backend/src/apiserver/metadata/metadata_store_test.go diff --git a/BUILD.bazel b/BUILD.bazel index 0e9906756fd..94797ba522d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -4,8 +4,5 @@ load("@bazel_gazelle//:def.bzl", "gazelle") # gazelle:resolve proto protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:options_proto # gazelle:resolve proto go protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:go_default_library # gazelle:resolve go github.com/kubeflow/pipelines/backend/api/go_client //backend/api:go_default_library -# gazelle:resolve go ml_metadata/metadata_store/mlmetadata @google_ml_metadata//ml_metadata/metadata_store:metadata_store_go -# gazelle:resolve go ml_metadata/proto/metadata_store_go_proto @google_ml_metadata//ml_metadata/proto:metadata_store_go_proto -# gazelle:resolve go ml_metadata/proto/metadata_store_service_go_proto @google_ml_metadata//ml_metadata/proto:metadata_store_service_go_proto # gazelle:exclude vendor/ gazelle(name = "gazelle") diff --git a/WORKSPACE b/WORKSPACE index 21f64e10e45..0ddcd7fa87b 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -48,20 +48,6 @@ tf_workspace() load("@bazel_tools//tools/build_defs/repo:git.bzl", "new_git_repository") -go_repository( - name = "google_ml_metadata", - commit = "0fb82dc56ff7", - importpath = "github.com/google/ml-metadata", -) - -new_git_repository( - name = "libmysqlclient", - build_file = "@google_ml_metadata//ml_metadata:libmysqlclient.BUILD", - remote = "https://github.com/MariaDB/mariadb-connector-c.git", - tag = "v3.0.8-release", - workspace_file = "@google_ml_metadata//ml_metadata:libmysqlclient.WORKSPACE", -) - go_repository( name = "io_k8s_client_go", build_file_proto_mode = "disable_global", @@ -152,8 +138,8 @@ http_archive( go_repository( name = "co_honnef_go_tools", - commit = "88497007e858", importpath = "honnef.co/go/tools", + tag = "v0.0.1-2019.2.2", ) go_repository( @@ -369,19 +355,19 @@ go_repository( go_repository( name = "com_github_golang_mock", importpath = "github.com/golang/mock", - tag = "v1.1.1", + tag = "v1.3.1", ) go_repository( name = "com_github_google_btree", - commit = "e89373fe6b4a", importpath = "github.com/google/btree", + tag = "v1.0.0", ) go_repository( name = "com_github_google_go_cmp", importpath = "github.com/google/go-cmp", - tag = "v0.2.0", + tag = "v0.3.1", ) go_repository( @@ -404,8 +390,8 @@ go_repository( go_repository( name = "com_github_hashicorp_golang_lru", - commit = "0fb14efe8c47", importpath = "github.com/hashicorp/golang-lru", + tag = "v0.5.3", ) go_repository( @@ -705,7 +691,7 @@ go_repository( go_repository( name = "com_google_cloud_go", importpath = "cloud.google.com/go", - tag = "v0.26.0", + tag = "v0.44.3", ) go_repository( @@ -716,7 +702,7 @@ go_repository( go_repository( name = "in_gopkg_check_v1", - commit = "20d25e280405", + commit = "788fd7840127", importpath = "gopkg.in/check.v1", ) @@ -765,12 +751,12 @@ go_repository( go_repository( name = "org_golang_google_appengine", importpath = "google.golang.org/appengine", - tag = "v1.1.0", + tag = "v1.6.1", ) go_repository( name = "org_golang_google_genproto", - commit = "ae2f86662275", + commit = "fa694d86fc64", importpath = "google.golang.org/genproto", ) @@ -778,61 +764,61 @@ go_repository( name = "org_golang_google_grpc", build_file_proto_mode = "disable_global", importpath = "google.golang.org/grpc", - tag = "v1.16.0", + tag = "v1.23.0", ) go_repository( name = "org_golang_x_crypto", - commit = "505ab145d0a9", + commit = "4def268fd1a4", importpath = "golang.org/x/crypto", ) go_repository( name = "org_golang_x_lint", - commit = "06c8688daad7", + commit = "959b441ac422", importpath = "golang.org/x/lint", ) go_repository( name = "org_golang_x_net", build_file_proto_mode = "disable_global", - commit = "351d144fa1fc", + commit = "74dc4d7220e7", importpath = "golang.org/x/net", ) go_repository( name = "org_golang_x_oauth2", - commit = "d2e6202438be", + commit = "0f29369cfe45", importpath = "golang.org/x/oauth2", ) go_repository( name = "org_golang_x_sync", - commit = "42b317875d0f", + commit = "112230192c58", importpath = "golang.org/x/sync", ) go_repository( name = "org_golang_x_sys", - commit = "a5c9d58dba9a", + commit = "fde4db37ae7a", importpath = "golang.org/x/sys", ) go_repository( name = "org_golang_x_text", importpath = "golang.org/x/text", - tag = "v0.3.0", + tag = "v0.3.2", ) go_repository( name = "org_golang_x_time", - commit = "fbb02b2291d2", + commit = "9d24e82272b4", importpath = "golang.org/x/time", ) go_repository( name = "org_golang_x_tools", - commit = "6cd1fcedba52", + commit = "922a4ee32d1a", importpath = "golang.org/x/tools", ) @@ -857,7 +843,7 @@ go_repository( go_repository( name = "com_github_golang_protobuf", importpath = "github.com/golang/protobuf", - tag = "v1.2.0", + tag = "v1.3.2", ) go_repository( @@ -916,7 +902,7 @@ go_repository( go_repository( name = "com_github_google_pprof", - commit = "3ea8567a2e57", + commit = "34ac40c74b70", importpath = "github.com/google/pprof", ) @@ -959,5 +945,125 @@ go_repository( go_repository( name = "com_github_gorilla_websocket", importpath = "github.com/gorilla/websocket", - tag = "v1.2.0", + tag = "v1.4.0", +) + +go_repository( + name = "com_github_burntsushi_xgb", + commit = "27f122750802", + importpath = "github.com/BurntSushi/xgb", +) + +go_repository( + name = "com_github_creack_pty", + importpath = "github.com/creack/pty", + tag = "v1.1.7", +) + +go_repository( + name = "com_github_google_martian", + importpath = "github.com/google/martian", + tag = "v2.1.0", +) + +go_repository( + name = "com_github_google_renameio", + importpath = "github.com/google/renameio", + tag = "v0.1.0", +) + +go_repository( + name = "com_github_googleapis_gax_go_v2", + importpath = "github.com/googleapis/gax-go/v2", + tag = "v2.0.5", +) + +go_repository( + name = "com_github_jstemmer_go_junit_report", + commit = "af01ea7f8024", + importpath = "github.com/jstemmer/go-junit-report", +) + +go_repository( + name = "com_github_kr_pretty", + importpath = "github.com/kr/pretty", + tag = "v0.1.0", +) + +go_repository( + name = "com_github_kr_pty", + importpath = "github.com/kr/pty", + tag = "v1.1.8", +) + +go_repository( + name = "com_github_kr_text", + importpath = "github.com/kr/text", + tag = "v0.1.0", +) + +go_repository( + name = "com_github_rogpeppe_go_internal", + importpath = "github.com/rogpeppe/go-internal", + tag = "v1.3.0", +) + +go_repository( + name = "com_google_cloud_go_datastore", + importpath = "cloud.google.com/go/datastore", + tag = "v1.0.0", +) + +go_repository( + name = "in_gopkg_errgo_v2", + importpath = "gopkg.in/errgo.v2", + tag = "v2.1.0", +) + +go_repository( + name = "io_opencensus_go", + importpath = "go.opencensus.io", + tag = "v0.22.0", +) + +go_repository( + name = "io_rsc_binaryregexp", + importpath = "rsc.io/binaryregexp", + tag = "v0.2.0", +) + +go_repository( + name = "org_golang_google_api", + importpath = "google.golang.org/api", + tag = "v0.8.0", +) + +go_repository( + name = "org_golang_x_exp", + commit = "ec7cb31e5a56", + importpath = "golang.org/x/exp", +) + +go_repository( + name = "org_golang_x_image", + commit = "cff245a6509b", + importpath = "golang.org/x/image", +) + +go_repository( + name = "org_golang_x_mobile", + commit = "e8b3e6111d02", + importpath = "golang.org/x/mobile", +) + +go_repository( + name = "org_golang_x_mod", + importpath = "golang.org/x/mod", + tag = "v0.1.0", +) + +go_repository( + name = "org_golang_x_xerrors", + commit = "a985d3407aa7", + importpath = "golang.org/x/xerrors", ) diff --git a/backend/src/apiserver/BUILD.bazel b/backend/src/apiserver/BUILD.bazel index 93d54a3b450..3ac3cfab7e4 100644 --- a/backend/src/apiserver/BUILD.bazel +++ b/backend/src/apiserver/BUILD.bazel @@ -13,7 +13,6 @@ go_library( deps = [ "//backend/api:go_default_library", "//backend/src/apiserver/client:go_default_library", - "//backend/src/apiserver/metadata:go_default_library", "//backend/src/apiserver/model:go_default_library", "//backend/src/apiserver/resource:go_default_library", "//backend/src/apiserver/server:go_default_library", @@ -24,15 +23,11 @@ go_library( "@com_github_cenkalti_backoff//:go_default_library", "@com_github_fsnotify_fsnotify//:go_default_library", "@com_github_golang_glog//:go_default_library", - "@com_github_golang_protobuf//proto:go_default_library", "@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library", "@com_github_jinzhu_gorm//:go_default_library", "@com_github_jinzhu_gorm//dialects/sqlite:go_default_library", "@com_github_minio_minio_go//:go_default_library", "@com_github_spf13_viper//:go_default_library", - "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", # keep - "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", # keep - "@google_ml_metadata//ml_metadata/proto:metadata_store_service_go_proto", # keep "@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//reflection:go_default_library", diff --git a/backend/src/apiserver/client_manager.go b/backend/src/apiserver/client_manager.go index 2efea073aa5..c56459e6515 100644 --- a/backend/src/apiserver/client_manager.go +++ b/backend/src/apiserver/client_manager.go @@ -19,25 +19,20 @@ import ( "fmt" v1 "k8s.io/client-go/kubernetes/typed/core/v1" "os" - "strconv" "time" workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/cenkalti/backoff" "github.com/golang/glog" - "github.com/golang/protobuf/proto" + "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/sqlite" "github.com/kubeflow/pipelines/backend/src/apiserver/client" - "github.com/kubeflow/pipelines/backend/src/apiserver/metadata" "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/apiserver/storage" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" "github.com/minio/minio-go" - - "ml_metadata/metadata_store/mlmetadata" - mlpb "ml_metadata/proto/metadata_store_go_proto" ) const ( @@ -69,8 +64,6 @@ type ClientManager struct { podClient v1.PodInterface time util.TimeInterface uuid util.UUIDGeneratorInterface - - MetadataStore *mlmetadata.Store } func (c *ClientManager) ExperimentStore() storage.ExperimentStoreInterface { @@ -154,8 +147,7 @@ func (c *ClientManager) init() { c.podClient = client.CreatePodClientOrFatal( getStringConfig(podNamespace), getDurationConfig(initConnectionTimeout)) - metadataStore := initMetadataStore() - runStore := storage.NewRunStore(db, c.time, metadataStore) + runStore := storage.NewRunStore(db, c.time) c.runStore = runStore glog.Infof("Client manager initialized successfully") @@ -165,31 +157,6 @@ func (c *ClientManager) Close() { c.db.Close() } -func initMetadataStore() *metadata.Store { - port, err := strconv.Atoi(getStringConfig(mysqlServicePort)) - if err != nil { - glog.Fatalf("Failed to parse valid MySQL service port from %q: %v", getStringConfig(mysqlServicePort), err) - } - - cfg := &mlpb.ConnectionConfig{ - Config: &mlpb.ConnectionConfig_Mysql{ - &mlpb.MySQLDatabaseConfig{ - Host: proto.String(getStringConfig(mysqlServiceHost)), - Port: proto.Uint32(uint32(port)), - Database: proto.String("mlmetadata"), - User: proto.String(getStringConfigWithDefault(mysqlUser, "root")), - Password: proto.String(getStringConfigWithDefault(mysqlPassword, "")), - }, - }, - } - - mlmdStore, err := mlmetadata.NewStore(cfg) - if err != nil { - glog.Fatalf("Failed to create ML Metadata store: %v", err) - } - return metadata.NewStore(mlmdStore) -} - func initDBClient(initConnectionTimeout time.Duration) *storage.DB { driverName := getStringConfig("DBConfig.DriverName") var arg string diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 7bdd6243e57..45da3d9a70d 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -34,10 +34,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/apiserver/server" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - - _ "ml_metadata/metadata_store/mlmetadata" - _ "ml_metadata/proto/metadata_store_go_proto" - _ "ml_metadata/proto/metadata_store_service_go_proto" ) var ( diff --git a/backend/src/apiserver/metadata/BUILD.bazel b/backend/src/apiserver/metadata/BUILD.bazel deleted file mode 100644 index fa019b6763c..00000000000 --- a/backend/src/apiserver/metadata/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = ["metadata_store.go"], - importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/metadata", - visibility = ["//visibility:public"], - deps = [ - "//backend/src/common/util:go_default_library", - "@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", - "@com_github_golang_protobuf//jsonpb:go_default_library_gen", - "@com_github_golang_protobuf//proto:go_default_library", - "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", - "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", - ], -) - -go_test( - name = "go_default_test", - srcs = ["metadata_store_test.go"], - embed = [":go_default_library"], - deps = [ - "@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", - "@com_github_golang_protobuf//proto:go_default_library", - "@com_github_google_go_cmp//cmp:go_default_library", - "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", - "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", - ], -) diff --git a/backend/src/apiserver/metadata/metadata_store.go b/backend/src/apiserver/metadata/metadata_store.go deleted file mode 100644 index 086bfa4634f..00000000000 --- a/backend/src/apiserver/metadata/metadata_store.go +++ /dev/null @@ -1,135 +0,0 @@ -package metadata - -import ( - "encoding/json" - "ml_metadata/metadata_store/mlmetadata" - mlpb "ml_metadata/proto/metadata_store_go_proto" - "strings" - - argoWorkflow "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" - "github.com/kubeflow/pipelines/backend/src/common/util" -) - -// Store encapsulates a ML Metadata store. -type Store struct { - mlmdStore *mlmetadata.Store -} - -// NewStore creates a new Store, using mlmdStore as the backing ML Metadata -// store. -func NewStore(mlmdStore *mlmetadata.Store) *Store { - return &Store{mlmdStore: mlmdStore} -} - -// RecordOutputArtifacts records metadata on artifacts as parsed from the Argo -// output parameters in currentManifest. storedManifest represents the currently -// stored manifest for the run with id runID, and is used to ensure metadata is -// recorded at most once per artifact. -func (s *Store) RecordOutputArtifacts(runID, storedManifest, currentManifest string) error { - storedWorkflow := &argoWorkflow.Workflow{} - if err := json.Unmarshal([]byte(storedManifest), storedWorkflow); err != nil { - return util.NewInternalServerError(err, "unmarshaling workflow failed") - } - - currentWorkflow := &argoWorkflow.Workflow{} - if err := json.Unmarshal([]byte(currentManifest), currentWorkflow); err != nil { - return util.NewInternalServerError(err, "unmarshaling workflow failed") - } - - completed := make(map[string]bool) - for _, n := range storedWorkflow.Status.Nodes { - if n.Completed() { - completed[n.ID] = true - } - } - - for _, n := range currentWorkflow.Status.Nodes { - if n.Completed() && !completed[n.ID] { - // Newly completed node. Record output ml-metadata artifacts. - if n.Outputs != nil { - for _, output := range n.Outputs.Parameters { - if output.ValueFrom == nil || output.Value == nil || !strings.HasPrefix(output.ValueFrom.Path, "/output/ml_metadata/") { - continue - } - - artifacts, err := parseTFXMetadata(*output.Value) - if err != nil { - return util.NewInvalidInputError("metadata parsing failure: %v", err) - } - - if err := s.storeArtifacts(artifacts); err != nil { - return util.NewInvalidInputError("artifact storing failure: %v", err) - } - } - } - } - } - - return nil -} - -func (s *Store) storeArtifacts(artifacts artifactStructs) error { - for _, a := range artifacts { - id, err := s.mlmdStore.PutArtifactType( - a.ArtifactType, &mlmetadata.PutTypeOptions{AllFieldsMustMatch: true}) - if err != nil { - return util.NewInternalServerError(err, "failed to register artifact type") - } - a.Artifact.TypeId = proto.Int64(int64(id)) - _, err = s.mlmdStore.PutArtifacts([]*mlpb.Artifact{a.Artifact}) - if err != nil { - return util.NewInternalServerError(err, "failed to record artifact") - } - } - return nil -} - -type artifactStruct struct { - ArtifactType *mlpb.ArtifactType `json:"artifact_type"` - Artifact *mlpb.Artifact `json:"artifact"` -} - -func (a *artifactStruct) UnmarshalJSON(b []byte) error { - errorF := func(err error) error { - return util.NewInvalidInputError("JSON Unmarshal failure: %v", err) - } - - jsonMap := make(map[string]*json.RawMessage) - if err := json.Unmarshal(b, &jsonMap); err != nil { - return errorF(err) - } - - if _, ok := jsonMap["artifact_type"]; !ok { - return util.NewInvalidInputError("JSON Unmarshal failure: missing 'artifact_type' field") - } - - if _, ok := jsonMap["artifact"]; !ok { - return util.NewInvalidInputError("JSON Unmarshal failure: missing 'artifact_type' field") - } - - a.ArtifactType = &mlpb.ArtifactType{} - a.Artifact = &mlpb.Artifact{} - - if err := jsonpb.UnmarshalString(string(*jsonMap["artifact_type"]), a.ArtifactType); err != nil { - return errorF(err) - } - - if err := jsonpb.UnmarshalString(string(*jsonMap["artifact"]), a.Artifact); err != nil { - return errorF(err) - } - - return nil -} - -type artifactStructs []*artifactStruct - -func parseTFXMetadata(value string) (artifactStructs, error) { - var tfxArtifacts artifactStructs - - if err := json.Unmarshal([]byte(value), &tfxArtifacts); err != nil { - return nil, util.NewInternalServerError(err, "parse TFX metadata failure") - } - return tfxArtifacts, nil -} diff --git a/backend/src/apiserver/metadata/metadata_store_test.go b/backend/src/apiserver/metadata/metadata_store_test.go deleted file mode 100644 index 5ca9691be1a..00000000000 --- a/backend/src/apiserver/metadata/metadata_store_test.go +++ /dev/null @@ -1,357 +0,0 @@ -package metadata - -import ( - "encoding/json" - "fmt" - "ml_metadata/metadata_store/mlmetadata" - mlpb "ml_metadata/proto/metadata_store_go_proto" - "testing" - - argo "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - "github.com/golang/protobuf/proto" - "github.com/google/go-cmp/cmp" -) - -func (as artifactStructs) String() string { - var s string - for _, a := range as { - s += fmt.Sprintf("%+v\n", a) - } - return s -} - -func TestParseValidTFXMetadata(t *testing.T) { - tests := []struct { - input string - want artifactStructs - }{ - { - `[{ - "artifact_type": { - "name": "Artifact", - "properties": {"state": "STRING", "span": "INT" } }, - "artifact": { - "uri": "/location", - "properties": { - "state": {"stringValue": "complete"}, - "span": {"intValue": 10} } - } - }]`, - []*artifactStruct{ - &artifactStruct{ - ArtifactType: &mlpb.ArtifactType{ - Name: proto.String("Artifact"), - Properties: map[string]mlpb.PropertyType{ - "state": mlpb.PropertyType_STRING, - "span": mlpb.PropertyType_INT, - }, - }, - Artifact: &mlpb.Artifact{ - Uri: proto.String("/location"), - Properties: map[string]*mlpb.Value{ - "state": &mlpb.Value{Value: &mlpb.Value_StringValue{"complete"}}, - "span": &mlpb.Value{Value: &mlpb.Value_IntValue{10}}, - }, - }, - }, - }, - }, - { - `[{ - "artifact_type": { - "name": "Artifact 1", - "properties": {"state": "STRING", "span": "INT" } }, - "artifact": { - "uri": "/location 1", - "properties": { - "state": {"stringValue": "complete"}, - "span": {"intValue": 10} } - } - }, - { - "artifact_type": { - "name": "Artifact 2", - "properties": {"state": "STRING", "span": "INT" } }, - "artifact": { - "uri": "/location 2", - "properties": { - "state": {"stringValue": "complete"}, - "span": {"intValue": 10} } - } - }]`, - []*artifactStruct{ - &artifactStruct{ - ArtifactType: &mlpb.ArtifactType{ - Name: proto.String("Artifact 1"), - Properties: map[string]mlpb.PropertyType{ - "state": mlpb.PropertyType_STRING, - "span": mlpb.PropertyType_INT, - }, - }, - Artifact: &mlpb.Artifact{ - Uri: proto.String("/location 1"), - Properties: map[string]*mlpb.Value{ - "state": &mlpb.Value{Value: &mlpb.Value_StringValue{"complete"}}, - "span": &mlpb.Value{Value: &mlpb.Value_IntValue{10}}, - }, - }, - }, - &artifactStruct{ - ArtifactType: &mlpb.ArtifactType{ - Name: proto.String("Artifact 2"), - Properties: map[string]mlpb.PropertyType{ - "state": mlpb.PropertyType_STRING, - "span": mlpb.PropertyType_INT, - }, - }, - Artifact: &mlpb.Artifact{ - Uri: proto.String("/location 2"), - Properties: map[string]*mlpb.Value{ - "state": &mlpb.Value{Value: &mlpb.Value_StringValue{"complete"}}, - "span": &mlpb.Value{Value: &mlpb.Value_IntValue{10}}, - }, - }, - }, - }, - }, - } - - for _, test := range tests { - got, err := parseTFXMetadata(test.input) - if err != nil || !cmp.Equal(got, test.want) { - t.Errorf("parseTFXMetadata(%q)\nGot:\n<%+v, %+v>\nWant:\n%+v, nil error\nDiff:\n%s", test.input, got, err, test.want, cmp.Diff(test.want, got)) - } - } -} - -func TestParseInvalidTFXMetadata(t *testing.T) { - tests := []struct { - desc string - input string - }{ - { - "no artifact type", - `[{ - "artifact": { - "uri": "/location", - "properties": { - "state": {"stringValue": "complete"}, - "span": {"intValue": 10} } - } - }]`, - }, - { - "no artifact", - `[{ - "artifact_type": { - "name": "Artifact", - "properties": {"state": "STRING", "span": "INT" } }, - }]`, - }, - { - "empty string", - "", - }, - } - - for _, test := range tests { - _, err := parseTFXMetadata(test.input) - if err == nil { - t.Errorf("Test: %q", test.desc) - t.Errorf("parseTFXMetadata(%q)\nGot non-nil error. Want error.", test.input) - } - } -} - -func fakeMLMDStore(t *testing.T) *mlmetadata.Store { - cfg := &mlpb.ConnectionConfig{ - Config: &mlpb.ConnectionConfig_FakeDatabase{&mlpb.FakeDatabaseConfig{}}, - } - - mlmdStore, err := mlmetadata.NewStore(cfg) - if err != nil { - t.Fatalf("Failed to create ML Metadata Store: %v", err) - } - return mlmdStore -} - -func TestRecordOutputArtifacts(t *testing.T) { - - tests := []struct { - desc string - stored *argo.Workflow - current *argo.Workflow - wantTypes []*mlpb.ArtifactType - wantArtifacts []*mlpb.Artifact - }{ - { - desc: "nothing is recorded when node is still running", - stored: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeRunning, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - ValueFrom: &argo.ValueFrom{Path: "/output/ml_metadata/output1"}, - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/location 2", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - current: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeRunning, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - ValueFrom: &argo.ValueFrom{Path: "/output/ml_metadata/output1"}, - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/step_1_location", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - wantTypes: nil, - wantArtifacts: nil, - }, - { - desc: "artifacts are recorded when node transitions to Complete", - stored: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeRunning, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - ValueFrom: &argo.ValueFrom{Path: "/output/ml_metadata/output1"}, - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/location 2", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - current: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeSucceeded, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - ValueFrom: &argo.ValueFrom{Path: "/output/ml_metadata/output1"}, - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/step_1_location", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - wantTypes: []*mlpb.ArtifactType{ - &mlpb.ArtifactType{ - Id: proto.Int64(1), - Name: proto.String("step_1_artifact_type"), - Properties: map[string]mlpb.PropertyType{"state": mlpb.PropertyType_STRING}, - }}, - wantArtifacts: []*mlpb.Artifact{ - &mlpb.Artifact{ - Id: proto.Int64(1), - TypeId: proto.Int64(1), - Uri: proto.String("/step_1_location"), - Properties: map[string]*mlpb.Value{ - "state": &mlpb.Value{Value: &mlpb.Value_StringValue{"complete"}}}, - }}, - }, - { - desc: "Records artifacts only from Node with output parameter specified in ValueFrom Path", - stored: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeRunning, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/location 2", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - current: &argo.Workflow{Status: argo.WorkflowStatus{ - Nodes: map[string]argo.NodeStatus{ - "step_1": argo.NodeStatus{ - ID: "step_1_node_id", - Phase: argo.NodeSucceeded, - Outputs: &argo.Outputs{ - Parameters: []argo.Parameter{ - argo.Parameter{ - Value: proto.String(`[{ - "artifact_type": { "name": "step_1_artifact_type", - "properties": {"state": "STRING" } }, - "artifact": { "uri": "/step_1_location", - "properties": { - "state": {"stringValue": "complete"} } } - }]`), - }}}}}}}, - wantTypes: nil, - wantArtifacts: nil, - }, - } - - for _, test := range tests { - mlmdStore := fakeMLMDStore(t) - store := Store{mlmdStore: mlmdStore} - - current, err := json.Marshal(test.current) - if err != nil { - t.Errorf("Test: %q", test.desc) - t.Errorf("json.Marshal(%v) = %v", test.current, err) - continue - } - - stored, err := json.Marshal(test.stored) - if err != nil { - t.Errorf("Test: %q", test.desc) - t.Errorf("json.Marshal(%v) = %v", test.stored, err) - continue - } - - if err := store.RecordOutputArtifacts("", string(stored), string(current)); err != nil { - t.Errorf("Test: %q", test.desc) - t.Errorf("store.RecordOutputArtifacts(%q, %q) = %v\nWant non-nil error\n", current, stored, err) - continue - } - - gotTypes, err := mlmdStore.GetArtifactTypesByID([]mlmetadata.ArtifactTypeID{1}) - if !cmp.Equal(gotTypes, test.wantTypes) { - t.Errorf("Test: %q", test.desc) - t.Errorf("mlmdStore.GetArtifactTypes()\n") - t.Errorf("Got artifact types:\n%v\nWant\n%v\n", gotTypes, test.wantTypes) - t.Errorf("Got error:\n%v\nWant nil error\n", err) - } - - gotArtifacts, err := mlmdStore.GetArtifacts() - if !cmp.Equal(gotArtifacts, test.wantArtifacts) { - t.Errorf("Test: %q", test.desc) - t.Errorf("mlmdStore.GetArtifacts()\n") - t.Errorf("Got artifacts:\n%v\nWant\n%v\n", gotArtifacts, test.wantArtifacts) - t.Errorf("Got error:\n%v\nWant nil error\n", err) - } - } - -} diff --git a/backend/src/apiserver/resource/BUILD.bazel b/backend/src/apiserver/resource/BUILD.bazel index fb004e94e43..c477c49e6aa 100644 --- a/backend/src/apiserver/resource/BUILD.bazel +++ b/backend/src/apiserver/resource/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "//backend/api:go_default_library", "//backend/src/apiserver/common:go_default_library", "//backend/src/apiserver/list:go_default_library", - "//backend/src/apiserver/metadata:go_default_library", "//backend/src/apiserver/model:go_default_library", "//backend/src/apiserver/storage:go_default_library", "//backend/src/common/util:go_default_library", @@ -29,8 +28,6 @@ go_library( "@com_github_cenkalti_backoff//:go_default_library", "@com_github_golang_glog//:go_default_library", "@com_github_pkg_errors//:go_default_library", - "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", - "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", "@io_k8s_api//core/v1:go_default_library", "@io_k8s_api//policy/v1beta1:go_default_library", "@io_k8s_apimachinery//pkg/api/errors:go_default_library", diff --git a/backend/src/apiserver/resource/client_manager_fake.go b/backend/src/apiserver/resource/client_manager_fake.go index eb42b5b20dc..a0e62ce9add 100644 --- a/backend/src/apiserver/resource/client_manager_fake.go +++ b/backend/src/apiserver/resource/client_manager_fake.go @@ -15,12 +15,8 @@ package resource import ( - "ml_metadata/metadata_store/mlmetadata" - mlpb "ml_metadata/proto/metadata_store_go_proto" - workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/golang/glog" - "github.com/kubeflow/pipelines/backend/src/apiserver/metadata" "github.com/kubeflow/pipelines/backend/src/apiserver/storage" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" @@ -71,7 +67,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf experimentStore: storage.NewExperimentStore(db, time, uuid), pipelineStore: storage.NewPipelineStore(db, time, uuid), jobStore: storage.NewJobStore(db, time), - runStore: storage.NewRunStore(db, time, initFakeMetadataStore()), + runStore: storage.NewRunStore(db, time), workflowClientFake: NewWorkflowClientFake(), resourceReferenceStore: storage.NewResourceReferenceStore(db), dBStatusStore: storage.NewDBStatusStore(db), @@ -84,18 +80,6 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf }, nil } -func initFakeMetadataStore() *metadata.Store { - cfg := &mlpb.ConnectionConfig{ - Config: &mlpb.ConnectionConfig_FakeDatabase{&mlpb.FakeDatabaseConfig{}}, - } - - mlmdStore, err := mlmetadata.NewStore(cfg) - if err != nil { - glog.Fatalf("Failed to create ML Metadata store: %v", err) - } - return metadata.NewStore(mlmdStore) -} - func NewFakeClientManagerOrFatal(time util.TimeInterface) *FakeClientManager { uuid := util.NewFakeUUIDGeneratorOrFatal(DefaultFakeUUID, nil) fakeStore, err := NewFakeClientManager(time, uuid) diff --git a/backend/src/apiserver/storage/BUILD.bazel b/backend/src/apiserver/storage/BUILD.bazel index adcf90ab634..26def425b38 100644 --- a/backend/src/apiserver/storage/BUILD.bazel +++ b/backend/src/apiserver/storage/BUILD.bazel @@ -25,7 +25,6 @@ go_library( "//backend/api:go_default_library", "//backend/src/apiserver/common:go_default_library", "//backend/src/apiserver/list:go_default_library", - "//backend/src/apiserver/metadata:go_default_library", "//backend/src/apiserver/model:go_default_library", "//backend/src/common/util:go_default_library", "@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", @@ -38,9 +37,6 @@ go_library( "@com_github_minio_minio_go//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_vividcortex_mysqlerr//:go_default_library", - "@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", # keep - "@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", # keep - "@google_ml_metadata//ml_metadata/proto:metadata_store_service_go_proto", # keep "@io_k8s_apimachinery//pkg/util/json:go_default_library", ], ) diff --git a/backend/src/apiserver/storage/run_store.go b/backend/src/apiserver/storage/run_store.go index e18fb1e39e5..36dd3385241 100644 --- a/backend/src/apiserver/storage/run_store.go +++ b/backend/src/apiserver/storage/run_store.go @@ -26,7 +26,6 @@ import ( api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/list" - "github.com/kubeflow/pipelines/backend/src/apiserver/metadata" "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/common/util" "k8s.io/apimachinery/pkg/util/json" @@ -71,7 +70,6 @@ type RunStore struct { db *DB resourceReferenceStore *ResourceReferenceStore time util.TimeInterface - metadataStore *metadata.Store } // Runs two SQL queries in a transaction to return a list of matching runs, as well as their @@ -374,29 +372,6 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int return util.NewInternalServerError(err, "transaction creation failed") } - // Lock the row for update, so we ensure no other update of the same run - // happens while we're parsing it for metadata. We rely on per-row updates - // being synchronous, so metadata can be recorded at most once. Right now, - // persistence agent will call UpdateRun all the time, even if there is nothing - // new in the status of an Argo manifest. This means we need to keep track - // manually here on what the previously updated state of the run is, to ensure - // we do not add duplicate metadata. Hence the locking below. - query := "SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ?" - query = s.db.SelectForUpdate(query) - - row := tx.QueryRow(query, runID) - var storedManifest string - if err := row.Scan(&storedManifest); err != nil { - tx.Rollback() - return util.NewInvalidInputError("Failed to update run %s. Row not found. Error: %s", runID, err.Error()) - } - - if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil { - // Metadata storage failed. Log the error here, but continue to allow the run - // to be updated as per usual. - glog.Errorf("Failed to record output artifacts: %+v", err) - } - sql, args, err := sq. Update("run_details"). SetMap(sq.Eq{ @@ -425,7 +400,11 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int if r > 1 { tx.Rollback() return util.NewInternalServerError(errors.New("Failed to update run"), "Failed to update run %s. More than 1 rows affected", runID) + } else if r == 0 { + tx.Rollback() + return util.NewInternalServerError(errors.New("Failed to update run"), "Failed to update run %s. Row not found", runID) } + if err := tx.Commit(); err != nil { return util.NewInternalServerError(err, "failed to commit transaction") } @@ -570,14 +549,12 @@ func (s *RunStore) toRunMetadatas(models []model.ListableDataModel) []model.Run return runMetadatas } -// NewRunStore creates a new RunStore. If metadataStore is non-nil, it will be -// used to record artifact metadata. -func NewRunStore(db *DB, time util.TimeInterface, metadataStore *metadata.Store) *RunStore { +// NewRunStore creates a new RunStore. +func NewRunStore(db *DB, time util.TimeInterface) *RunStore { return &RunStore{ db: db, resourceReferenceStore: NewResourceReferenceStore(db), time: time, - metadataStore: metadataStore, } } diff --git a/backend/src/apiserver/storage/run_store_test.go b/backend/src/apiserver/storage/run_store_test.go index ddb39e310d5..8ea50479dac 100644 --- a/backend/src/apiserver/storage/run_store_test.go +++ b/backend/src/apiserver/storage/run_store_test.go @@ -33,7 +33,7 @@ func initializeRunStore() (*DB, *RunStore) { expStore.CreateExperiment(&model.Experiment{Name: "exp1"}) expStore = NewExperimentStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(defaultFakeExpIdTwo, nil)) expStore.CreateExperiment(&model.Experiment{Name: "exp2"}) - runStore := NewRunStore(db, util.NewFakeTimeForEpoch(), nil) + runStore := NewRunStore(db, util.NewFakeTimeForEpoch()) run1 := &model.RunDetail{ Run: model.Run{ diff --git a/go.mod b/go.mod index 5cd5d23a99a..ccbda0a3e1e 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( github.com/golang/protobuf v1.3.2 github.com/google/go-cmp v0.3.1 github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect - github.com/google/ml-metadata v0.0.0-20190214221617-0fb82dc56ff7 github.com/google/pprof v0.0.0-20190723021845-34ac40c74b70 // indirect github.com/google/uuid v1.0.0 github.com/googleapis/gnostic v0.2.0 // indirect diff --git a/go.sum b/go.sum index 0b7b2a92e3e..7a4cd71af61 100644 --- a/go.sum +++ b/go.sum @@ -110,10 +110,12 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a h1:ZJu5NB1Bk5ms4vw0Xu4i+jD32SE9jQXyfnOvwhHqlT0= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= @@ -262,6 +264,7 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -327,6 +330,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=