Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

var finishedAt *time.Time
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
var shouldUpdate bool
shouldUpdate, finishedAt, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
if err != nil {
Expand All @@ -288,6 +288,23 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
}

if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
// check head pod
headPod, err := common.GetRayClusterHeadPod(ctx, r.Client, rayClusterInstance)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if headPod == nil {
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
rayJobInstance.Status.Reason = rayv1.AppFailed
rayJobInstance.Status.Message = "Ray head pod not found."
break
}

// Get finishedAt for timeout fallback when dashboard is unreachable
finishedAt = getSubmitterContainerFinishedTime(headPod)
}

// Check the current status of ray jobs
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
if err != nil {
Expand Down Expand Up @@ -318,6 +335,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
}
// If in Sidecar mode, apply timeout as fallback if dashboard is unreachable and submitter has finished
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
// rayJobInstance.Status marked to Failed
break
}
}

logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
Expand All @@ -335,6 +359,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
isJobTerminal = isJobTerminal && finishedAt != nil
}

// inform the user when submitter exited but job is still running.
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode && finishedAt != nil && !isJobTerminal {
logger.Info("Submitter container exited but Ray job is still running.",
"JobId", rayJobInstance.Status.JobId, "JobStatus", jobInfo.JobStatus)
}

if isJobTerminal {
jobDeploymentStatus = rayv1.JobDeploymentStatusComplete
if jobInfo.JobStatus == rayv1.JobStatusFailed {
Expand Down
Loading