Compile samples instead of hard-coding them in the API server #76

Merged
merged 15 commits on Nov 6, 2018
14 changes: 12 additions & 2 deletions backend/Dockerfile
@@ -14,6 +14,15 @@ COPY . .
RUN apk add --update gcc musl-dev
RUN go build -o /bin/apiserver backend/src/apiserver/*.go

FROM python:3.5.0-slim as compiler

# This is hard coded to 0.0.26. Once the kfp SDK release process is automated,
Contributor: This is already in (#70). Give it a try?

Member Author: The test needs to be updated to adopt that, and it's kind of late for a last-minute change. I'll do it post-beta.

Contributor: I see.

# we can dynamically refer to the version from the same commit SHA.
RUN pip install https://storage.googleapis.com/ml-pipeline/release/0.0.26/kfp-0.0.26.tar.gz --upgrade
WORKDIR /samples
COPY ./samples .
RUN find . -maxdepth 2 -name "*.py" -exec dsl-compile --py {} --output {}.tar.gz \;
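(Not part of this diff, for illustration only: the RUN line above walks the samples tree and invokes dsl-compile on every .py file it finds. Below is a rough Go sketch of the same loop; it assumes dsl-compile from the kfp package is on the PATH and does not reproduce the -maxdepth 2 limit.)

package main

import (
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
)

func main() {
	// Walk /samples and compile each pipeline definition into a .tar.gz package,
	// mirroring the find/dsl-compile step in the compiler stage above.
	err := filepath.Walk("/samples", func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() || !strings.HasSuffix(path, ".py") {
			return err
		}
		cmd := exec.Command("dsl-compile", "--py", path, "--output", path+".tar.gz")
		cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
		return cmd.Run()
	})
	if err != nil {
		log.Fatalf("Failed to compile samples: %v", err)
	}
}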

FROM alpine

ARG COMMIT_SHA=unknown
@@ -24,7 +33,8 @@ WORKDIR /bin
COPY --from=builder /bin/apiserver /bin/apiserver
COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt
COPY backend/src/apiserver/config/ /config
COPY backend/src/apiserver/samples/ /samples

COPY --from=compiler /samples/ /samples/

# Adding CA certificate so API server can download pipeline through URL
RUN apk add ca-certificates
@@ -33,4 +43,4 @@ RUN apk add ca-certificates
EXPOSE 8888

# Start the apiserver
CMD apiserver --config=/config --sampleconfig=/samples/sample_config.json
CMD apiserver --config=/config --sampleconfig=/config/sample_config.json
37 changes: 37 additions & 0 deletions backend/src/apiserver/config/sample_config.json
@@ -0,0 +1,37 @@
[
{
"name":"[Sample] ML - XGBoost - Training with Confusion Matrix",
"description":"A trainer that does end-to-end distributed training for XGBoost models. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/xgboost-spark",
"file":"/samples/xgboost-spark/xgboost-training-cm.py.tar.gz"
},
{
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz"
},
{
"name":"[Sample] Basic - Sequential",
"description":"A pipeline with two sequential steps. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/sequential.py",
"file":"/samples/basic/sequential.py.tar.gz"
},
{
"name":"[Sample] Basic - Parallel Join",
"description":"A pipeline that downloads two messages in parallel and print the concatenated result. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/parallel_join.py",
"file":"/samples/basic/parallel_join.py.tar.gz"
},
{
"name":"[Sample] Basic - Immediate Value",
"description":"A pipeline with parameter values hard coded. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/immediate_value.py",
"file":"/samples/basic/immediate_value.py.tar.gz"
},
{
"name":"[Sample] Basic - Exit Handler",
"description":"A pipeline that downloads a message and print it out. Exit Handler will run at the end. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/exit_handler.py",
"file":"/samples/basic/exit_handler.py.tar.gz"
},
{
"name":"[Sample] Basic - Condition",
"description":"A pipeline shows how to use dsl.Condition. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/condition.py",
"file":"/samples/basic/condition.py.tar.gz"
}
]
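(For reference, each entry above maps onto the anonymous config struct that loadSamples in main.go below unmarshals. A minimal standalone sketch of that parsing step, using the same config path as the CMD line above, might look like this; it is not part of the PR.)

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
)

// Mirrors the struct declared inline in loadSamples below.
type sample struct {
	Name        string
	Description string
	File        string
}

func main() {
	raw, err := ioutil.ReadFile("/config/sample_config.json")
	if err != nil {
		log.Fatalf("Failed to read sample config: %v", err)
	}
	var samples []sample
	// encoding/json matches the lowercase JSON keys to the exported fields case-insensitively.
	if err := json.Unmarshal(raw, &samples); err != nil {
		log.Fatalf("Failed to parse sample config: %v", err)
	}
	for _, s := range samples {
		fmt.Printf("%s -> %s\n", s.Name, s.File)
	}
}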
39 changes: 29 additions & 10 deletions backend/src/apiserver/main.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

"github.com/golang/glog"
api "github.com/kubeflow/pipelines/backend/api/go_client"
@@ -30,6 +31,9 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"os"
"github.com/pkg/errors"
"fmt"
)

var (
@@ -48,7 +52,10 @@ func main() {
initConfig()
clientManager := newClientManager()
resourceManager := resource.NewResourceManager(&clientManager)
loadSamples(resourceManager)
err := loadSamples(resourceManager)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err.Error())
Contributor: What if the server restarts? Does this prevent the server from ever restarting unless the storage is deleted?

Member Author: The samples are baked into the image, so it won't be a problem.

}
go startRpcServer(resourceManager)
startHttpProxy(resourceManager)

@@ -119,29 +126,41 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se
}

// Preload a bunch of pipeline samples
func loadSamples(resourceManager *resource.ResourceManager) {
func loadSamples(resourceManager *resource.ResourceManager) error {
configBytes, err := ioutil.ReadFile(*sampleConfigPath)
if err != nil {
glog.Warningf("Failed to read sample configurations. Err: %v", err.Error())
return
return errors.New(fmt.Sprintf("Failed to read sample configurations file. Err: %v", err.Error()))
}
type config struct {
Name string
Description string
File string
}
var configs []config
if json.Unmarshal(configBytes, &configs) != nil {
glog.Warningf("Failed to read sample configurations. Err: %v", err.Error())
return
if err := json.Unmarshal(configBytes, &configs); err != nil {
return errors.New(fmt.Sprintf("Failed to read sample configurations. Err: %v", err.Error()))
}
for _, config := range configs {
sampleBytes, err := ioutil.ReadFile(config.File)
reader, err := os.Open(config.File)
if err != nil {
glog.Warningf("Failed to load sample %s. Error: %v", config.Name, err.Error())
return errors.New(fmt.Sprintf("Failed to load sample %s. Error: %v", config.Name, err.Error()))
}
pipelineFile, err := server.ReadPipelineFile(config.File, reader, server.MaxFileLength)
if err != nil {
return errors.New(fmt.Sprintf("Failed to decompress the file %s. Error: %v", config.Name, err.Error()))
}
_, err = resourceManager.CreatePipeline(config.Name, config.Description, pipelineFile)
if err != nil {
// Log the error but do not fail. The API server pod can restart, and that could potentially cause a name collision.
// In the future, we might consider loading samples during deployment instead of when the API server starts.
glog.Warningf("Failed to create pipeline for %s. Error: %v", config.Name, err.Error())
continue
}
resourceManager.CreatePipeline(config.Name, config.Description, sampleBytes)

// Since the default sorting is by creation time,
// sleeping one second makes sure the samples show up in the same order as they were added.
time.Sleep(1 * time.Second)
}
glog.Info("All samples are loaded.")
return nil
}
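(A hypothetical follow-up, not part of this PR: because loadSamples now returns an error instead of only logging a warning, the missing-config path can be exercised directly. The sketch below assumes sampleConfigPath remains the package-level flag dereferenced above; passing nil is safe here because the function returns before it ever touches the resource manager.)

package main

import "testing"

func TestLoadSamplesMissingConfigFile(t *testing.T) {
	// Point the flag at a path that cannot exist so ioutil.ReadFile fails immediately.
	*sampleConfigPath = "/does/not/exist/sample_config.json"
	if err := loadSamples(nil); err == nil {
		t.Fatal("expected an error when the sample config file is missing")
	}
}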