Skip to content

Commit

Permalink
[Backend]Cache - Cache logic with db interaction (kubeflow#3266)
Browse files Browse the repository at this point in the history
* Initial execution cache

This commit adds initial execution cache service. Including http service
and execution key generation.

* Add initial server logic

* Add const

* Change folder name

* Change execution key name

* Fix unit test

* Add Dockerfile and OWNERS file

This commit adds Dockerfile for building source code and OWNERS file for
easy review. This commit also renames some functions.

* fix go.sum

This PR fixes changes on go.sum

* Add local deployment scripts

This commit adds local deployment scripts which can deploy cache service
to an existing cluster with KFP installed.

* refactor src code

* Add standalone deployment scripts and yamls

This commit adds execution cache deployment scripts and yaml files in
KFP standalone deployment. Including a deployer which will generate the
certification and mutatingwebhookconfiguration and execution cache
deployment.

* Minor fix

* Add execution cache image build in test folder

* fix test cloudbuild

* Fix cloudbuild

* Add execution cache deployer image to test folder

* Add copyright

* Fix deployer build

* Add license for execution cache and cloudbuild for execution cache
images

This commit adds licenses for execution cache source code. Also adds
cloud build step for building cache image and cache deployer image.
Change the manifest name based on changed image.

* Refactor license intermediate data

* Fix execution cache image manifest

* Typo fix for cache and cache deployer images

* Add arguments in ca generation scripts and change deployer base image to google/cloud

* minor fix

* fix arg

* Mirror source code with MPL in execution_cache image

* Minor fix

* minor refactor on error handling

* Refactor cache source code, Docker image and manifest

* Fix variable names

* Add images in .release.cloudbuild.yaml

* Change execution_cache to generic name

* revice readme

* Move deployer job out of upgrade script

* fix tests

* fix tests

* Seperate cache service and cache deployer job

* mysql set up

* wip

* WIP

* WIP

* work mysql connection

* initial cache logic

* watcher

* WIP pod watching with mysql

* worked crud

* Add sql unit test

* fix manifest

* Add copyright

* Add watcher check and update cache key generation logic

* test replace container images

* work cache service

* Add configmap for cache service

* refactor

* fix manifest

* Add unit tests

* Remove delete table

* Fix sql dialect

* Add cached step log

* Add metadata execution id

* minor fix

* revert go.mod and go.sum

* revert go.sum and go.mod

* revert go.sum and go.mod

* revert go.mod and go.sum
  • Loading branch information
rui5i authored and Jeffwan committed Dec 9, 2020
1 parent 2f2f8b6 commit 39366c6
Show file tree
Hide file tree
Showing 26 changed files with 1,549 additions and 169 deletions.
58 changes: 58 additions & 0 deletions backend/src/cache/client/kubernetes_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package client

import (
"time"

"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
)

type KubernetesCoreInterface interface {
PodClient(namespace string) v1.PodInterface
}

type KubernetesCore struct {
coreV1Client v1.CoreV1Interface
}

func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
return c.coreV1Client.Pods(namespace)
}

func createKubernetesCore() (KubernetesCoreInterface, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client.")
}

clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.")
}
return &KubernetesCore{clientSet.CoreV1()}, nil
}

// CreateKubernetesCoreOrFatal creates a new client for the Kubernetes pod.
func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration) KubernetesCoreInterface {
var client KubernetesCoreInterface
var err error
var operation = func() error {
client, err = createKubernetesCore()
if err != nil {
return err
}
return nil
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err = backoff.Retry(operation, b)

if err != nil {
glog.Fatalf("Failed to create pod client. Error: %v", err)
}
return client
}
47 changes: 47 additions & 0 deletions backend/src/cache/client/kubernetes_core_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/kubeflow/pipelines/backend/src/common/util"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

type FakeKuberneteCoreClient struct {
podClientFake *FakePodClient
}

func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
if len(namespace) == 0 {
panic(util.NewResourceNotFoundError("Namespace", namespace))
}
return c.podClientFake
}

func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
return &FakeKuberneteCoreClient{&FakePodClient{}}
}

type FakeKubernetesCoreClientWithBadPodClient struct {
podClientFake *FakeBadPodClient
}

func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient {
return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}}
}

func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface {
return c.podClientFake
}
105 changes: 105 additions & 0 deletions backend/src/cache/client/pod_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"errors"

