Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cli/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ const (
_titleRequested = "requested"
_titleFailed = "failed"
_titleLastupdated = "last update"
_titleAvgRequest = "avg request"
_title2XX = "2XX"
_title4XX = "4XX"
_title5XX = "5XX"
)

var (
Expand Down
15 changes: 0 additions & 15 deletions cli/cmd/lib_async_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Tab

var totalFailed int32
var totalStale int32
var total4XX int
var total5XX int

for i, asyncAPI := range asyncAPIs {
lastUpdated := time.Unix(asyncAPI.Spec.LastUpdated, 0)
Expand All @@ -76,19 +74,10 @@ func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Tab
asyncAPI.Status.Requested,
asyncAPI.Status.Updated.TotalFailed(),
libtime.SinceStr(&lastUpdated),
latencyStr(asyncAPI.Metrics),
code2XXStr(asyncAPI.Metrics),
code4XXStr(asyncAPI.Metrics),
code5XXStr(asyncAPI.Metrics),
})

totalFailed += asyncAPI.Status.Updated.TotalFailed()
totalStale += asyncAPI.Status.Stale.Ready

if asyncAPI.Metrics.NetworkStats != nil {
total4XX += asyncAPI.Metrics.NetworkStats.Code4XX
total5XX += asyncAPI.Metrics.NetworkStats.Code5XX
}
}

