Skip to content

Commit

Permalink
Merge pull request #7237 from Duke0404/injectorsplit
Browse files Browse the repository at this point in the history
Subdivide injector pod list processor logic
  • Loading branch information
k8s-ci-robot authored Sep 6, 2024
2 parents 33fac1f + 835b79b commit e629d0e
Showing 1 changed file with 72 additions and 37 deletions.
109 changes: 72 additions & 37 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand All @@ -44,57 +45,91 @@ type ProvisioningRequestPodsInjector struct {
clock clock.PassiveClock
}

// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list.
func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreqwrapper.ProvisioningRequest) bool {
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
return false
}
provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
return true
}
return false
}
return true
}

// MarkAsAccepted marks the ProvisioningRequest as accepted.
func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.ProvisioningRequest) error {
provreqconditions.AddOrUpdateCondition(pr, v1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
return err
}
return nil
}

// MarkAsFailed marks the ProvisioningRequest as failed.
func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.ProvisioningRequest, reason string, message string) {
provreqconditions.AddOrUpdateCondition(pr, v1.Failed, metav1.ConditionTrue, reason, message, metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
}

// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool,
) ([]*apiv1.Pod, error) {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
}

for _, pr := range provReqs {
if ok, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]; !ok || !found {
klog.Warningf("Provisioning Class %s is not supported", pr.Spec.ProvisioningClassName)
continue
}
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
if !isSupportedClass(pr) {
continue
}

provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
inject := true
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
inject = true
} else {
inject = false
}
if !p.IsAvailableForProvisioning(pr) {
continue
}
if inject {
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
provreqconditions.AddOrUpdateCondition(pr, v1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
continue
}
provreqconditions.AddOrUpdateCondition(pr, v1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
return unschedulablePods, nil

podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}

if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
}
return nil, nil
}

// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list.
func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
podsFromProvReq, err := p.GetPodsFromNextRequest(
func(pr *provreqwrapper.ProvisioningRequest) bool {
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
return found
})

if err != nil {
return unschedulablePods, err
}
return unschedulablePods, nil

return append(unschedulablePods, podsFromProvReq...), nil
}

// CleanUp cleans up the processor's internal structures.
Expand Down

0 comments on commit e629d0e

Please sign in to comment.