"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)

type FakePodClient struct {
watchIsCalled bool
patchIsCalled bool
}

func (FakePodClient) Create(*corev1.Pod) (*corev1.Pod, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (FakePodClient) Update(*corev1.Pod) (*corev1.Pod, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (FakePodClient) UpdateStatus(*corev1.Pod) (*corev1.Pod, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (FakePodClient) Delete(name string, options *v1.DeleteOptions) error {
return nil
}

func (FakePodClient) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
glog.Error("This fake method is not yet implemented.")
return nil
}

func (FakePodClient) Get(name string, options v1.GetOptions) (*corev1.Pod, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (FakePodClient) List(opts v1.ListOptions) (*corev1.PodList, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakePodClient) Watch(opts v1.ListOptions) (watch.Interface, error) {
f.watchIsCalled = true
event := watch.Event{
Type: watch.Added,
Object: &corev1.Pod{},
}
ch := make(chan watch.Event, 1)
ch <- event
return nil, nil
}

func (f FakePodClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *corev1.Pod, err error) {
f.patchIsCalled = true
return nil, nil
}

func (FakePodClient) Bind(binding *corev1.Binding) error {
glog.Error("This fake method is not yet implemented.")
return nil
}

func (FakePodClient) Evict(eviction *v1beta1.Eviction) error {
glog.Error("This fake method is not yet implemented.")
return nil
}

func (FakePodClient) GetLogs(name string, opts *corev1.PodLogOptions) *rest.Request {
glog.Error("This fake method is not yet implemented.")
return nil
}

type FakeBadPodClient struct {
FakePodClient
}

func (FakeBadPodClient) Delete(name string, options *v1.DeleteOptions) error {
return errors.New("failed to delete pod")
}
46 changes: 46 additions & 0 deletions backend/src/cache/client/sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"fmt"

"github.com/go-sql-driver/mysql"
)

func CreateMySQLConfig(user, password string, mysqlServiceHost string,
mysqlServicePort string, dbName string, mysqlGroupConcatMaxLen string, mysqlExtraParams map[string]string) *mysql.Config {

params := map[string]string{
"charset": "utf8",
"parseTime": "True",
"loc": "Local",
"group_concat_max_len": mysqlGroupConcatMaxLen,
}

for k, v := range mysqlExtraParams {
params[k] = v
}

return &mysql.Config{
User: user,
Passwd: password,
Net: "tcp",
Addr: fmt.Sprintf("%s:%s", mysqlServiceHost, mysqlServicePort),
Params: params,
DBName: dbName,
AllowNativePasswords: true,
}
}
81 changes: 81 additions & 0 deletions backend/src/cache/client/sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"reflect"
"testing"

"github.com/go-sql-driver/mysql"
)

func TestCreateMySQLConfig(t *testing.T) {
type args struct {
user string
password string
host string
port string
dbName string
mysqlGroupConcatMaxLen string
mysqlExtraParams map[string]string
}
tests := []struct {
name string
args args
want *mysql.Config
}{
{
name: "default config",
args: args{
user: "root",
host: "mysql",
port: "3306",
mysqlGroupConcatMaxLen: "1024",
mysqlExtraParams: nil,
},
want: &mysql.Config{
User: "root",
Net: "tcp",
Addr: "mysql:3306",
Params: map[string]string{"charset": "utf8", "parseTime": "True", "loc": "Local", "group_concat_max_len": "1024"},
AllowNativePasswords: true,
},
},
{
name: "extra parameters",
args: args{
user: "root",
host: "mysql",
port: "3306",
mysqlGroupConcatMaxLen: "1024",
mysqlExtraParams: map[string]string{"tls": "true"},
},
want: &mysql.Config{
User: "root",
Net: "tcp",
Addr: "mysql:3306",
Params: map[string]string{"charset": "utf8", "parseTime": "True", "loc": "Local", "group_concat_max_len": "1024", "tls": "true"},
AllowNativePasswords: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := CreateMySQLConfig(tt.args.user, tt.args.password, tt.args.host, tt.args.port, tt.args.dbName, tt.args.mysqlGroupConcatMaxLen, tt.args.mysqlExtraParams); !reflect.DeepEqual(got, tt.want) {
t.Errorf("CreateMySQLConfig() = %#v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 39366c6

Please sign in to comment.