-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record TFX output artifacts in Metadata store #884
Changes from all commits
fb34ae4
077eb6e
6be9fb5
1c404ad
2dda3b1
ead8bf3
c519d75
a115c21
446f3f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,19 +17,25 @@ package main | |
import ( | ||
"database/sql" | ||
"fmt" | ||
"strconv" | ||
"time" | ||
|
||
workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" | ||
"github.com/cenkalti/backoff" | ||
"github.com/golang/glog" | ||
"github.com/golang/protobuf/proto" | ||
"github.com/jinzhu/gorm" | ||
_ "github.com/jinzhu/gorm/dialects/sqlite" | ||
"github.com/kubeflow/pipelines/backend/src/apiserver/client" | ||
"github.com/kubeflow/pipelines/backend/src/apiserver/metadata" | ||
"github.com/kubeflow/pipelines/backend/src/apiserver/model" | ||
"github.com/kubeflow/pipelines/backend/src/apiserver/storage" | ||
"github.com/kubeflow/pipelines/backend/src/common/util" | ||
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" | ||
minio "github.com/minio/minio-go" | ||
|
||
"ml_metadata/metadata_store/mlmetadata" | ||
mlpb "ml_metadata/proto/metadata_store_go_proto" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does mlpb stands for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's just an alias for the proto package I'm referring to here. Standard Go google3 practice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it stands for ml protobuf? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, mlmd protobuf. I'm trying to be consistent with how it's used in mlmd Go client itself. |
||
) | ||
|
||
const ( | ||
|
@@ -56,6 +62,8 @@ type ClientManager struct { | |
swfClient scheduledworkflowclient.ScheduledWorkflowInterface | ||
time util.TimeInterface | ||
uuid util.UUIDGeneratorInterface | ||
|
||
MetadataStore *mlmetadata.Store | ||
} | ||
|
||
func (c *ClientManager) ExperimentStore() storage.ExperimentStoreInterface { | ||
|
@@ -117,7 +125,6 @@ func (c *ClientManager) init() { | |
c.experimentStore = storage.NewExperimentStore(db, c.time, c.uuid) | ||
c.pipelineStore = storage.NewPipelineStore(db, c.time, c.uuid) | ||
c.jobStore = storage.NewJobStore(db, c.time) | ||
c.runStore = storage.NewRunStore(db, c.time) | ||
c.resourceReferenceStore = storage.NewResourceReferenceStore(db) | ||
c.dBStatusStore = storage.NewDBStatusStore(db) | ||
c.objectStore = initMinioClient(getDurationConfig(initConnectionTimeout)) | ||
|
@@ -127,13 +134,42 @@ func (c *ClientManager) init() { | |
|
||
c.swfClient = client.CreateScheduledWorkflowClientOrFatal( | ||
getStringConfig(podNamespace), getDurationConfig(initConnectionTimeout)) | ||
|
||
metadataStore := initMetadataStore() | ||
runStore := storage.NewRunStore(db, c.time, metadataStore) | ||
c.runStore = runStore | ||
|
||
glog.Infof("Client manager initialized successfully") | ||
} | ||
|
||
func (c *ClientManager) Close() { | ||
c.db.Close() | ||
} | ||
|
||
func initMetadataStore() *metadata.Store { | ||
port, err := strconv.Atoi(getStringConfig(mysqlServicePort)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to follow the pattern above and encapsulate the metadata store setup in a method? (storage.NewMetadataStore()) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, move it to it's own method. It's not going to be in storage, as it's not part of that package. It's in it's own package. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, moved it to it's own function below. |
||
if err != nil { | ||
glog.Fatalf("Failed to parse valid MySQL service port from %q: %v", getStringConfig(mysqlServicePort), err) | ||
} | ||
|
||
cfg := &mlpb.ConnectionConfig{ | ||
Config: &mlpb.ConnectionConfig_Mysql{ | ||
&mlpb.MySQLDatabaseConfig{ | ||
Host: proto.String(getStringConfig(mysqlServiceHost)), | ||
Port: proto.Uint32(uint32(port)), | ||
Database: proto.String("mlmetadata"), | ||
User: proto.String("root"), | ||
}, | ||
}, | ||
} | ||
|
||
mlmdStore, err := mlmetadata.NewStore(cfg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there support for a "fake" implementation using SQLLite so that we can run a lot of backend tests as go tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Definitely. mlmetadata supports sqlite, so it should be possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to add support for SQLLite right away so that we can run tests quickly locally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, sorry, I can't do it right now. |
||
if err != nil { | ||
glog.Fatalf("Failed to create ML Metadata store: %v", err) | ||
} | ||
return metadata.NewStore(mlmdStore) | ||
} | ||
|
||
func initDBClient(initConnectionTimeout time.Duration) *storage.DB { | ||
driverName := getStringConfig("DBConfig.DriverName") | ||
var arg string | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "go_default_library", | ||
srcs = ["metadata_store.go"], | ||
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/metadata", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//backend/src/common/util:go_default_library", | ||
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", | ||
"@com_github_golang_protobuf//jsonpb:go_default_library_gen", | ||
"@com_github_golang_protobuf//proto:go_default_library", | ||
"@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", | ||
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "go_default_test", | ||
srcs = ["metadata_store_test.go"], | ||
embed = [":go_default_library"], | ||
deps = [ | ||
"@com_github_golang_protobuf//proto:go_default_library", | ||
"@com_github_google_go_cmp//cmp:go_default_library", | ||
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", | ||
], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple questions;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ETA is ASAP, before dev summit.
How this works roughly, is that we parse ml_metadata protos from any node that has an output parameter with the specified convention (/output/ml_metadata/*). This parsing happens when a Run is updated. If the step has such an output, we will parse it. Only true for TFX components right now. Otherwise, this is a no-op and no metadata tracking happens for other types of components.
It's a temporary implementation. I will most likely move all of this out post dev-summit for a more complete solution for metadata tracking.