Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record TFX output artifacts in Metadata store #884

Merged
merged 9 commits into from
Mar 6, 2019

Conversation

neuromage
Copy link
Contributor

@neuromage neuromage commented Mar 1, 2019

This change integrates ML Metadata into the API server.

The API server now looks for specific output parameters that, if they follow a certain convention (like TFX components) will be parsed and stored in the metadata store.

A consequence of this change is that api server can no longer be built with go build, and must use bazel for building and testing.


This change is Reviewable

@neuromage
Copy link
Contributor Author

/assign @IronPan
/cc @vicaire
/cc @paveldournov

@neuromage
Copy link
Contributor Author

Tests are likely to fail since they assume the use of the go toolchain rather than bazel. But this change is ready to be looked at for review.

@@ -127,6 +134,31 @@ func (c *ClientManager) init() {

c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
getStringConfig(podNamespace), getDurationConfig(initConnectionTimeout))

port, err := strconv.Atoi(getStringConfig(mysqlServicePort))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to follow the pattern above and encapsulate the metadata store setup in a method? (storage.NewMetadataStore())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, move it to it's own method. It's not going to be in storage, as it's not part of that package. It's in it's own package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, moved it to it's own function below.

glog.Fatalf("Failed to create ML Metadata store: %v", err)
}
metadataStore := metadata.NewStore(mlmdStore)
runStore := storage.NewRunStore(db, c.time, metadataStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use a single DB for both the current tables and the new tables?

Or are there two DBs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's a new DB
Database: proto.String("mlmetadata")

No preference on either way but just curious what the pro/con of either approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use a separate db for isolation. They are not related to each other. Schema upgrades etc will anyway happen separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 dbs here. I'd like to keep these two thing separate for now, for better isolation. Schema upgrades etc need to happen separately anyway.

},
}

