This repository was archived by the owner on Sep 19, 2022. It is now read-only.

fix the reconcile flow #242

Merged
merged 1 commit on Dec 18, 2019

114 changes: 64 additions & 50 deletions pkg/controller.v1/pytorch/controller.go
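
This commit reorders reconcilePyTorchJobs so that terminated jobs, and jobs that have exceeded their backoff limit, are cleaned up before the normal reconcile path runs. A minimal, self-contained sketch of that ordering is below; every type and helper in it (Job, cleanupResources, reconcileReplicas) is a hypothetical stand-in for illustration, not the operator's real API.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the operator's job state.
type JobPhase int

const (
    Running JobPhase = iota
    Succeeded
    Failed
)

type Job struct {
    Phase          JobPhase
    ExceedsBackoff bool
}

// reconcile mirrors the branch ordering introduced by this commit:
// 1. terminated jobs are cleaned up and the function returns early;
// 2. jobs past their backoff limit are cleaned up and marked failed;
// 3. only otherwise are pods and services reconciled.
func reconcile(job *Job) error {
    if job.Phase == Succeeded || job.Phase == Failed {
        // Delete pods/services, clean up the job and its pod group,
        // fix up the replica status, then stop reconciling.
        return cleanupResources(job)
    }

    if job.ExceedsBackoff {
        // Clean up first, then record the failure.
        if err := cleanupResources(job); err != nil {
            return err
        }
        job.Phase = Failed
        return nil
    }

    // Normal path: diff the desired replicas against running pods/services.
    return reconcileReplicas(job)
}

func cleanupResources(job *Job) error {
    fmt.Println("cleaning up pods, services and pod group")
    return nil
}

func reconcileReplicas(job *Job) error {
    fmt.Println("reconciling pods and services")
    return nil
}

func main() {
    _ = reconcile(&Job{Phase: Running})
    _ = reconcile(&Job{Phase: Succeeded})
}
```
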
@@ -355,6 +355,36 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
        return err
    }

+    // If the PyTorchJob is terminated, delete all pods and services.
+    if isSucceeded(job.Status) || isFailed(job.Status) {
+        if err := pc.deletePodsAndServices(job, pods, services); err != nil {
+            return err
+        }
+
+        if err := pc.cleanupPyTorchJob(job); err != nil {
+            return err
+        }
+
+        if pc.Config.EnableGangScheduling {
+            if err := pc.DeletePodGroup(job); err != nil {
+                return err
+            }
+        }
+
+        // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
+        // If any replicas are still Active, set their status to succeeded.
+        if isSucceeded(job.Status) {
+            for rtype := range job.Status.ReplicaStatuses {
+                job.Status.ReplicaStatuses[rtype].Succeeded += job.Status.ReplicaStatuses[rtype].Active
+                job.Status.ReplicaStatuses[rtype].Active = 0
+            }
+        }
+        if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
+            return pc.updateStatusHandler(job)
+        }
+        return nil
+    }
+
    // retrieve the previous number of retry
    previousRetry := pc.WorkQueue.NumRequeues(jobKey)

@@ -393,25 +423,11 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
        jobExceedsLimit = true
    }

-    // If the PyTorchJob is terminated, delete all pods and services.
-    if isSucceeded(job.Status) || isFailed(job.Status) || jobExceedsLimit {
+    if jobExceedsLimit {
        if err := pc.deletePodsAndServices(job, pods, services); err != nil {
            return err
        }

-        if jobExceedsLimit {
-            pc.Recorder.Event(job, v1.EventTypeNormal, pytorchJobFailedReason, failureMessage)
-            if job.Status.CompletionTime == nil {
-                now := metav1.Now()
-                job.Status.CompletionTime = &now
-            }
-            err := updatePyTorchJobConditions(job, common.JobFailed, pytorchJobFailedReason, failureMessage)
-            if err != nil {
-                logger.Infof("Append pytorchjob condition error: %v", err)
-                return err
-            }
-        }
-
        if err := pc.cleanupPyTorchJob(job); err != nil {
            return err
        }
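
Stepping back to the terminated-job branch added in the first hunk: because the pods may already be gone by the time the status is written, any replicas still counted as Active are folded into Succeeded. A standalone sketch of that bookkeeping, using an illustrative ReplicaStatus type rather than the operator's common types:

```go
package main

import "fmt"

// Illustrative stand-in for the status kept per replica type.
type ReplicaStatus struct {
    Active    int32
    Succeeded int32
}

// markSucceeded mirrors the loop in the terminated-job branch: once the job
// has succeeded, replicas still counted as Active are folded into Succeeded,
// because their pods may already have been deleted.
func markSucceeded(statuses map[string]*ReplicaStatus) {
    for rtype := range statuses {
        statuses[rtype].Succeeded += statuses[rtype].Active
        statuses[rtype].Active = 0
    }
}

func main() {
    statuses := map[string]*ReplicaStatus{
        "Master": {Active: 1},
        "Worker": {Active: 3, Succeeded: 1},
    }
    markSucceeded(statuses)
    for rtype, s := range statuses {
        fmt.Printf("%s: active=%d succeeded=%d\n", rtype, s.Active, s.Succeeded)
    }
}
```
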
@@ -422,48 +438,46 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
            }
        }

-        // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
-        // If any replicas are still Active, set their status to succeeded.
-        if isSucceeded(job.Status) {
-            for rtype := range job.Status.ReplicaStatuses {
-                job.Status.ReplicaStatuses[rtype].Succeeded += job.Status.ReplicaStatuses[rtype].Active
-                job.Status.ReplicaStatuses[rtype].Active = 0
-            }
-        }
-        if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
-            return pc.updateStatusHandler(job)
-        }
-        return nil
-    }
-
-    if pc.Config.EnableGangScheduling {
-        minAvailableReplicas := getTotalReplicas(job)
-        _, err := pc.SyncPodGroup(job, minAvailableReplicas)
-        if err != nil {
-            logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
-        }
-    }
-
-    // Save the current state of the replicas
-    replicasStatus := make(map[string]v1.PodPhase)
-
-    // Diff current active pods/services with replicas.
-    for rtype, spec := range job.Spec.PyTorchReplicaSpecs {
-        err = pc.reconcilePods(job, pods, rtype, spec, replicasStatus)
-        if err != nil {
-            logger.Warnf("reconcilePods error %v", err)
-            return err
-        }
-
-        // Service is in need only for Master
-        if rtype != pyv1.PyTorchReplicaTypeMaster {
-            continue
-        }
-        err = pc.reconcileServices(job, services, rtype, spec)
-
-        if err != nil {
-            logger.Warnf("reconcileServices error %v", err)
-            return err
-        }
-    }
+        pc.Recorder.Event(job, v1.EventTypeNormal, pytorchJobFailedReason, failureMessage)
+        if job.Status.CompletionTime == nil {
+            now := metav1.Now()
+            job.Status.CompletionTime = &now
+        }
+
+        err := updatePyTorchJobConditions(job, common.JobFailed, pytorchJobFailedReason, failureMessage)
+        if err != nil {
+            logger.Infof("Append pytorchjob condition error: %v", err)
+            return err
+        }
+    } else {
+        if pc.Config.EnableGangScheduling {
+            minAvailableReplicas := getTotalReplicas(job)
+            _, err := pc.SyncPodGroup(job, minAvailableReplicas)
+            if err != nil {
+                logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
+            }
+        }
+
+        // Save the current state of the replicas
+        replicasStatus := make(map[string]v1.PodPhase)
+
+        // Diff current active pods/services with replicas.
+        for rtype, spec := range job.Spec.PyTorchReplicaSpecs {
+            err = pc.reconcilePods(job, pods, rtype, spec, replicasStatus)
+            if err != nil {
+                logger.Warnf("reconcilePods error %v", err)
+                return err
+            }
+
+            // Service is in need only for Master
+            if rtype != pyv1.PyTorchReplicaTypeMaster {
+                continue
+            }
+            err = pc.reconcileServices(job, services, rtype, spec)
+
+            if err != nil {
+                logger.Warnf("reconcileServices error %v", err)
+                return err
+            }
+        }
+    }
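
The last hunk moves the normal reconcile loop into the else branch: pods are reconciled for every replica type, while a service is reconciled only for the Master, whose address the workers use for rendezvous. A self-contained sketch of that per-replica selection, with hypothetical replica types and stub reconcile functions:

```go
package main

import "fmt"

const (
    replicaTypeMaster = "Master"
    replicaTypeWorker = "Worker"
)

// Hypothetical replica spec; the real loop iterates job.Spec.PyTorchReplicaSpecs.
type replicaSpec struct {
    Replicas int32
}

func reconcilePods(rtype string, spec replicaSpec) error {
    fmt.Printf("reconciling %d %s pod(s)\n", spec.Replicas, rtype)
    return nil
}

func reconcileServices(rtype string, spec replicaSpec) error {
    fmt.Printf("reconciling service for %s\n", rtype)
    return nil
}

func main() {
    specs := map[string]replicaSpec{
        replicaTypeMaster: {Replicas: 1},
        replicaTypeWorker: {Replicas: 3},
    }

    for rtype, spec := range specs {
        if err := reconcilePods(rtype, spec); err != nil {
            fmt.Println("reconcilePods error:", err)
            return
        }
        // A service is needed only for the Master replica.
        if rtype != replicaTypeMaster {
            continue
        }
        if err := reconcileServices(rtype, spec); err != nil {
            fmt.Println("reconcileServices error:", err)
            return
        }
    }
}
```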
