-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record TFX output artifacts in Metadata store #884
Conversation
/assign @IronPan |
Tests are likely to fail since they assume the use of the go toolchain rather than bazel. But this change is ready to be looked at for review. |
@@ -127,6 +134,31 @@ func (c *ClientManager) init() { | |||
|
|||
c.swfClient = client.CreateScheduledWorkflowClientOrFatal( | |||
getStringConfig(podNamespace), getDurationConfig(initConnectionTimeout)) | |||
|
|||
port, err := strconv.Atoi(getStringConfig(mysqlServicePort)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to follow the pattern above and encapsulate the metadata store setup in a method? (storage.NewMetadataStore())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, move it to it's own method. It's not going to be in storage, as it's not part of that package. It's in it's own package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, moved it to it's own function below.
glog.Fatalf("Failed to create ML Metadata store: %v", err) | ||
} | ||
metadataStore := metadata.NewStore(mlmdStore) | ||
runStore := storage.NewRunStore(db, c.time, metadataStore) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we use a single DB for both the current tables and the new tables?
Or are there two DBs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it's a new DB
Database: proto.String("mlmetadata")
No preference on either way but just curious what the pro/con of either approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use a separate db for isolation. They are not related to each other. Schema upgrades etc will anyway happen separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 dbs here. I'd like to keep these two thing separate for now, for better isolation. Schema upgrades etc need to happen separately anyway.
}, | ||
} | ||
|
||
mlmdStore, err := mlmetadata.NewStore(cfg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there support for a "fake" implementation using SQLLite so that we can run a lot of backend tests as go tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely. mlmetadata supports sqlite, so it should be possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to add support for SQLLite right away so that we can run tests quickly locally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, sorry, I can't do it right now.
If it's any consolation, I plan to move all of this out to its own service soon, so API server can stay the same as before.
backend/src/apiserver/main.go
Outdated
@@ -49,6 +54,8 @@ type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.Serv | |||
func main() { | |||
flag.Parse() | |||
glog.Infof("starting API server") | |||
log.Println("log says hello") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the same logging utilities as in other places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, leftover from testing. Removed.
@@ -4,4 +4,8 @@ load("@bazel_gazelle//:def.bzl", "gazelle") | |||
# gazelle:resolve proto protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:options_proto |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple questions;
- What's the ETA to check this in?
- Could you provide a quick step by step overview (5/6 sentences) of how it works? What does the DSL provide? Where to the types come from?
- Is it a temporary implementation to speed things up for the dev summit? Or will we do some changes later? (Let's add TODOs if the latter)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
ETA is ASAP, before dev summit.
-
How this works roughly, is that we parse ml_metadata protos from any node that has an output parameter with the specified convention (/output/ml_metadata/*). This parsing happens when a Run is updated. If the step has such an output, we will parse it. Only true for TFX components right now. Otherwise, this is a no-op and no metadata tracking happens for other types of components.
-
It's a temporary implementation. I will most likely move all of this out post dev-summit for a more complete solution for metadata tracking.
|
||
func (a *artifactStruct) UnmarshalJSON(b []byte) error { | ||
errorF := func(err error) error { | ||
return fmt.Errorf("JSON Unmarshal failure: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use the error library in the Pipeline API server so that we don't lose the stack trace? (Here and in other places).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
var storedManifest string | ||
if err := row.Scan(&storedManifest); err != nil { | ||
tx.Rollback() | ||
return util.NewResourceNotFoundError("Run", runID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We lose the error message here.
Additionally, isn't it possible that the error is not a RESOURCE_NOT_FOUND?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, done.
return util.NewResourceNotFoundError("Run", runID) | ||
} | ||
|
||
if s.metadataStore != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we encapsulate the metadata store operations in separate functions?
Should the "if s.metadaStore != nil" happen before the first query?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is encapsulated in a separate package. This is the place where we update runs, we need to put the logic to call metadata tracking here. And this needs to happen as part of a transaction to ensure we don't duplicate metadata being stored.
return util.NewInternalServerError(err, "transaction creation failed") | ||
} | ||
|
||
row := tx.QueryRow("SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE", runID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment indicating what each query is for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -63,7 +63,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf | |||
experimentStore: storage.NewExperimentStore(db, time, uuid), | |||
pipelineStore: storage.NewPipelineStore(db, time, uuid), | |||
jobStore: storage.NewJobStore(db, time), | |||
runStore: storage.NewRunStore(db, time), | |||
runStore: storage.NewRunStore(db, time, nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a fake for the third term? (I am guessing it is the metadata store)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also have an issue? I would suggest we address this as a high priority as not having the SQLLite implementation will decrease development velocity: errors will only be caught at the time of integration testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will move this out of API server. In the meantime, filed issue #907
"github.com/kubeflow/pipelines/backend/src/apiserver/model" | ||
"github.com/kubeflow/pipelines/backend/src/apiserver/storage" | ||
"github.com/kubeflow/pipelines/backend/src/common/util" | ||
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" | ||
minio "github.com/minio/minio-go" | ||
|
||
"ml_metadata/metadata_store/mlmetadata" | ||
mlpb "ml_metadata/proto/metadata_store_go_proto" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does mlpb stands for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just an alias for the proto package I'm referring to here. Standard Go google3 practice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it stands for ml protobuf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, mlmd protobuf. I'm trying to be consistent with how it's used in mlmd Go client itself.
mlpb "ml_metadata/proto/metadata_store_go_proto" | ||
) | ||
|
||
// Store ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a todo to fill the comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, filled comments.
glog.Fatalf("Failed to create ML Metadata store: %v", err) | ||
} | ||
metadataStore := metadata.NewStore(mlmdStore) | ||
runStore := storage.NewRunStore(db, c.time, metadataStore) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it's a new DB
Database: proto.String("mlmetadata")
No preference on either way but just curious what the pro/con of either approach?
} | ||
|
||
// RecordOutputArtifacts ... | ||
func (s *Store) RecordOutputArtifacts(runID string, storedManifest, currentManifest string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you make it consistent
runID, storedManifest, currentManifest string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also maybe name storedManifest -> storedWorkflowString? currentManifest -> currentWorkflowString
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done for the first. For the second, I prefer the current naming, and am not a fan of putting type names in variable names (Hungarian notation?). It's noisy, adds little value, and is not idiomatic in Go code. What do you think?
} | ||
|
||
for _, n := range currentWorkflow.Status.Nodes { | ||
if n.Completed() && !completed[n.ID] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for nodes that are not complete, they are not in the map. might be good if we store them as false if they are not complete above, to be more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Go, false is the default value for booleans, so it's already implied: https://blog.golang.org/go-maps-in-action
return nil | ||
} | ||
|
||
type artifactStruct struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we picturing this to be the sole artifact structure or tfx specific? If latter, i would recommend reflect that in the type name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this should be generic for all types. It's a ml-metadata concept, and consists of the type, and it's actual artifact. I'm hoping we can reuse it for other types as well.
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please format import, same in other files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. This is what gofmt gives us, so I'd rather leave it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - some gofmt/goimport tools don't remove empty lines for you. you might want to manually remove them to keep consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gofmt should just auto-run on save, so I'd rather not mess with it :-) It'll just introduce inconsistencies. The blank line is correct here, in that it tries to separate external and internal packages. However, ml_metadata is a little messed up, in that there is no proper go package for it (you can't do go get on it). Hence, the reason it looks like this. I'll look into getting this fixed upstream. For now though, I think gofmt is doing the right thing.
@@ -33,7 +33,7 @@ func initializeRunStore() (*DB, *RunStore) { | |||
expStore.CreateExperiment(&model.Experiment{Name: "exp1"}) | |||
expStore = NewExperimentStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(defaultFakeExpIdTwo, nil)) | |||
expStore.CreateExperiment(&model.Experiment{Name: "exp2"}) | |||
runStore := NewRunStore(db, util.NewFakeTimeForEpoch()) | |||
runStore := NewRunStore(db, util.NewFakeTimeForEpoch(), nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think we should test the new code? (or a todo)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added TODO.
/retest |
Seems API image failed to build
|
) | ||
|
||
// Store encapsulates a ML Metadata store. | ||
type Store struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be called MetadataStore? Store seems generic. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it should be Store. The package name is metadata, so clients always refer to it as metadata.Store. See
http://go/gh/golang/go/wiki/CodeReviewComments#package-names
https://golang.org/doc/effective_go.html#package-names
/lgtm |
completed := make(map[string]bool) | ||
for _, n := range storedWorkflow.Status.Nodes { | ||
if n.Completed() { | ||
completed[n.ID] = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is used for the ID of the node?
Is it a UUID? Or just the name of the workflow step?
If the later, this won't work as a step can be run multiple times in different parts of the workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would that matter? We are operating within a specified runID, which is already unique right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It matters because if a step is run multiple times as part of a workflow, we end up with collisions. For instance, if the template is used twice within the same workflow we end up with the data being stored at:
- workflow_UUID/node_ID for the first instance.
- workflow_UUID/node_ID for the second instance.
Hence we get a collision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that makes sense. I verified that this is the actual node id, not the step name. So it's actually unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
} | ||
|
||
for _, n := range currentWorkflow.Status.Nodes { | ||
if n.Completed() && !completed[n.ID] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we rename to make the code easier to follow:
- n => currentNodeID
- completed => StoredNodeIDToCompletedMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, that's not idiomatic Go code. In Go, it's idiomatic to prefer short variable names. See:
https://github.com/golang/go/wiki/CodeReviewComments#variable-names
Verbose variable names are ok if the scope is large, but when the scope is small like it is here, in a tight for loop, that seems unnecessary. Please see also
https://talks.golang.org/2014/names.slide#1
if n.Completed() && !completed[n.ID] { | ||
// Newly completed node. Record output ml-metadata artifacts. | ||
if n.Outputs != nil { | ||
for _, output := range n.Outputs.Parameters { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems hacky. What if a container decides to use the same path even though it is not related to metadata? (Please file an issue if you need to follow up at a later time).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is hacky. I would say, as hacky as the current file output conventions we already have, like metadata for UI and metrics. This is no different I would argue. In the event someone uses this same file name for something else, parsing metadata fails, but things carry on as before. Nonetheless, filed issue #906
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to check the output.name
, not path. It's reasonable to have some reserved names that have special handling. We can prefix the names with the product name to be safe: if output.name == 'kfp_metadata'
id, err := s.mlmdStore.PutArtifactType( | ||
a.ArtifactType, &mlmetadata.PutTypeOptions{AllFieldsMustMatch: true}) | ||
if err != nil { | ||
return util.NewInternalServerError(err, "failed to register artifact type") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be an "internal Server error" in all cases? What if the metadata returned by the container is the problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an internal error. It can't be an invalid input error, as that's for user input right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An internal error is when something is broken in either the API server or a main piece of infrastructure.
So:
- Internal error would be when a connection to the DB cannot be made.
- If the data returned by the container is wrong, there is nothing wrong with the pipeline system itself, so it shouldn't be an internal error. (and clients shouldn't retry).
In this case, it seems that we need to distinguish between the two cases, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. What's the right error here? I'll change it to whatever you suggest is the right error type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need two cases:
- if the metadata content is the problem => InvalidInputError
- Otherwise (connection error, etc.) => InternalServerError.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -63,7 +63,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf | |||
experimentStore: storage.NewExperimentStore(db, time, uuid), | |||
pipelineStore: storage.NewPipelineStore(db, time, uuid), | |||
jobStore: storage.NewJobStore(db, time), | |||
runStore: storage.NewRunStore(db, time), | |||
runStore: storage.NewRunStore(db, time, nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also have an issue? I would suggest we address this as a high priority as not having the SQLLite implementation will decrease development velocity: errors will only be caught at the time of integration testing.
return util.NewInternalServerError(err, "transaction creation failed") | ||
} | ||
|
||
// Lock the row for update, so we ensure no other update of the same run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it possible to add duplicated metadata?
Updating metadata should idempotent since it is based on the ID of the workflow and POD. Is there something that prevents metadata updates to be idempotent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ml-metadata is not idempotent. It returns an id whenever we store a new artifact. That id is not based on the id of the pod or workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will this approach work once the metadata store is independent from KFP? It won't be possible to use a transaction involving both databases at that point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we'll need transactions to be honest. Maybe we will, but we can solve that problem when we get to it. That question doesn't seem to affect what we're doing here right?
/retest |
1 similar comment
/retest |
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: IronPan, vicaire The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
1 similar comment
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: IronPan, vicaire The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/lgtm |
/hold cancel Going ahead and removing that label for the merge |
* WIP: ML Metadata in KFP * Move metadata tracking to its own package. * Clean up * Address review comments, update travis.yml * Add dependencies for building in Dockerfile * Log errors but continue to update run when metadata storing fails. * Update workspace to get latest ml-metadata version. * Update errors
Use self-managed organzation for kfserving tests
This change integrates ML Metadata into the API server.
The API server now looks for specific output parameters that, if they follow a certain convention (like TFX components) will be parsed and stored in the metadata store.
A consequence of this change is that api server can no longer be built with
go build
, and must use bazel for building and testing.This change is