diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go index c628fedf1252..538563d24cbe 100644 --- a/cluster-autoscaler/processors/provreq/injector.go +++ b/cluster-autoscaler/processors/provreq/injector.go @@ -29,6 +29,7 @@ import ( provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -44,57 +45,91 @@ type ProvisioningRequestPodsInjector struct { clock clock.PassiveClock } -// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list. -func (p *ProvisioningRequestPodsInjector) Process( - _ *context.AutoscalingContext, - unschedulablePods []*apiv1.Pod, +// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently. +func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreqwrapper.ProvisioningRequest) bool { + conditions := pr.Status.Conditions + if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) { + return false + } + provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned) + if provisioned != nil { + if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) { + return true + } + return false + } + return true +} + +// MarkAsAccepted marks the ProvisioningRequest as accepted. +func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.ProvisioningRequest) error { + provreqconditions.AddOrUpdateCondition(pr, v1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now())) + if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil { + klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err) + return err + } + return nil +} + +// MarkAsFailed marks the ProvisioningRequest as failed. +func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.ProvisioningRequest, reason string, message string) { + provreqconditions.AddOrUpdateCondition(pr, v1.Failed, metav1.ConditionTrue, reason, message, metav1.NewTime(p.clock.Now())) + if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil { + klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err) + } +} + +// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it. +func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest( + isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool, ) ([]*apiv1.Pod, error) { provReqs, err := p.client.ProvisioningRequests() if err != nil { return nil, err } + for _, pr := range provReqs { - if ok, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]; !ok || !found { - klog.Warningf("Provisioning Class %s is not supported", pr.Spec.ProvisioningClassName) - continue - } - conditions := pr.Status.Conditions - if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) { + if !isSupportedClass(pr) { continue } - provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned) - //TODO(yaroslava): support exponential backoff // Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime - inject := true - if provisioned != nil { - if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) { - inject = true - } else { - inject = false - } + if !p.IsAvailableForProvisioning(pr) { + continue } - if inject { - provreqpods, err := provreqpods.PodsForProvisioningRequest(pr) - if err != nil { - klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name) - provreqconditions.AddOrUpdateCondition(pr, v1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now())) - if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil { - klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err) - } - continue - } - provreqconditions.AddOrUpdateCondition(pr, v1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now())) - if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil { - klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err) - continue - } - unschedulablePods := append(unschedulablePods, provreqpods...) - return unschedulablePods, nil + + podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr) + if err != nil { + klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name) + p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error()) + continue } + + if err := p.MarkAsAccepted(pr); err != nil { + continue + } + return podsFromProvReq, nil + } + return nil, nil +} + +// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list. +func (p *ProvisioningRequestPodsInjector) Process( + _ *context.AutoscalingContext, + unschedulablePods []*apiv1.Pod, +) ([]*apiv1.Pod, error) { + podsFromProvReq, err := p.GetPodsFromNextRequest( + func(pr *provreqwrapper.ProvisioningRequest) bool { + _, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName] + return found + }) + + if err != nil { + return unschedulablePods, err } - return unschedulablePods, nil + + return append(unschedulablePods, podsFromProvReq...), nil } // CleanUp cleans up the processor's internal structures.