return table.Table{
Expand All @@ -101,10 +90,6 @@ func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Tab
{Title: _titleRequested},
{Title: _titleFailed, Hidden: totalFailed == 0},
{Title: _titleLastupdated},
{Title: _titleAvgRequest},
{Title: _title2XX},
{Title: _title4XX, Hidden: total4XX == 0},
{Title: _title5XX, Hidden: total5XX == 0},
},
Rows: rows,
}
Expand Down
25 changes: 7 additions & 18 deletions cli/cmd/lib_batch_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,7 @@ func batchAPITable(batchAPI schema.APIResponse) string {
if len(batchAPI.BatchJobStatuses) == 0 {
out = console.Bold("no submitted batch jobs\n")
} else {
totalFailed := 0
for _, job := range batchAPI.BatchJobStatuses {
succeeded := 0
failed := 0

if job.BatchMetrics != nil {
failed = job.BatchMetrics.Failed
succeeded = job.BatchMetrics.Succeeded
totalFailed += failed
}

jobEndTime := time.Now()
if job.EndTime != nil {
Expand All @@ -108,8 +99,7 @@ func batchAPITable(batchAPI schema.APIResponse) string {
jobRows = append(jobRows, []interface{}{
job.ID,
job.Status.Message(),
fmt.Sprintf("%d/%d", succeeded, job.TotalBatchCount),
failed,
job.TotalBatchCount,
job.StartTime.Format(_timeFormat),
duration,
})
Expand All @@ -119,8 +109,7 @@ func batchAPITable(batchAPI schema.APIResponse) string {
Headers: []table.Header{
{Title: "job id"},
{Title: "status"},
{Title: "progress"}, // (succeeded/total)
{Title: "failed attempts", Hidden: totalFailed == 0},
{Title: "total batches"},
{Title: "start time"},
{Title: "duration"},
},
Expand Down Expand Up @@ -189,14 +178,14 @@ func getBatchJob(env cliconfig.Environment, apiName string, jobID string) (strin
failed := "-"
avgTimePerBatch := "-"

if job.BatchMetrics != nil {
if job.BatchMetrics.AverageTimePerBatch != nil {
batchMetricsDuration := time.Duration(*job.BatchMetrics.AverageTimePerBatch*1000000000) * time.Nanosecond
if resp.Metrics != nil {
if resp.Metrics.AverageTimePerBatch != nil {
batchMetricsDuration := time.Duration(*resp.Metrics.AverageTimePerBatch*1000000000) * time.Nanosecond
avgTimePerBatch = batchMetricsDuration.Truncate(time.Millisecond).String()
}

succeeded = s.Int(job.BatchMetrics.Succeeded)
failed = s.Int(job.BatchMetrics.Failed)
succeeded = s.Int(resp.Metrics.Succeeded)
failed = s.Int(resp.Metrics.Failed)
}

t := table.Table{
Expand Down
49 changes: 0 additions & 49 deletions cli/cmd/lib_realtime_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ limitations under the License.
package cmd

import (
"fmt"
"strings"
"time"

"github.com/cortexlabs/cortex/cli/types/cliconfig"
"github.com/cortexlabs/cortex/pkg/lib/console"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/table"
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/cortexlabs/cortex/pkg/types/metrics"
)

func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) (string, error) {
Expand Down Expand Up @@ -61,8 +58,6 @@ func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) tab

var totalFailed int32
var totalStale int32
var total4XX int
var total5XX int

for i, realtimeAPI := range realtimeAPIs {
lastUpdated := time.Unix(realtimeAPI.Spec.LastUpdated, 0)
Expand All @@ -75,19 +70,10 @@ func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) tab
realtimeAPI.Status.Requested,
realtimeAPI.Status.Updated.TotalFailed(),
libtime.SinceStr(&lastUpdated),
latencyStr(realtimeAPI.Metrics),
code2XXStr(realtimeAPI.Metrics),
code4XXStr(realtimeAPI.Metrics),
code5XXStr(realtimeAPI.Metrics),
})

totalFailed += realtimeAPI.Status.Updated.TotalFailed()
totalStale += realtimeAPI.Status.Stale.Ready

if realtimeAPI.Metrics.NetworkStats != nil {
total4XX += realtimeAPI.Metrics.NetworkStats.Code4XX
total5XX += realtimeAPI.Metrics.NetworkStats.Code5XX
}
}

return table.Table{
Expand All @@ -100,42 +86,7 @@ func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) tab
{Title: _titleRequested},
{Title: _titleFailed, Hidden: totalFailed == 0},
{Title: _titleLastupdated},
{Title: _titleAvgRequest},
{Title: _title2XX},
{Title: _title4XX, Hidden: total4XX == 0},
{Title: _title5XX, Hidden: total5XX == 0},
},
Rows: rows,
}
}

// latencyStr formats the average request latency held in metrics for display
// in a table cell.
//
// It returns "-" when metrics, its NetworkStats, or the Latency value is nil.
// Values below 1000 are printed with an "ms" suffix; larger values are divided
// by 1000 and printed with an "s" suffix (presumably Latency is expressed in
// milliseconds — confirm against the metrics package).
func latencyStr(metrics *metrics.Metrics) string {
	// Guard the outer pointer as well: callers pass APIResponse.Metrics
	// directly, and a nil value would otherwise panic on the field access.
	if metrics == nil || metrics.NetworkStats == nil || metrics.NetworkStats.Latency == nil {
		return "-"
	}

	latency := *metrics.NetworkStats.Latency
	if latency < 1000 {
		return fmt.Sprintf("%.6g ms", latency)
	}
	return fmt.Sprintf("%.6g s", latency/1000)
}

// code2XXStr renders the 2XX response count held in metrics for display in a
// table cell.
//
// It returns "-" when metrics or its NetworkStats is nil, or when the count is
// zero; otherwise the count is formatted with s.Int.
func code2XXStr(metrics *metrics.Metrics) string {
	// Guard the outer pointer too so a nil metrics value cannot panic here.
	if metrics == nil || metrics.NetworkStats == nil || metrics.NetworkStats.Code2XX == 0 {
		return "-"
	}
	return s.Int(metrics.NetworkStats.Code2XX)
}

// code4XXStr renders the 4XX response count held in metrics for display in a
// table cell.
//
// It returns "-" when metrics or its NetworkStats is nil, or when the count is
// zero; otherwise the count is formatted with s.Int.
func code4XXStr(metrics *metrics.Metrics) string {
	// Guard the outer pointer too so a nil metrics value cannot panic here.
	if metrics == nil || metrics.NetworkStats == nil || metrics.NetworkStats.Code4XX == 0 {
		return "-"
	}
	return s.Int(metrics.NetworkStats.Code4XX)
}

// code5XXStr renders the 5XX response count held in metrics for display in a
// table cell.
//
// It returns "-" when metrics or its NetworkStats is nil, or when the count is
// zero; otherwise the count is formatted with s.Int.
func code5XXStr(metrics *metrics.Metrics) string {
	// Guard the outer pointer too so a nil metrics value cannot panic here.
	if metrics == nil || metrics.NetworkStats == nil || metrics.NetworkStats.Code5XX == 0 {
		return "-"
	}
	return s.Int(metrics.NetworkStats.Code5XX)
}
6 changes: 0 additions & 6 deletions cli/cmd/lib_traffic_splitters.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ
apiRes.Status.Message(),
apiRes.Status.Requested,
libtime.SinceStr(&lastUpdated),
latencyStr(apiRes.Metrics),
code2XXStr(apiRes.Metrics),
code5XXStr(apiRes.Metrics),
})
}

