Skip to content

Commit

Permalink
Merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
neuromage committed Jan 4, 2019
2 parents a89577b + d4d4f62 commit af08744
Show file tree
Hide file tree
Showing 25 changed files with 1,006 additions and 606 deletions.
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ RUN apk add ca-certificates
EXPOSE 8888

# Start the apiserver
CMD apiserver --config=/config
CMD apiserver --config=/config --sampleconfig=/config/sample_config.json
13 changes: 10 additions & 3 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1alpha1"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
minio "github.com/minio/minio-go"
)

Expand All @@ -50,6 +50,7 @@ type ClientManager struct {
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
objectStore storage.ObjectStoreInterface
wfClient workflowclient.WorkflowInterface
swfClient scheduledworkflowclient.ScheduledWorkflowInterface
Expand Down Expand Up @@ -77,6 +78,10 @@ func (c *ClientManager) ResourceReferenceStore() storage.ResourceReferenceStoreI
return c.resourceReferenceStore
}

func (c *ClientManager) DBStatusStore() storage.DBStatusStoreInterface {
return c.dBStatusStore
}

func (c *ClientManager) ObjectStore() storage.ObjectStoreInterface {
return c.objectStore
}
Expand Down Expand Up @@ -114,6 +119,7 @@ func (c *ClientManager) init() {
c.jobStore = storage.NewJobStore(db, c.time)
c.runStore = storage.NewRunStore(db, c.time)
c.resourceReferenceStore = storage.NewResourceReferenceStore(db)
c.dBStatusStore = storage.NewDBStatusStore(db)
c.objectStore = initMinioClient(getDurationConfig(initConnectionTimeout))

c.wfClient = client.CreateWorkflowClientOrFatal(
Expand Down Expand Up @@ -151,7 +157,8 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
&model.Pipeline{},
&model.ResourceReference{},
&model.RunDetail{},
&model.RunMetric{})
&model.RunMetric{},
&model.DBStatus{})

if response.Error != nil {
glog.Fatalf("Failed to initialize the databases.")
Expand Down
29 changes: 20 additions & 9 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,9 @@ func main() {
initConfig()
clientManager := newClientManager()
resourceManager := resource.NewResourceManager(&clientManager)

// If sample config path is not empty, this is a one time job to load samples to the system
// Load samples and terminate.
if *sampleConfigPath != "" {
err := loadSamples(resourceManager)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err.Error())
}
return
err := loadSamples(resourceManager)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err.Error())
}

go startRpcServer(resourceManager)
Expand Down Expand Up @@ -134,7 +128,19 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se
}

