Skip to content

TaskAPI #1717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 56 commits into from
Jan 18, 2021
Merged

TaskAPI #1717

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
9e6b565
Remove unused arguments from ValidateTrafficSplitter
Dec 15, 2020
f58d960
Add TaskAPIKind and TaskDefinition types
Dec 15, 2020
c4f73f1
Add api spec validations for task types
Dec 15, 2020
35c8dfc
Add taskapi.Deploy functionality
Dec 16, 2020
9caa5ca
WIP: Add taskapi.GetAllAPIs functionality (without task jobs)
Dec 16, 2020
626ab5e
Add basic delete functionality to taskapi (WIP)
Dec 17, 2020
f2bf1f0
Add redundancy call to taskapi.DeleteAPI when deployed resource is no…
Dec 17, 2020
edc0320
WIP: Add taskapi.GetAPIByName and corresponding CLI functionality
Dec 17, 2020
7f70e9c
Initial implementation for task runner in python (#1706)
vishalbollu Dec 18, 2020
c6f55cb
Fix linting issues on bootloader.sh
Dec 18, 2020
456d6b2
Refactor batchapi and taskapi code in order to reuse code
Dec 21, 2020
8f5a7db
Update k8s specs for TaskAPI
Dec 22, 2020
96fc821
Fix BatchAPI cron to avoid clashing with TaskAPI jobs
Dec 22, 2020
6406b14
Fix k8s env vars for TaskAPI
Dec 22, 2020
618705f
Working task submission end to end
Dec 22, 2020
3874854
Fix rebase conflict issues
Dec 23, 2020
191b6b3
Fix compilation errors
Jan 7, 2021
3f6610a
Update copyright date
Jan 7, 2021
114be17
Merge branch 'master' into tasks
RobertLucian Jan 7, 2021
c261578
Add docker images for task kind
RobertLucian Jan 7, 2021
3f03c58
Merge branch 'master' into tasks
RobertLucian Jan 8, 2021
9e78050
WIP task api
RobertLucian Jan 8, 2021
1e34749
WIP on Task API
RobertLucian Jan 8, 2021
4509c60
WIP on Task API
RobertLucian Jan 11, 2021
27681a4
Fix merge conflict that slipped through
RobertLucian Jan 11, 2021
f8abbb9
WIP on Task API
RobertLucian Jan 11, 2021
8c047a4
Merge branch 'master' into tasks
RobertLucian Jan 11, 2021
61d7a97
WIP on Task API
RobertLucian Jan 11, 2021
922ce9c
Add newlines in between different API kinds
RobertLucian Jan 11, 2021
c4a116c
WIP on Task API
RobertLucian Jan 12, 2021
277c45a
WIP on Task API
RobertLucian Jan 12, 2021
008ea9b
Merge branch 'master' into tasks
RobertLucian Jan 12, 2021
34c3afc
WIP on Task API
RobertLucian Jan 12, 2021
fb0ced2
Remove python-task-* images
RobertLucian Jan 13, 2021
5e8d062
Bug fixes
RobertLucian Jan 13, 2021
b2403c2
Add Task API docs
RobertLucian Jan 13, 2021
1175c34
Update task example
RobertLucian Jan 13, 2021
c02385a
Misc stuff
RobertLucian Jan 13, 2021
4a4be64
Fix batch logs & add task logs
RobertLucian Jan 13, 2021
a27e8ff
Support Task API on GCP
RobertLucian Jan 14, 2021
1d3b3cd
Fix TaskAPI on GCP
RobertLucian Jan 14, 2021
2adc12c
Docs fixes
RobertLucian Jan 14, 2021
7b05aa5
Docs change
RobertLucian Jan 14, 2021
933d109
Cleanup
RobertLucian Jan 14, 2021
3888c97
Merge branch 'master' into tasks
vishalbollu Jan 14, 2021
3f4d76e
Bug fixes
vishalbollu Jan 14, 2021
bb19d12
Update logs docs
vishalbollu Jan 14, 2021
a548611
Merge branch 'master' into tasks
RobertLucian Jan 18, 2021
8a50249
Address PR comments
RobertLucian Jan 18, 2021
8cdc8f8
Fix function call parameters
RobertLucian Jan 18, 2021
62a0dc7
Fix job endpoints for task/batch APIs
RobertLucian Jan 18, 2021
cf72494
Remove request monitor from task + rename functions
RobertLucian Jan 18, 2021
fe0baf7
Address PR comments
RobertLucian Jan 18, 2021
087347e
Merge branch 'master' into tasks
RobertLucian Jan 18, 2021
3d2d816
Merge branch 'master' into tasks
RobertLucian Jan 18, 2021
6ff9a1e
Merge branch 'master' into tasks
RobertLucian Jan 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/build-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if [[ "$image" == *"-slim" ]]; then
build_args="--build-arg SLIM=true"
fi

if [ "${image}" == "python-predictor-gpu-slim" ]; then
if [ "${image}" == *"gpu-slim" ]; then
cuda=("10.0" "10.1" "10.1" "10.2" "10.2" "11.0" "11.1")
cudnn=("7" "7" "8" "7" "8" "8" "8")
for i in ${!cudnn[@]}; do
Expand Down
12 changes: 10 additions & 2 deletions cli/cluster/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/prompt"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
)

