Record TFX output artifacts in Metadata store #884

Merged
merged 9 commits into from
Mar 6, 2019
10 changes: 8 additions & 2 deletions .travis.yml
@@ -31,8 +31,14 @@ matrix:
- cd $TRAVIS_BUILD_DIR/backend/src
- gimme -f 1.11.4
- source ~/.gimme/envs/go1.11.4.env
- go vet -all -shadow ./...
- go test ./...
- go vet -all -shadow ./agent/...
- go vet -all -shadow ./cmd/...
- go vet -all -shadow ./common/...
- go vet -all -shadow ./crd/...
- go test ./agent/...
- go test ./cmd/...
- go test ./common/...
- go test ./crd/...
- language: python
python: "2.7"
env: TOXENV=py27
4 changes: 4 additions & 0 deletions BUILD.bazel
@@ -4,4 +4,8 @@ load("@bazel_gazelle//:def.bzl", "gazelle")
# gazelle:resolve proto protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:options_proto
Contributor

A couple of questions:

  • What's the ETA to check this in?
  • Could you provide a quick step-by-step overview (5/6 sentences) of how it works? What does the DSL provide? Where do the types come from?
  • Is it a temporary implementation to speed things up for the dev summit? Or will we make some changes later? (Let's add TODOs if the latter.)

Contributor Author

  1. ETA is ASAP, before dev summit.

  2. Roughly, it works like this: we parse ml_metadata protos from any node that has an output parameter following the specified convention (/output/ml_metadata/*). The parsing happens whenever a Run is updated. If a step has such an output, we parse it; today only TFX components do. For every other type of component this is a no-op, and no metadata tracking happens.

  3. It's a temporary implementation. I will most likely move all of this out post dev-summit for a more complete solution for metadata tracking.
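
The convention check described in point 2 can be sketched as follows. This is a minimal illustration, not the PR's actual code: the `outputParam` type and the `metadataOutputs` helper are hypothetical names, and only the `/output/ml_metadata/` prefix comes from the reply above. Parsing the matched payload into ml_metadata protos is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// mlMetadataPrefix is the path convention quoted in the review reply.
const mlMetadataPrefix = "/output/ml_metadata/"

// outputParam is a hypothetical stand-in for a workflow step's output parameter.
type outputParam struct {
	Path  string // path the step wrote the output to
	Value string // serialized payload
}

// metadataOutputs returns only the parameters whose path follows the
// convention. A bare prefix with nothing after it is ignored.
func metadataOutputs(params []outputParam) []outputParam {
	var matched []outputParam
	for _, p := range params {
		if strings.HasPrefix(p.Path, mlMetadataPrefix) && len(p.Path) > len(mlMetadataPrefix) {
			matched = append(matched, p)
		}
	}
	return matched
}

func main() {
	params := []outputParam{
		{Path: "/output/ml_metadata/examples", Value: "{}"},
		{Path: "/output/metrics.json", Value: "{}"},
	}
	// Steps without a matching output yield an empty slice, which
	// corresponds to the no-op case described above.
	for _, p := range metadataOutputs(params) {
		fmt.Println(p.Path)
	}
}
```
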

# gazelle:resolve proto go protoc-gen-swagger/options/annotations.proto @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-swagger/options:go_default_library
# gazelle:resolve go github.com/kubeflow/pipelines/backend/api/go_client //backend/api:go_default_library
# gazelle:resolve go ml_metadata/metadata_store/mlmetadata @google_ml_metadata//ml_metadata/metadata_store:metadata_store_go
# gazelle:resolve go ml_metadata/proto/metadata_store_go_proto @google_ml_metadata//ml_metadata/proto:metadata_store_go_proto
# gazelle:resolve go ml_metadata/proto/metadata_store_service_go_proto @google_ml_metadata//ml_metadata/proto:metadata_store_service_go_proto
# gazelle:exclude vendor/
gazelle(name = "gazelle")
46 changes: 43 additions & 3 deletions WORKSPACE
@@ -1,12 +1,13 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "7be7dc01f1e0afdba6c8eb2b43d2fa01c743be1b9273ab1eaf6c233df078d705",
urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.16.5/rules_go-0.16.5.tar.gz"],
sha256 = "492c3ac68ed9dcf527a07e6a1b2dcbf199c6bf8b35517951467ac32e421c06c1",
urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.17.0/rules_go-0.17.0.tar.gz"],
)

load("@io_bazel_rules_go//go:def.bzl", "go_register_toolchains", "go_rules_dependencies")
load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")

go_rules_dependencies()

@@ -22,6 +23,45 @@ load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies", "go_repository")

gazelle_dependencies()

http_archive(
name = "org_tensorflow",
sha256 = "24570d860d87dcfb936f53fb8dd30302452d0aa6b8b8537e4555c1bf839121a6",
strip_prefix = "tensorflow-1.13.0-rc0",
urls = [
"https://github.com/tensorflow/tensorflow/archive/v1.13.0-rc0.tar.gz",
],
)

http_archive(
name = "io_bazel_rules_closure",
sha256 = "43c9b882fa921923bcba764453f4058d102bece35a37c9f6383c713004aacff1",
strip_prefix = "rules_closure-9889e2348259a5aad7e805547c1a0cf311cfcd91",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_closure/archive/9889e2348259a5aad7e805547c1a0cf311cfcd91.tar.gz",
"https://github.com/bazelbuild/rules_closure/archive/9889e2348259a5aad7e805547c1a0cf311cfcd91.tar.gz", # 2018-12-21
],
)

load("@org_tensorflow//tensorflow:workspace.bzl", "tf_workspace")

tf_workspace()

load("@bazel_tools//tools/build_defs/repo:git.bzl", "new_git_repository")

go_repository(
name = "google_ml_metadata",
commit = "becc26ab61f82bfe7c812894f56f597949ce0fdc",
importpath = "github.com/google/ml-metadata",
)

new_git_repository(
name = "libmysqlclient",
build_file = "@google_ml_metadata//ml_metadata:libmysqlclient.BUILD",
remote = "https://github.com/MariaDB/mariadb-connector-c.git",
tag = "v3.0.8-release",
workspace_file = "@google_ml_metadata//ml_metadata:libmysqlclient.WORKSPACE",
)

go_repository(
name = "io_k8s_client_go",
build_file_proto_mode = "disable_global",
31 changes: 17 additions & 14 deletions backend/Dockerfile
@@ -1,25 +1,29 @@
FROM golang:1.11-alpine3.7 as builder
FROM l.gcr.io/google/bazel:0.21.0 as builder

RUN apt-get update && \
apt-get install -y cmake clang musl-dev openssl
WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .

# Needed musl-dev for github.com/mattn/go-sqlite3
RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
COPY WORKSPACE WORKSPACE
COPY backend/src backend/src
COPY backend/api backend/api

RUN GO111MODULE=on go build -o /bin/apiserver backend/src/apiserver/*.go
RUN bazel build -c opt --action_env=PATH --define=grpc_no_ares=true backend/src/apiserver:apiserver


# Compile
FROM python:3.5 as compiler

RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q default-jdk wget

apt-get install --no-install-recommends -y -q default-jdk wget
RUN pip3 install setuptools==40.5.0

RUN wget http://central.maven.org/maven2/io/swagger/swagger-codegen-cli/2.4.1/swagger-codegen-cli-2.4.1.jar -O /tmp/swagger-codegen-cli.jar

# WORKDIR /go/src/github.com/kubeflow/pipelines
WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
COPY backend/api backend/api
COPY sdk sdk
WORKDIR /go/src/github.com/kubeflow/pipelines/sdk/python
RUN ./build.sh /kfp.tar.gz
RUN pip3 install /kfp.tar.gz
@@ -36,22 +40,21 @@ COPY ./samples .
#The "for" loop breaks on all whitespace, so we either need to override IFS or use the "read" command instead.
RUN find . -maxdepth 2 -name '*.py' -type f | while read pipeline; do dsl-compile --py "$pipeline" --output "$pipeline.tar.gz"; done


FROM alpine:3.8
FROM debian:stretch

ARG COMMIT_SHA=unknown
ENV COMMIT_SHA=${COMMIT_SHA}

WORKDIR /bin

COPY --from=builder /bin/apiserver /bin/apiserver
COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt
COPY third_party/license.txt /bin/license.txt
COPY --from=builder /go/src/github.com/kubeflow/pipelines/bazel-bin/backend/src/apiserver/linux_amd64_stripped/apiserver /bin/apiserver
COPY backend/src/apiserver/config/ /config

COPY --from=compiler /samples/ /samples/

# Adding CA certificate so API server can download pipeline through URL
RUN apk add ca-certificates
RUN apt-get update && apt-get install -y ca-certificates

# Expose apiserver port
EXPOSE 8888
5 changes: 5 additions & 0 deletions backend/src/apiserver/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/client:go_default_library",
"//backend/src/apiserver/metadata:go_default_library",
"//backend/src/apiserver/model:go_default_library",
"//backend/src/apiserver/resource:go_default_library",
"//backend/src/apiserver/server:go_default_library",
@@ -23,12 +24,16 @@ go_library(
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_fsnotify_fsnotify//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_jinzhu_gorm//:go_default_library",
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go", # keep
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto", # keep
"@google_ml_metadata//ml_metadata/proto:metadata_store_service_go_proto", # keep
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//reflection:go_default_library",
],
38 changes: 37 additions & 1 deletion backend/src/apiserver/client_manager.go
@@ -17,19 +17,25 @@ package main
import (
"database/sql"
"fmt"
"strconv"
"time"

workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/metadata"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
minio "github.com/minio/minio-go"

"ml_metadata/metadata_store/mlmetadata"
mlpb "ml_metadata/proto/metadata_store_go_proto"
Member

What does mlpb stand for?

Contributor Author

It's just an alias for the proto package I'm referring to here. Standard Go google3 practice.

Member

Does it stand for ML protobuf?

Contributor Author

Yes, mlmd protobuf. I'm trying to be consistent with how it's used in mlmd Go client itself.
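
For readers unfamiliar with the aliasing discussed in this thread, here is a minimal sketch of a Go import alias. It uses the standard library's `strings` package under the made-up alias `str` so that it runs without the Bazel-provided ml_metadata packages; the `shout` helper is hypothetical.

```go
package main

import (
	"fmt"
	str "strings" // alias: the package is referred to as str in this file
)

// shout is a hypothetical helper showing that calls go through the alias.
func shout(s string) string {
	return str.ToUpper(s)
}

func main() {
	fmt.Println(shout("mlpb"))
}
```

The alias only changes the local name used in this file; the imported package itself is unchanged.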

)

const (
@@ -56,6 +62,8 @@ type ClientManager struct {
swfClient scheduledworkflowclient.ScheduledWorkflowInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface

MetadataStore *mlmetadata.Store
}

func (c *ClientManager) ExperimentStore() storage.ExperimentStoreInterface {
@@ -117,7 +125,6 @@ func (c *ClientManager) init() {
c.experimentStore = storage.NewExperimentStore(db, c.time, c.uuid)
c.pipelineStore = storage.NewPipelineStore(db, c.time, c.uuid)
c.jobStore = storage.NewJobStore(db, c.time)
c.runStore = storage.NewRunStore(db, c.time)
c.resourceReferenceStore = storage.NewResourceReferenceStore(db)
c.dBStatusStore = storage.NewDBStatusStore(db)
c.objectStore = initMinioClient(getDurationConfig(initConnectionTimeout))
@@ -127,13 +134,42 @@ func (c *ClientManager) init() {

c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
getStringConfig(podNamespace), getDurationConfig(initConnectionTimeout))

metadataStore := initMetadataStore()
runStore := storage.NewRunStore(db, c.time, metadataStore)
c.runStore = runStore

glog.Infof("Client manager initialized successfully")
}

func (c *ClientManager) Close() {
c.db.Close()
}

func initMetadataStore() *metadata.Store {
port, err := strconv.Atoi(getStringConfig(mysqlServicePort))
Contributor

Would it be possible to follow the pattern above and encapsulate the metadata store setup in a method? (storage.NewMetadataStore())

Contributor Author

Sure, I'll move it to its own method. It won't be in storage, though, since it isn't part of that package; it's in its own package.

Contributor Author

Sure, moved it to its own function below.

if err != nil {
glog.Fatalf("Failed to parse valid MySQL service port from %q: %v", getStringConfig(mysqlServicePort), err)
}

cfg := &mlpb.ConnectionConfig{
Config: &mlpb.ConnectionConfig_Mysql{
&mlpb.MySQLDatabaseConfig{
Host: proto.String(getStringConfig(mysqlServiceHost)),
Port: proto.Uint32(uint32(port)),
Database: proto.String("mlmetadata"),
User: proto.String("root"),
},
},
}

mlmdStore, err := mlmetadata.NewStore(cfg)
Contributor

Is there support for a "fake" implementation using SQLite so that we can run a lot of backend tests as go tests?

Contributor Author

Definitely. mlmetadata supports sqlite, so it should be possible.

Contributor

Is it possible to add support for SQLite right away so that we can run tests quickly locally?

Contributor Author

Nope, sorry, I can't do it right now.
If it's any consolation, I plan to move all of this out to its own service soon, so API server can stay the same as before.
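
One way to get fast local tests without a database is to put the store behind an interface and use an in-memory fake. Note that this is a different technique from the SQLite-backed store the reviewer asked about, and every name below (`artifactRecorder`, `fakeRecorder`, `updateRun`) is hypothetical: the real `metadata.Store` API is not shown in this excerpt.

```go
package main

import (
	"errors"
	"fmt"
)

// artifactRecorder is a hypothetical interface standing in for whatever
// the run store needs from the metadata store.
type artifactRecorder interface {
	RecordArtifact(runID, payload string) error
}

// fakeRecorder keeps artifacts in memory so tests need no database.
type fakeRecorder struct {
	byRun map[string][]string
}

func newFakeRecorder() *fakeRecorder {
	return &fakeRecorder{byRun: make(map[string][]string)}
}

// RecordArtifact appends the payload to the run's list, rejecting empty IDs.
func (f *fakeRecorder) RecordArtifact(runID, payload string) error {
	if runID == "" {
		return errors.New("run ID must not be empty")
	}
	f.byRun[runID] = append(f.byRun[runID], payload)
	return nil
}

// updateRun is a hypothetical caller that depends only on the interface,
// so production code and tests can pass different implementations.
func updateRun(r artifactRecorder, runID string, payloads []string) error {
	for _, p := range payloads {
		if err := r.RecordArtifact(runID, p); err != nil {
			return fmt.Errorf("recording artifact for run %q: %w", runID, err)
		}
	}
	return nil
}

func main() {
	fake := newFakeRecorder()
	if err := updateRun(fake, "run-1", []string{"a", "b"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(fake.byRun["run-1"]))
}
```

The trade-off is that a fake like this exercises the caller's logic but not the real storage behavior, which a SQLite-backed store would cover.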

if err != nil {
glog.Fatalf("Failed to create ML Metadata store: %v", err)
}
return metadata.NewStore(mlmdStore)
}
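
A note on the port handling in `initMetadataStore`: `strconv.Atoi` accepts negative numbers and values above 65535, and converting a negative `int` to `uint32` wraps around to a large value. A stricter variant could parse the string as an unsigned 16-bit number instead. The sketch below is an assumption-laden illustration, not part of the PR; `parsePort` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

// parsePort is a hypothetical helper. It rejects negative values,
// non-numeric input, zero, and anything above 65535.
func parsePort(s string) (uint32, error) {
	// bitSize 16 makes ParseUint fail for values that do not fit a TCP port.
	p, err := strconv.ParseUint(s, 10, 16)
	if err != nil {
		return 0, fmt.Errorf("invalid port %q: %w", s, err)
	}
	if p == 0 {
		return 0, fmt.Errorf("invalid port %q: must be between 1 and 65535", s)
	}
	return uint32(p), nil
}

func main() {
	for _, s := range []string{"3306", "-1", "70000"} {
		port, err := parsePort(s)
		fmt.Println(port, err)
	}
}
```
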

func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
driverName := getStringConfig("DBConfig.DriverName")
var arg string
5 changes: 4 additions & 1 deletion backend/src/apiserver/main.go
@@ -35,6 +35,10 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

_ "ml_metadata/metadata_store/mlmetadata"
_ "ml_metadata/proto/metadata_store_go_proto"
_ "ml_metadata/proto/metadata_store_service_go_proto"
)

var (
@@ -48,7 +52,6 @@ type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.Serv

func main() {
flag.Parse()
glog.Infof("starting API server")

initConfig()
clientManager := newClientManager()
27 changes: 27 additions & 0 deletions backend/src/apiserver/metadata/BUILD.bazel
@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["metadata_store.go"],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/metadata",
visibility = ["//visibility:public"],
deps = [
"//backend/src/common/util:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_golang_protobuf//proto:go_default_library",
"@google_ml_metadata//ml_metadata/metadata_store:metadata_store_go",
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto",
],
)

go_test(
name = "go_default_test",
srcs = ["metadata_store_test.go"],
embed = [":go_default_library"],
deps = [
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@google_ml_metadata//ml_metadata/proto:metadata_store_go_proto",
],
)