// Preload a bunch of pipeline samples
// Samples are only loaded once when the pipeline system is initially installed.
// They won't be loaded when upgrade or pod restart, to prevent them reappear if user explicitly
// delete the samples.
func loadSamples(resourceManager *resource.ResourceManager) error {
// Check if sample has being loaded already and skip loading if true.
haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded()
if err != nil {
return err
}
if haveSamplesLoaded {
glog.Infof("Samples already loaded in the past. Skip loading.")
return nil
}
configBytes, err := ioutil.ReadFile(*sampleConfigPath)
if err != nil {
return errors.New(fmt.Sprintf("Failed to read sample configurations file. Err: %v", err.Error()))
Expand Down Expand Up @@ -169,6 +175,11 @@ func loadSamples(resourceManager *resource.ResourceManager) error {
// Sleep one second makes sure the samples are showing up in the same order as they are added.
time.Sleep(1 * time.Second)
}
// Mark sample as loaded
err = resourceManager.MarkSampleLoaded()
if err != nil {
return err
}
glog.Info("All samples are loaded.")
return nil
}
19 changes: 19 additions & 0 deletions backend/src/apiserver/model/db_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

type DBStatus struct {
HaveSamplesLoaded bool `gorm:"column:HaveSamplesLoaded; not null"`
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FakeClientManager struct {
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
objectStore storage.ObjectStoreInterface
workflowClientFake *FakeWorkflowClient
scheduledWorkflowClientFake *FakeScheduledWorkflowClient
Expand Down Expand Up @@ -65,6 +66,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
runStore: storage.NewRunStore(db, time),
workflowClientFake: NewWorkflowClientFake(),
resourceReferenceStore: storage.NewResourceReferenceStore(db),
dBStatusStore: storage.NewDBStatusStore(db),
objectStore: storage.NewFakeObjectStore(),
scheduledWorkflowClientFake: NewScheduledWorkflowClientFake(),
time: time,
Expand Down Expand Up @@ -121,6 +123,10 @@ func (f *FakeClientManager) ResourceReferenceStore() storage.ResourceReferenceSt
return f.resourceReferenceStore
}

func (f *FakeClientManager) DBStatusStore() storage.DBStatusStoreInterface {
return f.dBStatusStore
}

func (f *FakeClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface {
return f.scheduledWorkflowClientFake
}
Expand Down
11 changes: 11 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ClientManagerInterface interface {
JobStore() storage.JobStoreInterface
RunStore() storage.RunStoreInterface
ResourceReferenceStore() storage.ResourceReferenceStoreInterface
DBStatusStore() storage.DBStatusStoreInterface
ObjectStore() storage.ObjectStoreInterface
Workflow() workflowclient.WorkflowInterface
ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface
Expand All @@ -54,6 +55,7 @@ type ResourceManager struct {
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
objectStore storage.ObjectStoreInterface
workflowClient workflowclient.WorkflowInterface
scheduledWorkflowClient scheduledworkflowclient.ScheduledWorkflowInterface
Expand All @@ -68,6 +70,7 @@ func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager {
jobStore: clientManager.JobStore(),
runStore: clientManager.RunStore(),
resourceReferenceStore: clientManager.ResourceReferenceStore(),
dBStatusStore: clientManager.DBStatusStore(),
objectStore: clientManager.ObjectStore(),
workflowClient: clientManager.Workflow(),
scheduledWorkflowClient: clientManager.ScheduledWorkflow(),
Expand Down Expand Up @@ -479,3 +482,11 @@ func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName
}
return r.objectStore.GetFile(artifactPath)
}

func (r *ResourceManager) HaveSamplesLoaded() (bool, error) {
return r.dBStatusStore.HaveSamplesLoaded()
}

func (r *ResourceManager) MarkSampleLoaded() error {
return r.dBStatusStore.MarkSampleLoaded()
}
7 changes: 4 additions & 3 deletions backend/src/apiserver/server/list_request_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"encoding/base64"
"encoding/json"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -159,7 +160,7 @@ func deserializePageToken(pageToken string) (*common.Token, error) {
return &token, nil
}

// parseAPIFilter attempts to decode a base-64 encoded JSON-stringified api
// parseAPIFilter attempts to decode a url-encoded JSON-stringified api
// filter object. An empty string is considered valid input, and equivalent to
// the nil filter, which trivially does nothing.
func parseAPIFilter(encoded string) (*api.Filter, error) {
Expand All @@ -171,13 +172,13 @@ func parseAPIFilter(encoded string) (*api.Filter, error) {
return nil, util.NewInvalidInputError("failed to parse valid filter from %q: %v", encoded, err)
}

b, err := base64.StdEncoding.DecodeString(encoded)
decoded, err := url.QueryUnescape(encoded)
if err != nil {
return errorF(err)
}

f := &api.Filter{}
if err := jsonpb.UnmarshalString(string(b), f); err != nil {
if err := jsonpb.UnmarshalString(string(decoded), f); err != nil {
return errorF(err)
}
return f, nil
Expand Down
8 changes: 5 additions & 3 deletions backend/src/apiserver/server/list_request_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"

api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -192,10 +193,11 @@ func TestParseAPIFilter_InvalidStringYieldsError(t *testing.T) {
}

func TestParseAPIFilter_DecodesEncodedString(t *testing.T) {
// in was generated by calling JSON.stringify followed by btoa in the browser
// on the following JSON string:
// in was generated by calling JSON.stringify followed by encodeURIComponent in
// the browser on the following JSON string:
// {"predicates":[{"op":"EQUALS","key":"testkey","stringValue":"testvalue"}]}
in := "eyJwcmVkaWNhdGVzIjpbeyJvcCI6IkVRVUFMUyIsImtleSI6InRlc3RrZXkiLCJzdHJpbmdWYWx1ZSI6InRlc3R2YWx1ZSJ9XX0="

in := "%7B%22predicates%22%3A%5B%7B%22op%22%3A%22EQUALS%22%2C%22key%22%3A%22testkey%22%2C%22stringValue%22%3A%22testvalue%22%7D%5D%7D"

// The above should correspond the following filter:
want := &api.Filter{
Expand Down
5 changes: 3 additions & 2 deletions backend/src/apiserver/storage/db_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"fmt"

"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/jinzhu/gorm"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
_ "github.com/mattn/go-sqlite3"
)

Expand All @@ -36,7 +36,8 @@ func NewFakeDb() (*DB, error) {
&model.Pipeline{},
&model.ResourceReference{},
&model.RunDetail{},
&model.RunMetric{})
&model.RunMetric{},
&model.DBStatus{})

return NewDB(db.DB(), NewSQLiteDialect()), nil
}
Expand Down
119 changes: 119 additions & 0 deletions backend/src/apiserver/storage/db_status_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
sq "github.com/Masterminds/squirrel"
"github.com/kubeflow/pipelines/backend/src/common/util"
)

var (
defaultDBStatus = sq.Eq{"HaveSamplesLoaded": false}
)

type DBStatusStoreInterface interface {
HaveSamplesLoaded() (bool, error)
MarkSampleLoaded() error
}

// Implementation of a DBStatusStoreInterface. This store read/write state of the database.
// For now we store status like whether sample is loaded.
type DBStatusStore struct {
db *DB
}

func (s *DBStatusStore) InitializeDBStatusTable() error {
getDBStatusSql, getDBStatusArgs, err := sq.Select("*").From("db_statuses").ToSql()
if err != nil {
return util.NewInternalServerError(err, "Error creating query to get database status.")
}
tx, err := s.db.Begin()
if err != nil {
return util.NewInternalServerError(err, "Failed to create a new transaction to initialize database status.")
}

rows, err := tx.Query(getDBStatusSql, getDBStatusArgs...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to load database status.")
}

// The table is not initialized
if !rows.Next() {
sql, args, err := sq.
Insert("db_statuses").
SetMap(defaultDBStatus).
ToSql()

if err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "Error creating query to initialize database status table.")
}
_, err = tx.Exec(sql, args...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "Error initializing the database status table.")
}
}
err = tx.Commit()
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err, "Failed to initializing the database status table.")
}
return nil
}

func (s *DBStatusStore) HaveSamplesLoaded() (bool, error) {
var haveSamplesLoaded bool
sql, args, err := sq.Select("*").From("db_statuses").ToSql()
if err != nil {
return false, util.NewInternalServerError(err, "Error creating query to get load sample status.")
}
rows, err := s.db.Query(sql, args...)
if err != nil {
return false, util.NewInternalServerError(err, "Error when getting load sample status")
}
if rows.Next() {
err = rows.Scan(&haveSamplesLoaded)
if err != nil {
return false, util.NewInternalServerError(err, "Error when scanning row to load sample status")
}
return haveSamplesLoaded, nil
}
return false, nil
}

func (s *DBStatusStore) MarkSampleLoaded() error {
sql, args, err := sq.
Update("db_statuses").
SetMap(sq.Eq{"HaveSamplesLoaded": true}).
ToSql()
if err != nil {
return util.NewInternalServerError(err, "Error creating query to mark samples as loaded.")
}
_, err = s.db.Exec(sql, args...)
if err != nil {
return util.NewInternalServerError(err, "Error mark samples as loaded.")
}
return nil
}

// factory function for database status store
func NewDBStatusStore(db *DB) *DBStatusStore {
s := &DBStatusStore{db: db}
// Initialize database status table
s.InitializeDBStatusTable()
return s
}
Loading

0 comments on commit af08744

Please sign in to comment.