Expand All @@ -99,9 +96,6 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ
{Title: _titleStatus},
{Title: _titleRequested},
{Title: _titleLastupdated},
{Title: _titleAvgRequest},
{Title: _title2XX},
{Title: _title5XX},
},
Rows: rows,
}, nil
Expand Down
10 changes: 5 additions & 5 deletions docs/workloads/batch/jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,6 @@ RESPONSE:
"sqs_url": <string>,
"status": <string>,
"batches_in_queue": <int> # number of batches remaining in the queue
"batch_metrics": {
"succeeded": <int> # number of succeeded batches
"failed": int # number of failed attempts
"avg_time_per_batch": <float> (optional) # average time spent working on a batch (only considers successful attempts)
},
"worker_counts": { # worker counts are only available while a job is running
"pending": <int>, # number of workers that are waiting for compute resources to be provisioned
"initializing": <int>, # number of workers that are initializing
Expand All @@ -215,6 +210,11 @@ RESPONSE:
"endpoint": <string>
"api_spec": {
...
},
"metrics": {
"succeeded": <int> # number of succeeded batches
"failed": <int> # number of failed attempts
"avg_time_per_batch": <float> (optional) # average time spent working on a batch (only considers successful attempts)
}
}
```
Expand Down
6 changes: 3 additions & 3 deletions pkg/crds/apis/batch/v1alpha1/batchjob_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

// GetMetrics retrieves the BatchJob metrics from prometheus
func GetMetrics(promAPIv1 promv1.API, jobKey spec.JobKey, t time.Time) (metrics.BatchMetrics, error) {
func GetMetrics(promAPIv1 promv1.API, jobKey spec.JobKey, t time.Time) (*metrics.BatchMetrics, error) {
var (
jobBatchesSucceeded float64
jobBatchesFailed float64
Expand All @@ -59,10 +59,10 @@ func GetMetrics(promAPIv1 promv1.API, jobKey spec.JobKey, t time.Time) (metrics.
},
)
if err != nil {
return metrics.BatchMetrics{}, err
return nil, err
}

return metrics.BatchMetrics{
return &metrics.BatchMetrics{
Succeeded: int(jobBatchesSucceeded),
Failed: int(jobBatchesFailed),
AverageTimePerBatch: avgTimePerBatch,
Expand Down
2 changes: 1 addition & 1 deletion pkg/crds/controllers/batch/batchjob_controller_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ func getMetrics(r *BatchJobReconciler, batchJob batch.BatchJob) (metrics.BatchMe
return metrics.BatchMetrics{}, err
}

return jobMetrics, nil
return *jobMetrics, nil
}

func saveJobMetrics(r *BatchJobReconciler, batchJob batch.BatchJob) error {
Expand Down
33 changes: 2 additions & 31 deletions pkg/operator/endpoints/get_batch_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package endpoints

import (
"net/http"
"net/url"

"github.com/cortexlabs/cortex/pkg/operator/operator"
"github.com/cortexlabs/cortex/pkg/operator/resources"
"github.com/cortexlabs/cortex/pkg/operator/resources/job/batchapi"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/cortexlabs/cortex/pkg/types/spec"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -54,37 +51,11 @@ func GetBatchJob(w http.ResponseWriter, r *http.Request) {
Kind: userconfig.BatchAPIKind,
}

jobStatus, err := batchapi.GetJobStatus(jobKey)
jobResponse, err := batchapi.GetJob(jobKey)
if err != nil {
respondError(w, r, err)
return
}

apiSpec, err := operator.DownloadAPISpec(jobStatus.APIName, jobStatus.APIID)
if err != nil {
respondError(w, r, err)
return
}

endpoint, err := operator.APIEndpoint(apiSpec)
if err != nil {
respondError(w, r, err)
return
}

parsedURL, err := url.Parse(endpoint)
if err != nil {
respondError(w, r, err)
}
q := parsedURL.Query()
q.Add("jobID", jobKey.ID)
parsedURL.RawQuery = q.Encode()

response := schema.BatchJobResponse{
JobStatus: *jobStatus,
APISpec: *apiSpec,
Endpoint: parsedURL.String(),
}

respondJSON(w, r, response)
respondJSON(w, r, jobResponse)
}
4 changes: 2 additions & 2 deletions pkg/operator/endpoints/logs_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func GetJobLogURL(w http.ResponseWriter, r *http.Request) {

switch deployedResource.Kind {
case userconfig.BatchAPIKind:
jobStatus, err := batchapi.GetJobStatus(spec.JobKey{
jobResponse, err := batchapi.GetJob(spec.JobKey{
ID: jobID,
APIName: apiName,
Kind: userconfig.BatchAPIKind,
Expand All @@ -90,7 +90,7 @@ func GetJobLogURL(w http.ResponseWriter, r *http.Request) {
respondError(w, r, err)
return
}
logURL, err := operator.BatchJobLogURL(apiName, *jobStatus)
logURL, err := operator.BatchJobLogURL(apiName, jobResponse.JobStatus)
if err != nil {
respondError(w, r, err)
return
Expand Down
12 changes: 0 additions & 12 deletions pkg/operator/resources/asyncapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,6 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp
return nil, err
}

metrics, err := GetMetrics(*api)
if err != nil {
return nil, err
}

dashboardURL := pointer.String(getDashboardURL(api.Name))

return []schema.APIResponse{
Expand All @@ -278,7 +273,6 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp
Status: status,
Endpoint: apiEndpoint,
DashboardURL: dashboardURL,
Metrics: metrics,
},
}, nil
}
Expand All @@ -295,11 +289,6 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIR
return nil, err
}

allMetrics, err := GetMultipleMetrics(apis)
if err != nil {
return nil, err
}

asyncAPIs := make([]schema.APIResponse, len(apis))

for i := range apis {
Expand All @@ -313,7 +302,6 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIR
Spec: api,
Status: &statuses[i],
Endpoint: endpoint,
Metrics: &allMetrics[i],
}
}

Expand Down
Loading