func Delete(operatorConfig OperatorConfig, apiName string, keepCache bool, force bool) (schema.DeleteResponse, error) {
Expand Down Expand Up @@ -73,13 +74,20 @@ func getReadyRealtimeAPIReplicasOrNil(operatorConfig OperatorConfig, apiName str
return &totalReady
}

func StopJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.DeleteResponse, error) {
func StopJob(operatorConfig OperatorConfig, kind userconfig.Kind, apiName string, jobID string) (schema.DeleteResponse, error) {
params := map[string]string{
"apiName": apiName,
"jobID": jobID,
}

httpRes, err := HTTPDelete(operatorConfig, path.Join("/batch", apiName), params)
var endpointComponent string
if kind == userconfig.BatchAPIKind {
endpointComponent = "batch"
} else {
endpointComponent = "tasks"
}

httpRes, err := HTTPDelete(operatorConfig, path.Join("/"+endpointComponent, apiName), params)
if err != nil {
return schema.DeleteResponse{}, err
}
Expand Down
23 changes: 19 additions & 4 deletions cli/cluster/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,31 @@ func GetAPIByID(operatorConfig OperatorConfig, apiName string, apiID string) ([]
return apiRes, nil
}

func GetJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.JobResponse, error) {
func GetBatchJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.BatchJobResponse, error) {
endpoint := path.Join("/batch", apiName)
httpRes, err := HTTPGet(operatorConfig, endpoint, map[string]string{"jobID": jobID})
if err != nil {
return schema.JobResponse{}, err
return schema.BatchJobResponse{}, err
}

var jobRes schema.JobResponse
var jobRes schema.BatchJobResponse
if err = json.Unmarshal(httpRes, &jobRes); err != nil {
return schema.JobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
return schema.BatchJobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
}

return jobRes, nil
}

func GetTaskJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.TaskJobResponse, error) {
endpoint := path.Join("/tasks", apiName)
httpRes, err := HTTPGet(operatorConfig, endpoint, map[string]string{"jobID": jobID})
if err != nil {
return schema.TaskJobResponse{}, err
}

var jobRes schema.TaskJobResponse
if err = json.Unmarshal(httpRes, &jobRes); err != nil {
return schema.TaskJobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
}

return jobRes, nil
Expand Down
14 changes: 0 additions & 14 deletions cli/cluster/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ func StreamJobLogs(operatorConfig OperatorConfig, apiName string, jobID string)
return streamLogs(operatorConfig, "/logs/"+apiName, map[string]string{"jobID": jobID})
}

func GetGCPLogsURL(operatorConfig OperatorConfig, apiName string) (schema.GCPLogsResponse, error) {
httpRes, err := HTTPGet(operatorConfig, "/logs/"+apiName)
if err != nil {
return schema.GCPLogsResponse{}, err
}

var gcpLogsResponse schema.GCPLogsResponse
if err = json.Unmarshal(httpRes, &gcpLogsResponse); err != nil {
return schema.GCPLogsResponse{}, errors.Wrap(err, "/logs/"+apiName, string(httpRes))
}

return gcpLogsResponse, nil
}

func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[string]string) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
Expand Down
7 changes: 6 additions & 1 deletion cli/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ var _deleteCmd = &cobra.Command{

var deleteResponse schema.DeleteResponse
if len(args) == 2 {
deleteResponse, err = cluster.StopJob(MustGetOperatorConfig(env.Name), args[0], args[1])
apisRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), args[0])
if err != nil {
exit.Error(err)
}

