Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob] implement deletion policy API #2643

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

andrewsykim
Copy link
Collaborator

Why are these changes needed?

Implement RayJob DeletionPolicy API

Related issue number

#2615

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@andrewsykim andrewsykim force-pushed the rayjob-delete-policy branch 2 times, most recently from 08bdbfb to b2d43be Compare December 15, 2024 04:53
@andrewsykim andrewsykim changed the title [WIP][RayJob] implement deletion policy API [RayJob] implement deletion policy API Dec 15, 2024
Copy link
Member

@MortalHappiness MortalHappiness left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you resolve the conflicts? Also, should we add some tests for this feature?

ray-operator/controllers/ray/rayjob_controller.go Outdated Show resolved Hide resolved
@andrewsykim
Copy link
Collaborator Author

Fixed conflicts, will add tests tomorrow

@andrewsykim andrewsykim force-pushed the rayjob-delete-policy branch 2 times, most recently from 30adbd6 to 33747ec Compare December 17, 2024 00:31
@andrewsykim
Copy link
Collaborator Author

Added unit tests, going to skip e2e tests for now since it's currently not trivial to enable feature gates in the e2e tests

Copy link
Member

@MortalHappiness MortalHappiness left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

rayJobInstance.Spec.DeletionPolicy != nil &&
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move

			logger.Info(
				"RayJob deployment status",
				"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
				"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
				"ttlSecondsAfterFinished", ttlSeconds,
				"Status.endTime", rayJobInstance.Status.EndTime,
				"Now", nowTime,
				"ShutdownTime", shutdownTime)
			if shutdownTime.After(nowTime) {
				delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
				logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
				return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
			}

to the above of if features.Enabled(features.RayJobDeletionPolicy) && and remove the similar logics from L391 to L403.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditions are split here because the keys passed into logger.Info are different (shutdownAfterJobFinishes vs deletePolicy)

@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to return bool?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this is an oversight

@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function may not work when autoscaling is enabled. If autoscaling is enabled, Pod deletion is always determined by the Ray Autoscaler. KubeRay will not delete any Pods, even if the number of Pods exceeds the goal state.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point, but autoscaling with RayJob is pretty uncommon though right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to fix this is to also set max replicas to 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but autoscaling with RayJob is pretty uncommon though right?

I checked with my colleagues, and this may be incorrect. Autoscaling is not very common for Ray Train. However, it is commonly used for Ray Data, Ray Tune, and RLlib.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most Ray Data users use autoscaling.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mean the Ray API, I mean autoscaling is not common when using the RayJob custom resource. I am sure Ray Data with RayCluster + autoscaling is very common

@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, a K8s controller should only write to the CR status and treat the CR spec as read-only, but implementing this feature without writing to the CR spec is challenging for us.

Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.

This way, the RayJob controller doesn't need to modify replicas and minReplicas, which can also be modified by the Ray Autoscaler or users. Allowing multiple stakeholders to modify a field is typically the root cause of KubeRay's instability issues.

Copy link
Collaborator Author

@andrewsykim andrewsykim Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Controllers writing to spec is not necessarily bad, but I see what you mean. I think it would be wrong to write to RayJob spec from RayJob controller, but in this case we're writing to RayCluster spec from RayJob controller. I feel that updating replcias, minReplicas and maxReplicas for ephemeral RayCluster specifcally is actually fine because we don't care about the RayCluster spec once the cluster is deleted.

Will think about this more and get back to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main concern is that multiple personas can modify these fields, such as users, the Autoscaler, and the RayJob controller.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.

@kevin85421 how about a suspend field per worker group in WorkerGroupSpec? This allows for granularity of suspension per worker, and from RayJob we can just set suspend: true for all worker groups

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the draft PR #2663

Let me know what you think, I will clean up the PR and add tests if the API looks good to you

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API looks good to me.

Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2643 (comment)

Maybe we can split this issue into 3 PRs if the comment makes sense to you?

  1. Add a new field and feature flag in RayJob.
  2. Add a new field in RayCluster CRD to terminate all worker Pods.
  3. Implement the deletion policy API based on (2)

@andrewsykim andrewsykim force-pushed the rayjob-delete-policy branch 2 times, most recently from cb3980c to dbf7cef Compare December 30, 2024 14:45
@andrewsykim
Copy link
Collaborator Author

Updated to use the new worker group suspend API

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
@kevin85421
Copy link
Member

I will review the PR tomorrow.

@@ -95,6 +104,12 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="the managedBy field is immutable"
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
ManagedBy *string `json:"managedBy,omitempty"`
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
// DeletionPolicy indicates what resources of the RayJob are deleted upon job completion.

return err
}

logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
logger.Info("All worker groups for RayCluster have had `suspend` set to true", "RayCluster", clusterIdentifier)

}

logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Set the `suspend` field to true for all worker groups in cluster %s/%s", cluster.Namespace, cluster.Name)

ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe move the check before if features.Enabled(features.RayJobDeletionPolicy) ... to simplify the code?

	if shutdownTime.After(nowTime) {
		delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
		logger.Info("shutdownTime not reached; requeue this RayJob.", "requeue seconds", delta)
		return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
	}

@@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the log message below and add the following 4 keys:

  • ttlSecondsAfterFinished
  • Status.endTime
  • Now
  • ShutdownTime

Then, remove these 4 keys from the following log messages.

DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf"
DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, users may want to have different delete policies for Ray jobs that succeeded and Ray jobs that failed. For example, DeleteCluster if job succeeded and DeleteWorkers if job failed.

Do we plan to support these cases? We don't need to include these policies in this PR, but I am wondering what the APIs would look like.

type DeletionPolicy string

const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding some comments to explain these policies?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants