Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial admission #771

Merged
merged 8 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ type PodSet struct {
// count is the number of pods for the spec.
// +kubebuilder:validation:Minimum=1
Count int32 `json:"count"`

// minCount is the minimum number of pods for the spec acceptable
// if the workload supports partial admission.
//
// If not provided, partial admission for the current PodSet is not
// enabled.
//
// Only one podSet within the workload can use this.
//
// This is an alpha field and requires enabling PartialAdmission feature gate.
//
// +optional
MinCount *int32 `json:"minCount,omitempty"`
}

// WorkloadStatus defines the observed state of Workload
Expand Down
5 changes: 5 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ spec:
format: int32
minimum: 1
type: integer
minCount:
description: "minCount is the minimum number of pods for the
spec acceptable if the workload supports partial admission.
\n If not provided, partial admission for the current PodSet
is not enabled. \n Only one podSet within the workload can
use this. \n This is an alpha field and requires enabling
PartialAdmission feature gate."
format: int32
type: integer
name:
description: name is the PodSet name.
type: string
Expand Down
9 changes: 9 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ spec:
format: int32
minimum: 1
type: integer
minCount:
description: "minCount is the minimum number of pods for the
spec acceptable if the workload supports partial admission.
\n If not provided, partial admission for the current PodSet
is not enabled. \n Only one podSet within the workload can
use this. \n This is an alpha field and requires enabling
PartialAdmission feature gate."
format: int32
type: integer
name:
description: name is the PodSet name.
type: string
Expand Down
8 changes: 6 additions & 2 deletions keps/420-partial-admission/kep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ approvers:


# The target maturity stage in the current dev cycle for this KEP.
stage: stable
stage: alpha

# The most recent milestone for which work toward delivery of this KEP has been
# done. This can be the current (upcoming) milestone, if it is being actively
Expand All @@ -22,5 +22,9 @@ latest-milestone: "v0.4"

# The milestone at which this feature was, or is targeted to be, at each stage.
milestone:
stable: "v0.5"
alpha: "v0.4"
beta: "v0.5"
stable: "v0.6"