mlmdStore, err := mlmetadata.NewStore(cfg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there support for a "fake" implementation using SQLLite so that we can run a lot of backend tests as go tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. mlmetadata supports sqlite, so it should be possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add support for SQLLite right away so that we can run tests quickly locally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, sorry, I can't do it right now.
If it's any consolation, I plan to move all of this out to its own service soon, so API server can stay the same as before.

@@ -49,6 +54,8 @@ type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.Serv
func main() {
flag.Parse()
glog.Infof("starting API server")
log.Println("log says hello")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the same logging utilities as in other places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, leftover from testing. Removed.

@@ -4,4 +4,8 @@ load("@bazel_gazelle//:def.bzl", "gazelle")
# gazelle:resolve proto protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:options_proto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple questions;

  • What's the ETA to check this in?
  • Could you provide a quick step by step overview (5/6 sentences) of how it works? What does the DSL provide? Where to the types come from?
  • Is it a temporary implementation to speed things up for the dev summit? Or will we do some changes later? (Let's add TODOs if the latter)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. ETA is ASAP, before dev summit.

  2. How this works roughly, is that we parse ml_metadata protos from any node that has an output parameter with the specified convention (/output/ml_metadata/*). This parsing happens when a Run is updated. If the step has such an output, we will parse it. Only true for TFX components right now. Otherwise, this is a no-op and no metadata tracking happens for other types of components.

  3. It's a temporary implementation. I will most likely move all of this out post dev-summit for a more complete solution for metadata tracking.


func (a *artifactStruct) UnmarshalJSON(b []byte) error {
errorF := func(err error) error {
return fmt.Errorf("JSON Unmarshal failure: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the error library in the Pipeline API server so that we don't lose the stack trace? (Here and in other places).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

var storedManifest string
if err := row.Scan(&storedManifest); err != nil {
tx.Rollback()
return util.NewResourceNotFoundError("Run", runID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lose the error message here.

Additionally, isn't it possible that the error is not a RESOURCE_NOT_FOUND?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done.

return util.NewResourceNotFoundError("Run", runID)
}

if s.metadataStore != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we encapsulate the metadata store operations in separate functions?

Should the "if s.metadaStore != nil" happen before the first query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is encapsulated in a separate package. This is the place where we update runs, we need to put the logic to call metadata tracking here. And this needs to happen as part of a transaction to ensure we don't duplicate metadata being stored.

return util.NewInternalServerError(err, "transaction creation failed")
}

row := tx.QueryRow("SELECT WorkflowRuntimeManifest FROM run_details WHERE UUID = ? FOR UPDATE", runID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment indicating what each query is for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -63,7 +63,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time),
runStore: storage.NewRunStore(db, time, nil),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a fake for the third term? (I am guessing it is the metadata store)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also have an issue? I would suggest we address this as a high priority as not having the SQLLite implementation will decrease development velocity: errors will only be caught at the time of integration testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move this out of API server. In the meantime, filed issue #907

"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
minio "github.com/minio/minio-go"

"ml_metadata/metadata_store/mlmetadata"
mlpb "ml_metadata/proto/metadata_store_go_proto"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does mlpb stands for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just an alias for the proto package I'm referring to here. Standard Go google3 practice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it stands for ml protobuf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, mlmd protobuf. I'm trying to be consistent with how it's used in mlmd Go client itself.

mlpb "ml_metadata/proto/metadata_store_go_proto"
)

// Store ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a todo to fill the comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, filled comments.

glog.Fatalf("Failed to create ML Metadata store: %v", err)
}
metadataStore := metadata.NewStore(mlmdStore)
runStore := storage.NewRunStore(db, c.time, metadataStore)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's a new DB
Database: proto.String("mlmetadata")

No preference on either way but just curious what the pro/con of either approach?

}

// RecordOutputArtifacts ...
func (s *Store) RecordOutputArtifacts(runID string, storedManifest, currentManifest string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you make it consistent
runID, storedManifest, currentManifest string

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also maybe name storedManifest -> storedWorkflowString? currentManifest -> currentWorkflowString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done for the first. For the second, I prefer the current naming, and am not a fan of putting type names in variable names (Hungarian notation?). It's noisy, adds little value, and is not idiomatic in Go code. What do you think?

}

for _, n := range currentWorkflow.Status.Nodes {
if n.Completed() && !completed[n.ID] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for nodes that are not complete, they are not in the map. might be good if we store them as false if they are not complete above, to be more readable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go, false is the default value for booleans, so it's already implied: https://blog.golang.org/go-maps-in-action

return nil
}

type artifactStruct struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we picturing this to be the sole artifact structure or tfx specific? If latter, i would recommend reflect that in the type name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this should be generic for all types. It's a ml-metadata concept, and consists of the type, and it's actual artifact. I'm hoping we can reuse it for other types as well.

"testing"

"github.com/google/go-cmp/cmp"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please format import, same in other files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. This is what gofmt gives us, so I'd rather leave it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - some gofmt/goimport tools don't remove empty lines for you. you might want to manually remove them to keep consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gofmt should just auto-run on save, so I'd rather not mess with it :-) It'll just introduce inconsistencies. The blank line is correct here, in that it tries to separate external and internal packages. However, ml_metadata is a little messed up, in that there is no proper go package for it (you can't do go get on it). Hence, the reason it looks like this. I'll look into getting this fixed upstream. For now though, I think gofmt is doing the right thing.

@@ -33,7 +33,7 @@ func initializeRunStore() (*DB, *RunStore) {
expStore.CreateExperiment(&model.Experiment{Name: "exp1"})
expStore = NewExperimentStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(defaultFakeExpIdTwo, nil))
expStore.CreateExperiment(&model.Experiment{Name: "exp2"})
runStore := NewRunStore(db, util.NewFakeTimeForEpoch())
runStore := NewRunStore(db, util.NewFakeTimeForEpoch(), nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we should test the new code? (or a todo)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO.

@neuromage
Copy link
Contributor Author

/retest

@IronPan
Copy link
Member

IronPan commented Mar 4, 2019

Seems API image failed to build

The command '/bin/sh -c bazel build -c opt --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver:apiserver' returned a non-zero code: 1

)

// Store encapsulates a ML Metadata store.
type Store struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be called MetadataStore? Store seems generic. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it should be Store. The package name is metadata, so clients always refer to it as metadata.Store. See
http://go/gh/golang/go/wiki/CodeReviewComments#package-names
https://golang.org/doc/effective_go.html#package-names

@IronPan
Copy link
Member

IronPan commented Mar 4, 2019

/lgtm
/approve
/hold
with minor comment. hold for @vicaire approve

completed := make(map[string]bool)
for _, n := range storedWorkflow.Status.Nodes {
if n.Completed() {
completed[n.ID] = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is used for the ID of the node?

Is it a UUID? Or just the name of the workflow step?

If the later, this won't work as a step can be run multiple times in different parts of the workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would that matter? We are operating within a specified runID, which is already unique right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It matters because if a step is run multiple times as part of a workflow, we end up with collisions. For instance, if the template is used twice within the same workflow we end up with the data being stored at:

  • workflow_UUID/node_ID for the first instance.
  • workflow_UUID/node_ID for the second instance.

Hence we get a collision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that makes sense. I verified that this is the actual node id, not the step name. So it's actually unique.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

}

for _, n := range currentWorkflow.Status.Nodes {
if n.Completed() && !completed[n.ID] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename to make the code easier to follow:

  • n => currentNodeID
  • completed => StoredNodeIDToCompletedMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's not idiomatic Go code. In Go, it's idiomatic to prefer short variable names. See:
https://github.com/golang/go/wiki/CodeReviewComments#variable-names

Verbose variable names are ok if the scope is large, but when the scope is small like it is here, in a tight for loop, that seems unnecessary. Please see also
https://talks.golang.org/2014/names.slide#1

if n.Completed() && !completed[n.ID] {
// Newly completed node. Record output ml-metadata artifacts.
if n.Outputs != nil {
for _, output := range n.Outputs.Parameters {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems hacky. What if a container decides to use the same path even though it is not related to metadata? (Please file an issue if you need to follow up at a later time).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hacky. I would say, as hacky as the current file output conventions we already have, like metadata for UI and metrics. This is no different I would argue. In the event someone uses this same file name for something else, parsing metadata fails, but things carry on as before. Nonetheless, filed issue #906

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to check the output.name, not path. It's reasonable to have some reserved names that have special handling. We can prefix the names with the product name to be safe: if output.name == 'kfp_metadata'

id, err := s.mlmdStore.PutArtifactType(
a.ArtifactType, &mlmetadata.PutTypeOptions{AllFieldsMustMatch: true})
if err != nil {
return util.NewInternalServerError(err, "failed to register artifact type")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be an "internal Server error" in all cases? What if the metadata returned by the container is the problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an internal error. It can't be an invalid input error, as that's for user input right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An internal error is when something is broken in either the API server or a main piece of infrastructure.

So:

  • Internal error would be when a connection to the DB cannot be made.
  • If the data returned by the container is wrong, there is nothing wrong with the pipeline system itself, so it shouldn't be an internal error. (and clients shouldn't retry).

In this case, it seems that we need to distinguish between the two cases, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. What's the right error here? I'll change it to whatever you suggest is the right error type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need two cases:

  • if the metadata content is the problem => InvalidInputError
  • Otherwise (connection error, etc.) => InternalServerError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -63,7 +63,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time),
runStore: storage.NewRunStore(db, time, nil),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also have an issue? I would suggest we address this as a high priority as not having the SQLLite implementation will decrease development velocity: errors will only be caught at the time of integration testing.

return util.NewInternalServerError(err, "transaction creation failed")
}

// Lock the row for update, so we ensure no other update of the same run
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it possible to add duplicated metadata?

Updating metadata should idempotent since it is based on the ID of the workflow and POD. Is there something that prevents metadata updates to be idempotent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ml-metadata is not idempotent. It returns an id whenever we store a new artifact. That id is not based on the id of the pod or workflow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this approach work once the metadata store is independent from KFP? It won't be possible to use a transaction involving both databases at that point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we'll need transactions to be honest. Maybe we will, but we can solve that problem when we get to it. That question doesn't seem to affect what we're doing here right?

@neuromage
Copy link
Contributor Author

/retest

1 similar comment
@neuromage
Copy link
Contributor Author

/retest

@vicaire
Copy link
Contributor

vicaire commented Mar 6, 2019

/lgtm
/approve

@k8s-ci-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: IronPan, vicaire

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

1 similar comment
@k8s-ci-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: IronPan, vicaire

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot removed the lgtm label Mar 6, 2019
@IronPan
Copy link
Member

IronPan commented Mar 6, 2019

/lgtm

@neuromage
Copy link
Contributor Author

/hold cancel

Going ahead and removing that label for the merge

@k8s-ci-robot k8s-ci-robot merged commit ba64bd3 into kubeflow:master Mar 6, 2019
cheyang pushed a commit to alibaba/pipelines that referenced this pull request Mar 28, 2019
* WIP: ML Metadata in KFP

* Move metadata tracking to its own package.

* Clean up

* Address review comments, update travis.yml

* Add dependencies for building in Dockerfile

* Log errors but continue to update run when metadata storing fails.

* Update workspace to get latest ml-metadata version.

* Update errors
Linchin pushed a commit to Linchin/pipelines that referenced this pull request Apr 11, 2023
Use self-managed organzation for kfserving tests
HumairAK pushed a commit to red-hat-data-services/data-science-pipelines that referenced this pull request Mar 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants