Determine job status based on k8s job and fix pointer bug #1917

Merged (5 commits, Mar 1, 2021)
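The "pointer bug" in the title is Go's classic range-variable aliasing: in the diff below, k8sJobMap[kJob.Labels["jobID"]] = &kJob stores the address of the loop variable, which (in Go versions before 1.22) is reused on every iteration, so every map entry ends up pointing at the last job in the slice. Below is a minimal standalone sketch of the bug and of the value-map fix the PR adopts; the Job type is a simplified stand-in, not the operator's real kbatch.Job:

package main

import "fmt"

// Job is a simplified stand-in for kbatch.Job, used only for this illustration.
type Job struct{ ID string }

func main() {
	jobs := []Job{{"a"}, {"b"}, {"c"}}

	// Buggy: &j is the address of the loop variable, which pre-Go-1.22 is reused
	// on each iteration, so all three entries point at the same memory.
	buggy := map[string]*Job{}
	for _, j := range jobs {
		buggy[j.ID] = &j
	}
	fmt.Println(buggy["a"].ID, buggy["b"].ID) // prints "c c" on Go < 1.22

	// Fix taken in this PR: store values instead of pointers.
	fixed := map[string]Job{}
	for _, j := range jobs {
		fixed[j.ID] = j
	}
	fmt.Println(fixed["a"].ID, fixed["b"].ID) // prints "a b"
}

Switching the map to hold job values (and threading a jobFound bool through the helpers) removes the aliasing hazard entirely, at the cost of copying the job structs, which is negligible here.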
57 changes: 26 additions & 31 deletions pkg/operator/resources/job/batchapi/cron.go
@@ -90,10 +90,10 @@ func ManageJobResources() error {
return err
}

k8sJobMap := map[string]*kbatch.Job{}
k8sJobMap := map[string]kbatch.Job{}
k8sJobIDSet := strset.Set{}
for _, kJob := range jobs {
k8sJobMap[kJob.Labels["jobID"]] = &kJob
k8sJobMap[kJob.Labels["jobID"]] = kJob
k8sJobIDSet.Add(kJob.Labels["jobID"])
}

@@ -103,7 +103,7 @@ func ManageJobResources() error {
queueURL = pointer.String(queueURLMap[jobKey.ID])
}

k8sJob := k8sJobMap[jobKey.ID]
k8sJob, jobFound := k8sJobMap[jobKey.ID]

jobLogger, err := operator.GetJobLogger(jobKey)
if err != nil {
@@ -135,7 +135,7 @@ func ManageJobResources() error {
continue
}

newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, k8sJob)
newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, jobFound)
if err != nil {
telemetry.Error(err)
operatorLogger.Error(err)
@@ -150,7 +150,7 @@ func ManageJobResources() error {
continue
}
}
if queueURL == nil || k8sJob == nil {
if queueURL == nil {
// job has been submitted within the grace period; it may take a while for newly created queues and jobs to show up in list results
continue
}
@@ -249,7 +249,7 @@ func ManageJobResources() error {
}

// verifies that queue exists for an in progress job and k8s job exists for a job in running status, if verification fails return a job code to reflect the state
func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatch.Job) (status.JobCode, string, error) {
func reconcileInProgressJob(jobState *job.State, queueURL *string, jobFound bool) (status.JobCode, string, error) {
jobKey := jobState.JobKey

if queueURL == nil {
@@ -275,45 +275,49 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
return jobState.Status, "", nil
}

if k8sJob == nil { // unexpected k8s job missing
if !jobFound { // unexpected k8s job missing
return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobKey.UserString()), nil
}
}

return jobState.Status, "", nil
}

func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob *kbatch.Job) error {
func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob kbatch.Job) error {
jobKey := jobState.JobKey

jobFailed, err := checkForJobFailure(jobKey, k8sJob)
if err != nil || jobFailed {
return err
}

queueMessages, err := getQueueMetricsFromURL(queueURL)
jobLogger, err := operator.GetJobLogger(jobKey)
if err != nil {
return err
}

jobLogger, err := operator.GetJobLogger(jobKey)
// job is still in-progress
if int(k8sJob.Status.Active) != 0 {
return nil
}

queueMessages, err := getQueueMetricsFromURL(queueURL)
if err != nil {
return err
}

if !queueMessages.IsEmpty() {
// Give time for queue metrics to reach consistency
if k8sJob != nil && int(k8sJob.Status.Active) == 0 {
if _jobsToDelete.Has(jobKey.ID) {
_jobsToDelete.Remove(jobKey.ID)
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
return errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
_jobsToDelete.Add(jobKey.ID)
if _jobsToDelete.Has(jobKey.ID) {
_jobsToDelete.Remove(jobKey.ID)
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
return errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
_jobsToDelete.Add(jobKey.ID)

return nil
}

@@ -356,7 +360,7 @@ func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob *kbatch.Jo
return nil
}

func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
func checkForJobFailure(jobKey spec.JobKey, k8sJob kbatch.Job) (bool, error) {
jobLogger, err := operator.GetJobLogger(jobKey)
if err != nil {
return false, err
@@ -372,7 +376,7 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
deleteJobRuntimeResources(jobKey),
)
}
if k8sJob != nil && int(k8sJob.Status.Failed) > 0 {
if int(k8sJob.Status.Failed) > 0 {
podStatus := k8s.GetPodStatus(&pod)
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.LastTerminationState.Terminated != nil {
@@ -394,9 +398,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
}
}

if k8sJob == nil {
return false, nil
}
if int(k8sJob.Status.Failed) > 0 {
if !reasonFound {
jobLogger.Error("workers were killed for unknown reason")
@@ -405,12 +406,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
job.SetWorkerErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
// really unexpected situation which doesn't hurt if we check
return true, errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

return false, nil
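Alongside the value map, both cron files replace the nil-pointer sentinel with the comma-ok form of a map lookup, passing a jobFound bool into reconcileInProgressJob. A short sketch of the pattern, with simplified stand-in types rather than the operator's real signatures:

package main

import "fmt"

// Job is a simplified stand-in; the real code uses kbatch.Job.
type Job struct{ Active int32 }

func reconcile(jobID string, k8sJobMap map[string]Job) string {
	// comma-ok lookup: jobFound reports presence, so no pointer is needed
	// to distinguish "missing" from "present with zero value".
	k8sJob, jobFound := k8sJobMap[jobID]
	if !jobFound {
		return fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobID)
	}
	return fmt.Sprintf("job %s has %d active pods", jobID, k8sJob.Active)
}

func main() {
	m := map[string]Job{"abc123": {Active: 1}}
	fmt.Println(reconcile("abc123", m))
	fmt.Println(reconcile("missing", m))
}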
23 changes: 7 additions & 16 deletions pkg/operator/resources/job/taskapi/cron.go
@@ -72,10 +72,10 @@ func ManageJobResources() error {
return err
}

k8sJobMap := map[string]*kbatch.Job{}
k8sJobMap := map[string]kbatch.Job{}
k8sJobIDSet := strset.Set{}
for _, kJob := range jobs {
k8sJobMap[kJob.Labels["jobID"]] = &kJob
k8sJobMap[kJob.Labels["jobID"]] = kJob
k8sJobIDSet.Add(kJob.Labels["jobID"])
}

@@ -87,7 +87,7 @@ func ManageJobResources() error {
continue
}

k8sJob := k8sJobMap[jobKey.ID]
k8sJob, jobFound := k8sJobMap[jobKey.ID]

jobState, err := job.GetJobState(jobKey)
if err != nil {
@@ -112,7 +112,7 @@ func ManageJobResources() error {
}

// reconcile job state and k8s job
newStatusCode, msg, err := reconcileInProgressJob(jobState, k8sJob)
newStatusCode, msg, err := reconcileInProgressJob(jobState, jobFound)
if err != nil {
telemetry.Error(err)
operatorLogger.Error(err)
@@ -187,21 +187,21 @@ func ManageJobResources() error {
}

// verifies k8s job exists for a job in running status, if verification fails return a job code to reflect the state
func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.JobCode, string, error) {
func reconcileInProgressJob(jobState *job.State, jobFound bool) (status.JobCode, string, error) {
if jobState.Status == status.JobRunning {
if time.Now().Sub(jobState.LastUpdatedMap[status.JobRunning.String()]) <= _k8sJobExistenceGracePeriod {
return jobState.Status, "", nil
}

if k8sJob == nil { // unexpected k8s job missing
if !jobFound { // unexpected k8s job missing
return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobState.JobKey.UserString()), nil
}
}

return jobState.Status, "", nil
}

func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
func checkIfJobCompleted(jobKey spec.JobKey, k8sJob kbatch.Job) error {
pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
for _, pod := range pods {
if k8s.WasPodOOMKilled(&pod) {
@@ -212,9 +212,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
}
}

if k8sJob == nil {
return nil
}
if int(k8sJob.Status.Failed) == 1 {
return errors.FirstError(
job.SetWorkerErrorStatus(jobKey),
@@ -225,12 +222,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
job.SetSucceededStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
// really unexpected situation which doesn't hurt if we check
return errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

return nil
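The other half of the change ("determine job status based on k8s job") reorders checkIfJobCompleted so the Kubernetes job's own status fields are consulted before queue metrics: a failed job short-circuits, an active job is left alone, and only a finished job with a non-empty queue is flagged as unexpected. A rough sketch of that precedence with simplified inputs; the real code also debounces the unexpected path via _jobsToDelete and handles OOM-killed pods, both omitted here:

package main

import "fmt"

// jobStatus mirrors the kbatch.Job Status fields the cron loop looks at.
type jobStatus struct {
	Active int32
	Failed int32
}

// outcome sketches the decision order used by checkIfJobCompleted for batch jobs.
func outcome(s jobStatus, queueEmpty bool) string {
	switch {
	case s.Failed > 0:
		return "worker_error" // checkForJobFailure handles this before anything else
	case s.Active != 0:
		return "in_progress" // k8s job still has active pods; leave the status alone
	case !queueEmpty:
		return "unexpected_error" // job looks done but the queue still has messages
	default:
		return "succeeded"
	}
}

func main() {
	fmt.Println(outcome(jobStatus{Active: 2}, false)) // in_progress
	fmt.Println(outcome(jobStatus{}, true))           // succeeded
}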