Skip to content

Commit 9d05891

Browse files
authored
Get job status from batch crd for in progress jobs (#2146)
1 parent cc280e5 commit 9d05891

File tree

17 files changed

+272
-150
lines changed

17 files changed

+272
-150
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/onsi/gomega v1.10.2
3434
github.com/opencontainers/go-digest v1.0.0 // indirect
3535
github.com/opencontainers/image-spec v1.0.1 // indirect
36+
github.com/patrickmn/go-cache v2.1.0+incompatible
3637
github.com/pkg/errors v0.9.1
3738
github.com/prometheus/client_golang v1.7.1
3839
github.com/prometheus/common v0.10.0

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,9 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I
507507
github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI=
508508
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
509509
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
510+
github.com/patrickmn/go-cache v1.0.0 h1:3gD5McaYs9CxjyK5AXGcq8gdeCARtd/9gJDUvVeaZ0Y=
511+
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
512+
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
510513
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
511514
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
512515
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=

manager/manifests/prometheus-monitoring.yaml.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ spec:
224224
- port: metrics
225225
scheme: http
226226
path: /metrics
227-
interval: 30s
227+
interval: 20s
228228
namespaceSelector:
229229
any: true
230230
selector:

pkg/crds/apis/batch/v1alpha1/batchjob_types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,15 @@ type BatchJobStatus struct {
8383
// Job ID
8484
ID string `json:"id,omitempty"`
8585

86-
// Processing start timestamp
87-
StartTime *kmeta.Time `json:"start_time,omitempty"`
88-
8986
// Processing ending timestamp
9087
EndTime *kmeta.Time `json:"end_time,omitempty"`
9188

9289
// URL for the used SQS queue
9390
QueueURL string `json:"queue_url,omitempty"`
9491

92+
// Total batch count
93+
TotalBatchCount int `json:"total_batch_count,omitempty"`
94+
9595
// +kubebuilder:validation:Type=string
9696
// Status of the batch job
9797
Status status.JobCode `json:"status,omitempty"`

pkg/crds/apis/batch/v1alpha1/zz_generated.deepcopy.go

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/crds/config/crd/bases/batch.cortex.dev_batchjobs.yaml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,12 @@ spec:
125125
queue_url:
126126
description: URL for the used SQS queue
127127
type: string
128-
start_time:
129-
description: Processing start timestamp
130-
format: date-time
131-
type: string
132128
status:
133129
description: Status of the batch job
134130
type: string
131+
total_batch_count:
132+
description: Total batch count
133+
type: integer
135134
worker_counts:
136135
description: Detailed worker counts with respective status
137136
properties:

pkg/crds/config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ metadata:
66
creationTimestamp: null
77
name: manager-role
88
rules:
9+
- apiGroups:
10+
- ""
11+
resources:
12+
- pods
13+
verbs:
14+
- get
15+
- list
16+
- watch
917
- apiGroups:
1018
- batch
1119
resources:

pkg/crds/controllers/batch/batchjob_controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type BatchJobReconciler struct {
5656
// +kubebuilder:rbac:groups=batch.cortex.dev,resources=batchjobs/status,verbs=get;update;patch
5757
// +kubebuilder:rbac:groups=batch.cortex.dev,resources=batchjobs/finalizers,verbs=update
5858
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch
59+
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
5960

6061
// Reconcile is part of the main kubernetes reconciliation loop which aims to
6162
// move the current state of the cluster closer to the desired state.
@@ -150,12 +151,18 @@ func (r *BatchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
150151
return ctrl.Result{}, err
151152
}
152153

154+
var totalBatchCount int
155+
if enqueuingStatus == batch.EnqueuingDone {
156+
totalBatchCount, err = r.Config.GetTotalBatchCount(r, batchJob)
157+
}
158+
153159
workerJobExists := workerJob != nil
154160
statusInfo := batchJobStatusInfo{
155161
QueueExists: queueExists,
156162
EnqueuingStatus: enqueuingStatus,
157163
EnqueuerJob: enqueuerJob,
158164
WorkerJob: workerJob,
165+
TotalBatchCount: totalBatchCount,
159166
}
160167

161168
log.V(1).Info("status data successfully acquired",

pkg/crds/controllers/batch/batchjob_controller_config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ import (
2323

2424
// BatchJobReconcilerConfig reconciler config for the BatchJob kind. Allows for mocking specific methods
2525
type BatchJobReconcilerConfig struct {
26-
GetMaxBatchCount func(r *BatchJobReconciler, batchJob batch.BatchJob) (int, error)
27-
GetMetrics func(r *BatchJobReconciler, batchJob batch.BatchJob) (metrics.BatchMetrics, error)
28-
SaveJobMetrics func(r *BatchJobReconciler, batchJob batch.BatchJob) error
29-
SaveJobStatus func(r *BatchJobReconciler, batchJob batch.BatchJob) error
26+
GetTotalBatchCount func(r *BatchJobReconciler, batchJob batch.BatchJob) (int, error)
27+
GetMetrics func(r *BatchJobReconciler, batchJob batch.BatchJob) (metrics.BatchMetrics, error)
28+
SaveJobMetrics func(r *BatchJobReconciler, batchJob batch.BatchJob) error
29+
SaveJobStatus func(r *BatchJobReconciler, batchJob batch.BatchJob) error
3030
}
3131

3232
// ApplyDefaults sets the defaults for BatchJobReconcilerConfig
3333
func (c BatchJobReconcilerConfig) ApplyDefaults() BatchJobReconcilerConfig {
34-
if c.GetMaxBatchCount == nil {
35-
c.GetMaxBatchCount = getMaxBatchCount
34+
if c.GetTotalBatchCount == nil {
35+
c.GetTotalBatchCount = getTotalBatchCount
3636
}
3737

3838
if c.GetMetrics == nil {

pkg/crds/controllers/batch/batchjob_controller_helpers.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/cortexlabs/cortex/pkg/types/userconfig"
4343
"github.com/cortexlabs/cortex/pkg/workloads"
4444
"github.com/cortexlabs/yaml"
45+
cache "github.com/patrickmn/go-cache"
4546
kbatch "k8s.io/api/batch/v1"
4647
kcore "k8s.io/api/core/v1"
4748
kerrors "k8s.io/apimachinery/pkg/api/errors"
@@ -52,13 +53,21 @@ import (
5253
const (
5354
_enqueuerContainerName = "enqueuer"
5455
_deadlineExceededReason = "DeadlineExceeded"
56+
_cacheDuration = 60 * time.Second
5557
)
5658

59+
var totalBatchCountCache *cache.Cache
60+
61+
func init() {
62+
totalBatchCountCache = cache.New(_cacheDuration, _cacheDuration)
63+
}
64+
5765
type batchJobStatusInfo struct {
5866
QueueExists bool
5967
EnqueuingStatus batch.EnqueuingStatus
6068
EnqueuerJob *kbatch.Job
6169
WorkerJob *kbatch.Job
70+
TotalBatchCount int
6271
}
6372

6473
func (r *BatchJobReconciler) checkIfQueueExists(batchJob batch.BatchJob) (bool, error) {
@@ -381,11 +390,12 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B
381390
case batch.EnqueuingFailed:
382391
batchJob.Status.Status = status.JobEnqueueFailed
383392
batchJob.Status.EndTime = statusInfo.EnqueuerJob.Status.CompletionTime
393+
case batch.EnqueuingDone:
394+
batchJob.Status.TotalBatchCount = statusInfo.TotalBatchCount
384395
}
385396

386397
worker := statusInfo.WorkerJob
387398
if worker != nil {
388-
batchJob.Status.StartTime = worker.Status.StartTime // assign right away, because it's a pointer
389399
batchJob.Status.EndTime = worker.Status.CompletionTime // assign right away, because it's a pointer
390400

391401
if worker.Status.Failed == batchJob.Spec.Workers {
@@ -423,9 +433,7 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B
423433
batchJob.Status.Status = status.JobRunning
424434
}
425435

426-
pendingWorkers := batchJob.Spec.Workers - (worker.Status.Active + worker.Status.Succeeded + worker.Status.Failed)
427436
batchJob.Status.WorkerCounts = &status.WorkerCounts{
428-
Pending: pendingWorkers,
429437
Running: worker.Status.Active,
430438
Succeeded: worker.Status.Succeeded,
431439
Failed: worker.Status.Failed,
@@ -495,7 +503,7 @@ func (r *BatchJobReconciler) uploadJobSpec(batchJob batch.BatchJob, api spec.API
495503
timeout = pointer.Int(int(batchJob.Spec.Timeout.Seconds()))
496504
}
497505

498-
maxBatchCount, err := r.Config.GetMaxBatchCount(r, batchJob)
506+
totalBatchCount, err := r.Config.GetTotalBatchCount(r, batchJob)
499507
if err != nil {
500508
return nil, err
501509
}
@@ -513,11 +521,9 @@ func (r *BatchJobReconciler) uploadJobSpec(batchJob batch.BatchJob, api spec.API
513521
Config: config,
514522
},
515523
APIID: api.ID,
516-
SpecID: api.SpecID,
517-
HandlerID: api.HandlerID,
518524
SQSUrl: queueURL,
519525
StartTime: batchJob.CreationTimestamp.Time,
520-
TotalBatchCount: maxBatchCount,
526+
TotalBatchCount: totalBatchCount,
521527
}
522528

523529
if err = r.AWS.UploadJSONToS3(&jobSpec, r.ClusterConfig.Bucket, r.jobSpecKey(batchJob)); err != nil {
@@ -569,19 +575,27 @@ func (r *BatchJobReconciler) persistJobToS3(batchJob batch.BatchJob) error {
569575
)
570576
}
571577

572-
func getMaxBatchCount(r *BatchJobReconciler, batchJob batch.BatchJob) (int, error) {
578+
func getTotalBatchCount(r *BatchJobReconciler, batchJob batch.BatchJob) (int, error) {
573579
key := spec.JobBatchCountKey(r.ClusterConfig.ClusterUID, userconfig.BatchAPIKind, batchJob.Spec.APIName, batchJob.Name)
574-
maxBatchCountBytes, err := r.AWS.ReadBytesFromS3(r.ClusterConfig.Bucket, key)
575-
if err != nil {
576-
return 0, err
577-
}
580+
cachedTotalBatchCount, found := totalBatchCountCache.Get(key)
581+
var totalBatchCount int
582+
if !found {
583+
totalBatchCountBytes, err := r.AWS.ReadBytesFromS3(r.ClusterConfig.Bucket, key)
584+
if err != nil {
585+
return 0, err
586+
}
578587

579-
maxBatchCount, err := strconv.Atoi(string(maxBatchCountBytes))
580-
if err != nil {
581-
return 0, err
588+
totalBatchCount, err = strconv.Atoi(string(totalBatchCountBytes))
589+
if err != nil {
590+
return 0, err
591+
}
592+
} else {
593+
totalBatchCount = cachedTotalBatchCount.(int)
582594
}
583595

584-
return maxBatchCount, nil
596+
totalBatchCountCache.Set(key, totalBatchCount, _cacheDuration)
597+
598+
return totalBatchCount, nil
585599
}
586600

587601
func getMetrics(r *BatchJobReconciler, batchJob batch.BatchJob) (metrics.BatchMetrics, error) {
@@ -611,14 +625,24 @@ func saveJobMetrics(r *BatchJobReconciler, batchJob batch.BatchJob) error {
611625
}
612626

613627
func saveJobStatus(r *BatchJobReconciler, batchJob batch.BatchJob) error {
614-
jobStatus := batchJob.Status.Status.String()
615-
key := filepath.Join(
616-
spec.JobAPIPrefix(r.ClusterConfig.ClusterUID, userconfig.BatchAPIKind, batchJob.Spec.APIName),
617-
batchJob.Name,
618-
jobStatus,
628+
return parallel.RunFirstErr(
629+
func() error {
630+
stoppedStatusKey := filepath.Join(
631+
spec.JobAPIPrefix(r.ClusterConfig.ClusterUID, userconfig.BatchAPIKind, batchJob.Spec.APIName),
632+
batchJob.Name,
633+
status.JobStopped.String(),
634+
)
635+
return r.AWS.UploadStringToS3("", r.ClusterConfig.Bucket, stoppedStatusKey)
636+
637+
},
638+
func() error {
639+
jobStatus := batchJob.Status.Status.String()
640+
key := filepath.Join(
641+
spec.JobAPIPrefix(r.ClusterConfig.ClusterUID, userconfig.BatchAPIKind, batchJob.Spec.APIName),
642+
batchJob.Name,
643+
jobStatus,
644+
)
645+
return r.AWS.UploadStringToS3("", r.ClusterConfig.Bucket, key)
646+
},
619647
)
620-
if err := r.AWS.UploadStringToS3("", r.ClusterConfig.Bucket, key); err != nil {
621-
return err
622-
}
623-
return nil
624648
}

pkg/crds/controllers/batch/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ var _ = BeforeSuite(func(done Done) {
124124

125125
// mock certain methods of the reconciler
126126
reconcilerConfig := batchcontrollers.BatchJobReconcilerConfig{
127-
GetMaxBatchCount: func(r *batchcontrollers.BatchJobReconciler, batchJob batch.BatchJob) (int, error) {
127+
GetTotalBatchCount: func(r *batchcontrollers.BatchJobReconciler, batchJob batch.BatchJob) (int, error) {
128128
return 1, nil
129129
},
130130
GetMetrics: func(r *batchcontrollers.BatchJobReconciler, batchJob batch.BatchJob) (metrics.BatchMetrics, error) {

pkg/operator/resources/job/batchapi/api.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,10 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, batchJob
123123
batchAPIsMap := map[string]*schema.APIResponse{}
124124

125125
jobIDToBatchJobMap := map[string]*batch.BatchJob{}
126+
apiNameToBatchJobsMap := map[string][]*batch.BatchJob{}
126127
for i, batchJob := range batchJobList {
127128
jobIDToBatchJobMap[batchJob.Name] = &batchJobList[i]
129+
apiNameToBatchJobsMap[batchJob.Spec.APIName] = append(apiNameToBatchJobsMap[batchJob.Spec.APIName], &batchJobList[i])
128130
}
129131

130132
for _, virtualService := range virtualServices {
@@ -141,19 +143,33 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, batchJob
141143
return nil, err
142144
}
143145

144-
jobStates, err := job.GetMostRecentlySubmittedJobStates(apiName, 1, userconfig.BatchAPIKind)
145-
if err != nil {
146-
return nil, err
147-
}
148-
149146
var jobStatuses []status.BatchJobStatus
150-
if len(jobStates) > 0 {
151-
jobStatus, err := getJobStatusFromJobState(jobStates[0], jobIDToBatchJobMap[jobStates[0].ID])
147+
batchJobs := apiNameToBatchJobsMap[apiName]
148+
149+
if len(batchJobs) == 0 {
150+
jobStates, err := job.GetMostRecentlySubmittedJobStates(apiName, 1, userconfig.BatchAPIKind)
152151
if err != nil {
153152
return nil, err
154153
}
155154

156-
jobStatuses = append(jobStatuses, *jobStatus)
155+
if len(jobStates) > 0 {
156+
jobStatus, err := getJobStatusFromJobState(jobStates[0])
157+
if err != nil {
158+
return nil, err
159+
}
160+
161+
jobStatuses = append(jobStatuses, *jobStatus)
162+
}
163+
} else {
164+
for i := range batchJobs {
165+
batchJob := batchJobs[i]
166+
jobStatus, err := getJobStatusFromBatchJob(*batchJob)
167+
if err != nil {
168+
return nil, err
169+
}
170+
171+
jobStatuses = append(jobStatuses, *jobStatus)
172+
}
157173
}
158174

159175
batchAPIsMap[apiName] = &schema.APIResponse{
@@ -199,7 +215,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp
199215
var jobStatuses []status.BatchJobStatus
200216
jobIDSet := strset.New()
201217
for _, batchJob := range batchJobList.Items {
202-
jobStatus, err := getJobStatusFromK8sBatchJob(batchJob)
218+
jobStatus, err := getJobStatusFromBatchJob(batchJob)
203219
if err != nil {
204220
return nil, err
205221
}
@@ -218,7 +234,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp
218234
}
219235
jobIDSet.Add(jobState.ID)
220236

221-
jobStatus, err := getJobStatusFromJobState(jobState, nil)
237+
jobStatus, err := getJobStatusFromJobState(jobState)
222238
if err != nil {
223239
return nil, err
224240
}

0 commit comments

Comments
 (0)