-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Compile samples instead of hard code them in API server #76
Changes from all commits
8df55d3
6b38293
0ba9eee
3cd0d30
4c599ba
204d7bf
8a1ca01
2569f64
1b30574
1998970
5060b25
0389e64
05b5b21
de258a7
ee8911c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
[ | ||
{ | ||
"name":"[Sample] ML - XGBoost - Training with Confusion Matrix", | ||
"description":"A trainer that does end-to-end distributed training for XGBoost models. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/xgboost-spark", | ||
"file":"/samples/xgboost-spark/xgboost-training-cm.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer", | ||
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx", | ||
"file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] Basic - Sequential", | ||
"description":"A pipeline with two sequential steps. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/sequential.py", | ||
"file":"/samples/basic/sequential.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] Basic - Parallel Join", | ||
"description":"A pipeline that downloads two messages in parallel and print the concatenated result. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/parallel_join.py", | ||
"file":"/samples/basic/parallel_join.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] Basic - Immediate Value", | ||
"description":"A pipeline with parameter values hard coded. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/immediate_value.py", | ||
"file":"/samples/basic/immediate_value.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] Basic - Exit Handler", | ||
"description":"A pipeline that downloads a message and print it out. Exit Handler will run at the end. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/exit_handler.py", | ||
"file":"/samples/basic/exit_handler.py.tar.gz" | ||
}, | ||
{ | ||
"name":"[Sample] Basic - Condition", | ||
"description":"A pipeline shows how to use dsl.Condition. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/condition.py", | ||
"file":"/samples/basic/condition.py.tar.gz" | ||
} | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"io/ioutil" | ||
"net" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/golang/glog" | ||
api "github.com/kubeflow/pipelines/backend/api/go_client" | ||
|
@@ -30,6 +31,9 @@ import ( | |
"github.com/grpc-ecosystem/grpc-gateway/runtime" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/reflection" | ||
"os" | ||
"github.com/pkg/errors" | ||
"fmt" | ||
) | ||
|
||
var ( | ||
|
@@ -48,7 +52,10 @@ func main() { | |
initConfig() | ||
clientManager := newClientManager() | ||
resourceManager := resource.NewResourceManager(&clientManager) | ||
loadSamples(resourceManager) | ||
err:= loadSamples(resourceManager) | ||
if err!=nil{ | ||
glog.Fatalf("Failed to load samples. Err: %v", err.Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the server restarts? Does this prevent the server from ever restarting unless the storage is deleted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the samples are baked into the image so it won't be a problem |
||
} | ||
go startRpcServer(resourceManager) | ||
startHttpProxy(resourceManager) | ||
|
||
|
@@ -119,29 +126,41 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se | |
} | ||
|
||
// Preload a bunch of pipeline samples | ||
func loadSamples(resourceManager *resource.ResourceManager) { | ||
func loadSamples(resourceManager *resource.ResourceManager) error { | ||
configBytes, err := ioutil.ReadFile(*sampleConfigPath) | ||
if err != nil { | ||
glog.Warningf("Failed to read sample configurations. Err: %v", err.Error()) | ||
return | ||
return errors.New(fmt.Sprintf("Failed to read sample configurations file. Err: %v", err.Error())) | ||
} | ||
type config struct { | ||
Name string | ||
Description string | ||
File string | ||
} | ||
var configs []config | ||
if json.Unmarshal(configBytes, &configs) != nil { | ||
glog.Warningf("Failed to read sample configurations. Err: %v", err.Error()) | ||
return | ||
if err:= json.Unmarshal(configBytes, &configs);err != nil { | ||
return errors.New(fmt.Sprintf("Failed to read sample configurations. Err: %v", err.Error())) | ||
} | ||
for _, config := range configs { | ||
sampleBytes, err := ioutil.ReadFile(config.File) | ||
reader, err:= os.Open(config.File) | ||
if err != nil { | ||
glog.Warningf("Failed to load sample %s. Error: %v", config.Name, err.Error()) | ||
return errors.New(fmt.Sprintf("Failed to load sample %s. Error: %v", config.Name, err.Error())) | ||
} | ||
pipelineFile, err := server.ReadPipelineFile(config.File, reader, server.MaxFileLength) | ||
if err!=nil{ | ||
return errors.New(fmt.Sprintf("Failed to decompress the file %s. Error: %v", config.Name, err.Error())) | ||
} | ||
_, err = resourceManager.CreatePipeline(config.Name, config.Description, pipelineFile) | ||
if err!=nil{ | ||
// Log the error but not fail. The API Server pod can restart and it could potentially cause name collision. | ||
// In the future, we might consider loading samples during deployment, instead of when API server starts. | ||
glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", config.Name, err.Error())) | ||
continue | ||
} | ||
resourceManager.CreatePipeline(config.Name, config.Description, sampleBytes) | ||
|
||
// Since the default sorting is by create time, | ||
// Sleep one second makes sure the samples are showing up in the same order as they are added. | ||
time.Sleep(1*time.Second) | ||
} | ||
glog.Info("All samples are loaded.") | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already in (#70). Give it a try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the test need to update to adopt that. it's kind of late for the last minute change. I'll do it post beta.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.