deleteResponse, err = cluster.StopJob(MustGetOperatorConfig(env.Name), apisRes[0].Spec.Kind, args[0], args[1])
if err != nil {
exit.Error(err)
}
Expand Down
60 changes: 51 additions & 9 deletions cli/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,17 @@ var _getCmd = &cobra.Command{
return "", err
}

jobTable, err := getJob(env, args[0], args[1])
apisRes, err := cluster.GetAPI(MustGetOperatorConfig(envName), args[0])
if err != nil {
return "", err
}

var jobTable string
if apisRes[0].Spec.Kind == userconfig.BatchAPIKind {
jobTable, err = getBatchJob(env, args[0], args[1])
} else {
jobTable, err = getTaskJob(env, args[0], args[1])
}
if err != nil {
return "", err
}
Expand Down Expand Up @@ -189,6 +199,8 @@ func getAPIsInAllEnvironments() (string, error) {
var allRealtimeAPIEnvs []string
var allBatchAPIs []schema.APIResponse
var allBatchAPIEnvs []string
var allTaskAPIs []schema.APIResponse
var allTaskAPIEnvs []string
var allTrafficSplitters []schema.APIResponse
var allTrafficSplitterEnvs []string

Expand Down Expand Up @@ -219,6 +231,9 @@ func getAPIsInAllEnvironments() (string, error) {
case userconfig.RealtimeAPIKind:
allRealtimeAPIEnvs = append(allRealtimeAPIEnvs, env.Name)
allRealtimeAPIs = append(allRealtimeAPIs, api)
case userconfig.TaskAPIKind:
allTaskAPIEnvs = append(allTaskAPIEnvs, env.Name)
allTaskAPIs = append(allTaskAPIs, api)
case userconfig.TrafficSplitterKind:
allTrafficSplitterEnvs = append(allTrafficSplitterEnvs, env.Name)
allTrafficSplitters = append(allTrafficSplitters, api)
Expand All @@ -243,7 +258,7 @@ func getAPIsInAllEnvironments() (string, error) {

out := ""

if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 {
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 && len(allTaskAPIs) == 0 {
// check if any environments errorred
if len(errorsMap) != len(cliConfig.Environments) {
if len(errorsMap) == 0 {
Expand Down Expand Up @@ -271,20 +286,26 @@ func getAPIsInAllEnvironments() (string, error) {
out += t.MustFormat()
}

if len(allRealtimeAPIs) > 0 {
t := realtimeAPIsTable(allRealtimeAPIs, allRealtimeAPIEnvs)

if len(allTaskAPIs) > 0 {
t := taskAPIsTable(allTaskAPIs, allTaskAPIEnvs)
if len(allBatchAPIs) > 0 {
out += "\n"
}
out += t.MustFormat()
}

if len(allRealtimeAPIs) > 0 {
t := realtimeAPIsTable(allRealtimeAPIs, allRealtimeAPIEnvs)
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
out += "\n"
}
out += t.MustFormat()
}

if len(allTrafficSplitters) > 0 {
t := trafficSplitterListTable(allTrafficSplitters, allTrafficSplitterEnvs)

if len(allRealtimeAPIs) > 0 || len(allBatchAPIs) > 0 {
if len(allRealtimeAPIs) > 0 || len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
out += "\n"
}

Expand Down Expand Up @@ -319,20 +340,23 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {

var allRealtimeAPIs []schema.APIResponse
var allBatchAPIs []schema.APIResponse
var allTaskAPIs []schema.APIResponse
var allTrafficSplitters []schema.APIResponse

for _, api := range apisRes {
switch api.Spec.Kind {
case userconfig.BatchAPIKind:
allBatchAPIs = append(allBatchAPIs, api)
case userconfig.TaskAPIKind:
allTaskAPIs = append(allTaskAPIs, api)
case userconfig.RealtimeAPIKind:
allRealtimeAPIs = append(allRealtimeAPIs, api)
case userconfig.TrafficSplitterKind:
allTrafficSplitters = append(allTrafficSplitters, api)
}
}

if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 {
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTaskAPIs) == 0 && len(allTrafficSplitters) == 0 {
return console.Bold("no apis are deployed"), nil
}

Expand All @@ -350,6 +374,22 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
out += t.MustFormat()
}

if len(allTaskAPIs) > 0 {
envNames := []string{}
for range allTaskAPIs {
envNames = append(envNames, env.Name)
}

t := taskAPIsTable(allTaskAPIs, envNames)
t.FindHeaderByTitle(_titleEnvironment).Hidden = true

if len(allBatchAPIs) > 0 {
out += "\n"
}

out += t.MustFormat()
}

if len(allRealtimeAPIs) > 0 {
envNames := []string{}
for range allRealtimeAPIs {
Expand All @@ -359,7 +399,7 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
t := realtimeAPIsTable(allRealtimeAPIs, envNames)
t.FindHeaderByTitle(_titleEnvironment).Hidden = true

if len(allBatchAPIs) > 0 {
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
out += "\n"
}

Expand All @@ -375,7 +415,7 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
t := trafficSplitterListTable(allTrafficSplitters, envNames)
t.FindHeaderByTitle(_titleEnvironment).Hidden = true

if len(allBatchAPIs) > 0 || len(allRealtimeAPIs) > 0 {
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 || len(allRealtimeAPIs) > 0 {
out += "\n"
}

Expand Down Expand Up @@ -412,6 +452,8 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) {
return trafficSplitterTable(apiRes, env)
case userconfig.BatchAPIKind:
return batchAPITable(apiRes), nil
case userconfig.TaskAPIKind:
return taskAPITable(apiRes), nil
default:
return "", errors.ErrorUnexpected(fmt.Sprintf("encountered unexpected kind %s for api %s", apiRes.Spec.Kind, apiRes.Spec.Name))
}
Expand Down
21 changes: 10 additions & 11 deletions cli/cmd/lib_batch_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab
latestJobID := "-"
runningJobs := 0

for _, job := range batchAPI.JobStatuses {
for _, job := range batchAPI.BatchJobStatuses {
if job.StartTime.After(latestStartTime) {
latestStartTime = job.StartTime
latestJobID = job.ID + fmt.Sprintf(" (submitted %s ago)", libtime.SinceStr(&latestStartTime))
Expand Down Expand Up @@ -82,14 +82,14 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab
}

func batchAPITable(batchAPI schema.APIResponse) string {
jobRows := make([][]interface{}, 0, len(batchAPI.JobStatuses))
jobRows := make([][]interface{}, 0, len(batchAPI.BatchJobStatuses))

out := ""
if len(batchAPI.JobStatuses) == 0 {
out = console.Bold("no submitted jobs\n")
if len(batchAPI.BatchJobStatuses) == 0 {
out = console.Bold("no submitted batch jobs\n")
} else {
totalFailed := 0
for _, job := range batchAPI.JobStatuses {
for _, job := range batchAPI.BatchJobStatuses {
succeeded := 0
failed := 0

Expand Down Expand Up @@ -144,8 +144,8 @@ func batchAPITable(batchAPI schema.APIResponse) string {
return out
}

func getJob(env cliconfig.Environment, apiName string, jobID string) (string, error) {
resp, err := cluster.GetJob(MustGetOperatorConfig(env.Name), apiName, jobID)
func getBatchJob(env cliconfig.Environment, apiName string, jobID string) (string, error) {
resp, err := cluster.GetBatchJob(MustGetOperatorConfig(env.Name), apiName, jobID)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -216,12 +216,11 @@ func getJob(env cliconfig.Environment, apiName string, jobID string) (string, er
out += titleStr("batch stats") + t.MustFormat(&table.Opts{BoldHeader: pointer.Bool(false)})

if job.Status == status.JobEnqueuing {
out += "\nstill enqueuing, workers have not been allocated for this job yet\n"
out += "\n" + "still enqueuing, workers have not been allocated for this job yet\n"
} else if job.Status.IsCompleted() {
out += "\nworker stats are not available because this job is not currently running\n"
out += "\n" + "worker stats are not available because this job is not currently running\n"
} else {
out += titleStr("worker stats")

if job.WorkerCounts != nil {
t := table.Table{
Headers: []table.Header{
Expand Down Expand Up @@ -253,7 +252,7 @@ func getJob(env cliconfig.Environment, apiName string, jobID string) (string, er

out += "\n" + console.Bold("job endpoint: ") + resp.Endpoint + "\n"

jobSpecStr, err := libjson.Pretty(job.Job)
jobSpecStr, err := libjson.Pretty(job.BatchJob)
if err != nil {
return "", err
}
Expand Down
Loading