Skip to content

Commit

Permalink
Add backoff mechanism for ProvReq retry (#7182)
Browse files Browse the repository at this point in the history
* Add backoff mechanism for ProvReq retry

* Add flags for intital and max backoff time, and cache size

* Review remarks

* Add LRU cache

* Review remark
  • Loading branch information
yaroslava-serdiuk authored Sep 23, 2024
1 parent a5e46eb commit 04b1402
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 26 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ type AutoscalingOptions struct {
ProvisioningRequestEnabled bool
// AsyncNodeGroupsEnabled tells if CA creates/deletes node groups asynchronously.
AsyncNodeGroupsEnabled bool
// ProvisioningRequestInitialBackoffTime is the initial time for ProvisioningRequest be considered by CA after failed ScaleUp request.
ProvisioningRequestInitialBackoffTime time.Duration
// ProvisioningRequestMaxBackoffTime is the max time for ProvisioningRequest be considered by CA after failed ScaleUp request.
ProvisioningRequestMaxBackoffTime time.Duration
// ProvisioningRequestMaxCacheSize is the max size for ProvisioningRequest cache that is stored for retry backoff.
ProvisioningRequestMaxBackoffCacheSize int
}

// KubeClientOptions specify options for kube client
Expand Down
18 changes: 12 additions & 6 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,14 @@ var (
"--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
provisioningRequestInitialBackoffTime = flag.Duration("provisioning-request-initial-backoff-time", 1*time.Minute, "Initial backoff time for ProvisioningRequest retry after failed ScaleUp.")
provisioningRequestMaxBackoffTime = flag.Duration("provisioning-request-max-backoff-time", 10*time.Minute, "Max backoff time for ProvisioningRequest retry after failed ScaleUp.")
provisioningRequestMaxBackoffCacheSize = flag.Int("provisioning-request-max-backoff-cache-size", 1000, "Max size for ProvisioningRequest cache size used for retry backoff mechanism.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -447,6 +450,9 @@ func createAutoscalingOptions() config.AutoscalingOptions {
BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
ProvisioningRequestEnabled: *provisioningRequestsEnabled,
AsyncNodeGroupsEnabled: *asyncNodeGroupsEnabled,
ProvisioningRequestInitialBackoffTime: *provisioningRequestInitialBackoffTime,
ProvisioningRequestMaxBackoffTime: *provisioningRequestMaxBackoffTime,
ProvisioningRequestMaxBackoffCacheSize: *provisioningRequestMaxBackoffCacheSize,
}
}

Expand Down Expand Up @@ -522,7 +528,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
if err != nil {
return nil, err
}
Expand Down
49 changes: 34 additions & 15 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
Expand All @@ -33,16 +33,16 @@ import (
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

const (
defaultRetryTime = 10 * time.Minute
"k8s.io/utils/lru"
)

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
Expand All @@ -53,7 +53,17 @@ func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreq
}
provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
if provisioned.Status != metav1.ConditionFalse {
return false
}
val, found := p.backoffDuration.Get(key(pr))
retryTime, ok := val.(time.Duration)
if !found || !ok {
retryTime = p.initialRetryTime
}
if provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) {
p.backoffDuration.Remove(key(pr))
p.backoffDuration.Add(key(pr), min(2*retryTime, p.maxBackoffTime))
return true
}
return false
Expand Down Expand Up @@ -87,29 +97,31 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
if err != nil {
return nil, err
}

for _, pr := range provReqs {
if !isSupportedClass(pr) {
continue
}
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
p.backoffDuration.Remove(key(pr))
continue
}

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
if !p.IsAvailableForProvisioning(pr) {
continue
}

podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}

if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
return provreqpods, nil
}
return nil, nil
}
Expand All @@ -122,6 +134,9 @@ func (p *ProvisioningRequestPodsInjector) Process(
podsFromProvReq, err := p.GetPodsFromNextRequest(
func(pr *provreqwrapper.ProvisioningRequest) bool {
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
if !found {
klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
}
return found
})

Expand All @@ -136,10 +151,14 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
return &ProvisioningRequestPodsInjector{initialRetryTime: initialBackoffTime, maxBackoffTime: maxBackoffTime, backoffDuration: lru.New(maxCacheSize), client: client, clock: clock.RealClock{}}, nil
}

func key(pr *provreqwrapper.ProvisioningRequest) string {
return string(pr.UID)
}
17 changes: 12 additions & 5 deletions cluster-autoscaler/processors/provreq/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (

apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
clock "k8s.io/utils/clock/testing"
"k8s.io/utils/lru"
)

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
Expand Down Expand Up @@ -104,11 +105,15 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
},
{
name: "Provisioned=True, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq},
},
{
name: "Provisioned=False, ProvReq is backed off, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedRecentlyProvReqB},
},
{
name: "Provisioned=Unknown, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq},
},
{
name: "ProvisionedClass is unknown, no pods are injected",
Expand All @@ -124,7 +129,9 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
Expand Down

0 comments on commit 04b1402

Please sign in to comment.