feature-gates:
- name: PartialAdmission
8 changes: 8 additions & 0 deletions pkg/controller/jobframework/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,13 @@ const (
// node selectors are recorded upon a workload admission. This information
// will be used to restore them when the job is suspended.
// The content is a json marshaled slice of selectors.
//
// Deprecated: Use OriginalPodSetsInfoAnnotation.
OriginalNodeSelectorsAnnotation = "kueue.x-k8s.io/original-node-selectors"

// OriginalPodSetsInfoAnnotation is the annotation in which the original
// node selectors and podSet counts are recorded upon a workload admission.
// This information will be used to restore them when the job is suspended.
// The content is a json marshaled slice of PodSetInfo.
OriginalPodSetsInfoAnnotation = "kueue.x-k8s.io/original-pod-sets-info"
trasc marked this conversation as resolved.
Show resolved Hide resolved
)
8 changes: 4 additions & 4 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ type GenericJob interface {
// ResetStatus will reset the job status to the original state.
// If true, status is modified, if not, status is as it was.
ResetStatus() bool
// RunWithNodeAffinity will inject the node affinity extracting from workload to job and unsuspend the job.
RunWithNodeAffinity(nodeSelectors []PodSetNodeSelector)
// RestoreNodeAffinity will restore the original node affinity of job.
RestoreNodeAffinity(nodeSelectors []PodSetNodeSelector)
// RunWithPodSetsInfo will inject the node affinity and podSet counts extracted from the workload into the job and unsuspend it.
RunWithPodSetsInfo(nodeSelectors []PodSetInfo)
// RestorePodSetsInfo will restore the original node affinity and podSet counts of the job.
RestorePodSetsInfo(nodeSelectors []PodSetInfo)
// Finished means whether the job is completed/failed or not,
// condition represents the workload finished condition.
Finished() (condition metav1.Condition, finished bool)
Expand Down
85 changes: 55 additions & 30 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobframework
import (
"context"
"encoding/json"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
Expand All @@ -26,17 +27,20 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

var (
errNodeSelectorsNotFound = fmt.Errorf("annotation %s not found", OriginalNodeSelectorsAnnotation)
errPodSetsInfoNotFound = fmt.Errorf("annotation %s or %s not found", OriginalNodeSelectorsAnnotation, OriginalPodSetsInfoAnnotation)
errUnknownPodSetName = errors.New("unknown podSet name")
)

// JobReconciler reconciles a GenericJob object
Expand Down Expand Up @@ -332,17 +336,17 @@ func (r *JobReconciler) equivalentToWorkload(job GenericJob, object client.Objec

// startJob will unsuspend the job, and also inject the node affinity.
func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) error {
//get the original selectors and store them in the job object
originalSelectors := r.getNodeSelectorsFromPodSets(wl)
if err := setNodeSelectorsInAnnotation(object, originalSelectors); err != nil {
return fmt.Errorf("startJob, record original node selectors: %w", err)
// get the original podSetsInfo and store it in the job object
originalPodSetsInfo := r.getPodSetsInfoFromSpec(wl)
if err := setNodeSelectorsInAnnotation(object, originalPodSetsInfo); err != nil {
return fmt.Errorf("startJob, record original podSetsInfo: %w", err)
}

nodeSelectors, err := r.getNodeSelectorsFromAdmission(ctx, wl)
info, err := r.getPodSetsInfoFromAdmission(ctx, wl)
if err != nil {
return err
}
job.RunWithNodeAffinity(nodeSelectors)
job.RunWithPodSetsInfo(info)

if err := r.client.Update(ctx, object); err != nil {
return err
Expand Down Expand Up @@ -372,12 +376,12 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie
}
}

log.V(3).Info("restore node selectors from annotation")
selectors, err := getNodeSelectorsFromObjectAnnotation(object)
log.V(3).Info("restore podSetsInfo from annotation")
info, err := getPodSetsInfoFromObjectAnnotation(object, job)
if err != nil {
log.V(3).Error(err, "Unable to get original node selectors")
log.V(3).Error(err, "Unable to get original podSetsInfo")
} else {
job.RestoreNodeAffinity(selectors)
job.RestorePodSetsInfo(info)
return r.client.Update(ctx, object)
}

Expand Down Expand Up @@ -412,24 +416,26 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
return wl, nil
}

type PodSetNodeSelector struct {
type PodSetInfo struct {
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
}

// getNodeSelectorsFromAdmission will extract node selectors from admitted workloads.
func (r *JobReconciler) getNodeSelectorsFromAdmission(ctx context.Context, w *kueue.Workload) ([]PodSetNodeSelector, error) {
// getPodSetsInfoFromAdmission will extract the node selectors and podSet counts from the admission of a workload.
func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueue.Workload) ([]PodSetInfo, error) {
if len(w.Status.Admission.PodSetAssignments) == 0 {
return nil, nil
}

nodeSelectors := make([]PodSetNodeSelector, len(w.Status.Admission.PodSetAssignments))
nodeSelectors := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))

for i, podSetFlavor := range w.Status.Admission.PodSetAssignments {
processedFlvs := sets.NewString()
nodeSelector := PodSetNodeSelector{
nodeSelector := PodSetInfo{
Name: podSetFlavor.Name,
NodeSelector: make(map[string]string),
Count: pointer.Int32Deref(podSetFlavor.Count, w.Spec.PodSets[i].Count),
}
for _, flvRef := range podSetFlavor.Flavors {
flvName := string(flvRef)
Expand All @@ -452,18 +458,19 @@ func (r *JobReconciler) getNodeSelectorsFromAdmission(ctx context.Context, w *ku
return nodeSelectors, nil
}

// getNodeSelectorsFromPodSets will extract node selectors from a workload's podSets.
func (r *JobReconciler) getNodeSelectorsFromPodSets(w *kueue.Workload) []PodSetNodeSelector {
// getPodSetsInfoFromSpec will extract the node selectors and podSet counts from a workload's spec.
func (r *JobReconciler) getPodSetsInfoFromSpec(w *kueue.Workload) []PodSetInfo {
podSets := w.Spec.PodSets
if len(podSets) == 0 {
return nil
}
ret := make([]PodSetNodeSelector, len(podSets))
ret := make([]PodSetInfo, len(podSets))
for psi := range podSets {
ps := &podSets[psi]
ret[psi] = PodSetNodeSelector{
ret[psi] = PodSetInfo{
Name: ps.Name,
NodeSelector: cloneNodeSelector(ps.Template.Spec.NodeSelector),
Count: ps.Count,
}
}
return ret
Expand Down Expand Up @@ -519,34 +526,52 @@ func cloneNodeSelector(src map[string]string) map[string]string {
return ret
}

// getNodeSelectorsFromObjectAnnotation tries to retrieve a node selectors slice from the
// getPodSetsInfoFromObjectAnnotation tries to retrieve a podSetsInfo slice from the
// object's annotations; it fails if the annotation is not found or cannot be unmarshaled.
func getNodeSelectorsFromObjectAnnotation(obj client.Object) ([]PodSetNodeSelector, error) {
str, found := obj.GetAnnotations()[OriginalNodeSelectorsAnnotation]
func getPodSetsInfoFromObjectAnnotation(obj client.Object, job GenericJob) ([]PodSetInfo, error) {
trasc marked this conversation as resolved.
Show resolved Hide resolved
hasCounts := true
str, found := obj.GetAnnotations()[OriginalPodSetsInfoAnnotation]
if !found {
return nil, errNodeSelectorsNotFound
hasCounts = false
str, found = obj.GetAnnotations()[OriginalNodeSelectorsAnnotation]
if !found {
return nil, errPodSetsInfoNotFound
}
}
// unmarshal
ret := []PodSetNodeSelector{}
ret := []PodSetInfo{}
if err := json.Unmarshal([]byte(str), &ret); err != nil {
return nil, err
}

if !hasCounts {
podSets := job.PodSets()
psMap := slices.ToRefMap(podSets, func(ps *kueue.PodSet) string { return ps.Name })
for i := range ret {
info := &ret[i]
ps, found := psMap[info.Name]
if !found {
return nil, fmt.Errorf("%w: %s", errUnknownPodSetName, info.Name)
}
info.Count = ps.Count
}
}
return ret, nil
}

// setNodeSelectorsInAnnotation - sets an annotation containing the provided node selectors into
// setNodeSelectorsInAnnotation - sets an annotation containing the provided podSetsInfo into
// a job object; although very unlikely, it can return an error from json marshaling.
func setNodeSelectorsInAnnotation(obj client.Object, nodeSelectors []PodSetNodeSelector) error {
nodeSelectorsBytes, err := json.Marshal(nodeSelectors)
func setNodeSelectorsInAnnotation(obj client.Object, info []PodSetInfo) error {
nodeSelectorsBytes, err := json.Marshal(info)
if err != nil {
return err
}

annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{OriginalNodeSelectorsAnnotation: string(nodeSelectorsBytes)}
annotations = map[string]string{OriginalPodSetsInfoAnnotation: string(nodeSelectorsBytes)}
} else {
annotations[OriginalNodeSelectorsAnnotation] = string(nodeSelectorsBytes)
annotations[OriginalPodSetsInfoAnnotation] = string(nodeSelectorsBytes)
}
obj.SetAnnotations(annotations)
return nil
Expand Down
19 changes: 18 additions & 1 deletion pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
queueNameLabelPath = labelsPath.Key(QueueLabel)

originalNodeSelectorsWorkloadKeyPath = annotationsPath.Key(OriginalNodeSelectorsAnnotation)
originalPodSetsInfosWorkloadKeyPath = annotationsPath.Key(OriginalPodSetsInfoAnnotation)
)

func ValidateCreateForQueueName(job GenericJob) field.ErrorList {
Expand Down Expand Up @@ -83,10 +84,26 @@ func ValidateUpdateForOriginalNodeSelectors(oldJob, newJob GenericJob) field.Err
allErrs = append(allErrs, field.Forbidden(originalNodeSelectorsWorkloadKeyPath, "this annotation is immutable while the job is not changing its suspended state"))
}
} else if av, found := newJob.Object().GetAnnotations()[OriginalNodeSelectorsAnnotation]; found {
out := []PodSetNodeSelector{}
out := []PodSetInfo{}
if err := json.Unmarshal([]byte(av), &out); err != nil {
allErrs = append(allErrs, field.Invalid(originalNodeSelectorsWorkloadKeyPath, av, err.Error()))
}
}
return allErrs
}

// ValidateUpdateForOriginalPodSetsInfo validates the
// OriginalPodSetsInfoAnnotation when a job is updated.
//
// If oldJob and newJob report the same suspended state, the annotation value
// must be identical on both objects; any difference is reported as a Forbidden
// error on the annotation's field path.
//
// If the suspended state differs, the annotation is allowed to change, but
// when it is present on newJob its value must unmarshal as a JSON slice of
// PodSetInfo; otherwise an Invalid error carrying the unmarshal message is
// reported.
func ValidateUpdateForOriginalPodSetsInfo(oldJob, newJob GenericJob) field.ErrorList {
	var allErrs field.ErrorList

	if oldJob.IsSuspended() != newJob.IsSuspended() {
		// The job is being suspended or resumed: only check that a present
		// annotation is well-formed.
		value, present := newJob.Object().GetAnnotations()[OriginalPodSetsInfoAnnotation]
		if !present {
			return allErrs
		}
		decoded := []PodSetInfo{}
		if err := json.Unmarshal([]byte(value), &decoded); err != nil {
			allErrs = append(allErrs, field.Invalid(originalPodSetsInfosWorkloadKeyPath, value, err.Error()))
		}
		return allErrs
	}

	// Suspended state is unchanged: the annotation must stay as it was.
	oldValue := oldJob.Object().GetAnnotations()[OriginalPodSetsInfoAnnotation]
	newValue := newJob.Object().GetAnnotations()[OriginalPodSetsInfoAnnotation]
	immutableErrs := apivalidation.ValidateImmutableField(oldValue, newValue, originalPodSetsInfosWorkloadKeyPath)
	if len(immutableErrs) > 0 {
		allErrs = append(allErrs, field.Forbidden(originalPodSetsInfosWorkloadKeyPath, "this annotation is immutable while the job is not changing its suspended state"))
	}
	return allErrs
}
Loading