Compile samples instead of hard code them in API server (#76)
* compile samples

* update logging

* update description

* update sample

* add immediate value sample

* revert

* fail fast if the samples fail to load

* comment

* address comments

* comment out

* update command

* comments
IronPan authored and k8s-ci-robot committed Nov 6, 2018
1 parent c7bb1ff commit 7847b74
Showing 15 changed files with 84 additions and 2,377 deletions.
14 changes: 12 additions & 2 deletions backend/Dockerfile
@@ -14,6 +14,15 @@ COPY . .
RUN apk add --update gcc musl-dev
RUN go build -o /bin/apiserver backend/src/apiserver/*.go

+FROM python:3.5.0-slim as compiler
+
+# This is hard-coded to 0.0.26. Once the kfp SDK release process is automated,
+# we can dynamically refer to the version from the same commit SHA.
+RUN pip install https://storage.googleapis.com/ml-pipeline/release/0.0.26/kfp-0.0.26.tar.gz --upgrade
+WORKDIR /samples
+COPY ./samples .
+RUN find . -maxdepth 2 -name "*.py" -exec dsl-compile --py {} --output {}.tar.gz \;
+
FROM alpine

ARG COMMIT_SHA=unknown
@@ -24,7 +33,8 @@ WORKDIR /bin
COPY --from=builder /bin/apiserver /bin/apiserver
COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt
COPY backend/src/apiserver/config/ /config
-COPY backend/src/apiserver/samples/ /samples
+
+COPY --from=compiler /samples/ /samples/

# Adding CA certificate so API server can download pipeline through URL
RUN apk add ca-certificates
@@ -33,4 +43,4 @@ RUN apk add ca-certificates
EXPOSE 8888

# Start the apiserver
-CMD apiserver --config=/config --sampleconfig=/samples/sample_config.json
+CMD apiserver --config=/config --sampleconfig=/config/sample_config.json
37 changes: 37 additions & 0 deletions backend/src/apiserver/config/sample_config.json
@@ -0,0 +1,37 @@
+[
+  {
+    "name":"[Sample] ML - XGBoost - Training with Confusion Matrix",
+    "description":"A trainer that does end-to-end distributed training for XGBoost models. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/xgboost-spark",
+    "file":"/samples/xgboost-spark/xgboost-training-cm.py.tar.gz"
+  },
+  {
+    "name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer",
+    "description":"Example pipeline that does classification with model analysis based on a public taxi cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
+    "file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz"
+  },
+  {
+    "name":"[Sample] Basic - Sequential",
+    "description":"A pipeline with two sequential steps. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/sequential.py",
+    "file":"/samples/basic/sequential.py.tar.gz"
+  },
+  {
+    "name":"[Sample] Basic - Parallel Join",
+    "description":"A pipeline that downloads two messages in parallel and prints the concatenated result. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/parallel_join.py",
+    "file":"/samples/basic/parallel_join.py.tar.gz"
+  },
+  {
+    "name":"[Sample] Basic - Immediate Value",
+    "description":"A pipeline with hard-coded parameter values. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/immediate_value.py",
+    "file":"/samples/basic/immediate_value.py.tar.gz"
+  },
+  {
+    "name":"[Sample] Basic - Exit Handler",
+    "description":"A pipeline that downloads a message and prints it out. The exit handler will run at the end. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/exit_handler.py",
+    "file":"/samples/basic/exit_handler.py.tar.gz"
+  },
+  {
+    "name":"[Sample] Basic - Condition",
+    "description":"A pipeline that shows how to use dsl.Condition. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/condition.py",
+    "file":"/samples/basic/condition.py.tar.gz"
+  }
+]
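
A note on how this file is consumed: loadSamples in backend/src/apiserver/main.go unmarshals it into an anonymous struct whose exported fields (Name, Description, File) are matched case-insensitively by encoding/json against the lowercase JSON keys, so no struct tags are needed. A minimal standalone sketch of that mapping — sampleConfig is a renamed stand-in for the commit's anonymous struct, not code from this change:

package main

import (
	"encoding/json"
	"fmt"
)

// sampleConfig mirrors the anonymous struct declared inside loadSamples.
type sampleConfig struct {
	Name        string
	Description string
	File        string
}

func main() {
	raw := []byte(`[{"name":"[Sample] Basic - Sequential","description":"A pipeline with two sequential steps.","file":"/samples/basic/sequential.py.tar.gz"}]`)

	var configs []sampleConfig
	if err := json.Unmarshal(raw, &configs); err != nil {
		panic(err)
	}
	// encoding/json matches "name" -> Name etc. case-insensitively.
	fmt.Printf("%+v\n", configs[0])
}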
39 changes: 29 additions & 10 deletions backend/src/apiserver/main.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

"github.com/golang/glog"
api "github.com/kubeflow/pipelines/backend/api/go_client"
@@ -30,6 +31,9 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"os"
"github.com/pkg/errors"
"fmt"
)

var (
@@ -48,7 +52,10 @@ func main() {
	initConfig()
	clientManager := newClientManager()
	resourceManager := resource.NewResourceManager(&clientManager)
-	loadSamples(resourceManager)
+	err := loadSamples(resourceManager)
+	if err != nil {
+		glog.Fatalf("Failed to load samples. Err: %v", err.Error())
+	}
	go startRpcServer(resourceManager)
	startHttpProxy(resourceManager)

@@ -119,29 +126,41 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se
}

// Preload a bunch of pipeline samples
-func loadSamples(resourceManager *resource.ResourceManager) {
+func loadSamples(resourceManager *resource.ResourceManager) error {
	configBytes, err := ioutil.ReadFile(*sampleConfigPath)
	if err != nil {
-		glog.Warningf("Failed to read sample configurations. Err: %v", err.Error())
-		return
+		return errors.New(fmt.Sprintf("Failed to read sample configurations file. Err: %v", err.Error()))
	}
	type config struct {
		Name        string
		Description string
		File        string
	}
	var configs []config
-	if json.Unmarshal(configBytes, &configs) != nil {
-		glog.Warningf("Failed to read sample configurations. Err: %v", err.Error())
-		return
+	if err := json.Unmarshal(configBytes, &configs); err != nil {
+		return errors.New(fmt.Sprintf("Failed to read sample configurations. Err: %v", err.Error()))
	}
	for _, config := range configs {
-		sampleBytes, err := ioutil.ReadFile(config.File)
+		reader, err := os.Open(config.File)
		if err != nil {
-			glog.Warningf("Failed to load sample %s. Error: %v", config.Name, err.Error())
+			return errors.New(fmt.Sprintf("Failed to load sample %s. Error: %v", config.Name, err.Error()))
		}
+		pipelineFile, err := server.ReadPipelineFile(config.File, reader, server.MaxFileLength)
+		if err != nil {
+			return errors.New(fmt.Sprintf("Failed to decompress the file %s. Error: %v", config.Name, err.Error()))
+		}
-		resourceManager.CreatePipeline(config.Name, config.Description, sampleBytes)
+		_, err = resourceManager.CreatePipeline(config.Name, config.Description, pipelineFile)
+		if err != nil {
+			// Log the error but do not fail. The API server pod can restart,
+			// which could otherwise cause a name collision. In the future, we
+			// might consider loading samples during deployment instead of when
+			// the API server starts.
+			glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", config.Name, err.Error()))
+			continue
+		}
+
+		// Since the default sorting is by create time, sleeping one second
+		// makes sure the samples show up in the same order they are added.
+		time.Sleep(1 * time.Second)
	}
+	glog.Info("All samples are loaded.")
+	return nil
}
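
One stylistic observation on the new error paths: each one builds errors.New(fmt.Sprintf(...)), while the github.com/pkg/errors package imported above also offers errors.Errorf, which does the formatting and records a stack trace in a single call. A minimal sketch of the equivalent pattern — loadOne is a hypothetical helper standing in for the loop body, not code from this commit:

package main

import (
	"fmt"
	"io/ioutil"

	"github.com/pkg/errors"
)

// loadOne mirrors the fail-fast branches of loadSamples.
func loadOne(path string) ([]byte, error) {
	b, err := ioutil.ReadFile(path)
	if err != nil {
		// Equivalent to errors.New(fmt.Sprintf(...)), but shorter and with
		// a stack trace recorded at the failure site.
		return nil, errors.Errorf("failed to load sample %s: %v", path, err)
	}
	return b, nil
}

func main() {
	if _, err := loadOne("/no/such/file"); err != nil {
		fmt.Println(err)
	}
}

Either spelling behaves the same at the call site; Errorf merely shortens the construction and keeps a stack trace for debugging.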