Merge pull request karmada-io#4072 from ctripcloud/add-replicas-syncer
add replicas syncer for resources with HPA
karmada-bot authored Sep 21, 2023
2 parents 0135ebf + 5ae8178 commit 68f4ac7
Showing 5 changed files with 579 additions and 1 deletion.
cmd/controller-manager/app/controllermanager.go (25 changes: 24 additions & 1 deletion)
@@ -15,6 +15,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@@ -44,6 +45,7 @@ import (
metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
"github.com/karmada-io/karmada/pkg/controllers/hpareplicassyncer"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
"github.com/karmada-io/karmada/pkg/controllers/status"
@@ -185,7 +187,7 @@ func Run(ctx context.Context, opts *options.Options) error {
var controllers = make(controllerscontext.Initializers)

// controllersDisabledByDefault is the set of controllers which are disabled by default
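// Controllers listed here run only when explicitly enabled (e.g. via the --controllers flag).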
var controllersDisabledByDefault = sets.New("")
var controllersDisabledByDefault = sets.New("hpaReplicasSyncer")

func init() {
controllers["cluster"] = startClusterController
@@ -205,6 +207,7 @@ func init() {
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController
}

func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -591,6 +594,26 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
return true, nil
}

func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
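// Build a scale client so the syncer can read and update the scale subresource of
// whatever workload kind an HPA targets; the discovery-based resolver determines
// which Scale API version each resource exposes.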
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(ctx.KubeClientSet.Discovery())
scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return false, err
}

hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
Client: ctx.Mgr.GetClient(),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ScaleClient: scaleClient,
}
err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}

return true, nil
}

// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go (148 changes: 148 additions & 0 deletions)
@@ -0,0 +1,148 @@
package hpareplicassyncer

import (
"context"

autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

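// hpaPredicate filters HPA events so that the controller reconciles only when an
// HPA's status.currentReplicas changes; create and delete events are ignored.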
var hpaPredicate = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
},
}

// HPAReplicasSyncer syncs the replicas recorded in the status of an HPA back to the resource template.
type HPAReplicasSyncer struct {
Client client.Client
RESTMapper meta.RESTMapper
ScaleClient scale.ScalesGetter
}

// SetupWithManager creates a controller and registers it with the controller manager.
func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(hpaPredicate)).
Complete(r)
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *HPAReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name)

hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.Client.Get(ctx, req.NamespacedName, hpa)
if err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}

return controllerruntime.Result{}, err
}

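// Resolve the workload referenced by spec.scaleTargetRef and fetch its current Scale subresource.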
workloadGR, scale, err := r.getGroupResourceAndScaleForWorkloadFromHPA(ctx, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

err = r.updateScaleIfNeed(ctx, workloadGR, scale.DeepCopy(), hpa)
if err != nil {
return controllerruntime.Result{}, err
}

// TODO(@lxtywypc): Add finalizer for HPA and remove them
// when the HPA is deleting and the replicas have been synced.

return controllerruntime.Result{}, nil
}

// getGroupResourceAndScaleForWorkloadFromHPA parses the GroupResource and gets the Scale
// of the workload declared in the HPA's spec.scaleTargetRef.
func (r *HPAReplicasSyncer) getGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler,
) (schema.GroupResource, *autoscalingv1.Scale, error) {
gvk := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
mapping, err := r.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
klog.Errorf("Failed to get group resource for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

gr := mapping.Resource.GroupResource()

scale, err := r.ScaleClient.Scales(hpa.Namespace).Get(ctx, gr, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// If the scale of workload is not found, skip processing.
return gr, nil, nil
}

klog.Errorf("Failed to get scale for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

return gr, scale, nil
}

// updateScaleIfNeed updates the scale of the workload on the Karmada control plane
// if the replicas declared in the resource template do not match
// the actual replicas in member clusters as affected by the HPA.
func (r *HPAReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// If the scale of workload is not found, skip processing.
if scale == nil {
klog.V(4).Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

return nil
}

if scale.Spec.Replicas != hpa.Status.CurrentReplicas {
oldReplicas := scale.Spec.Replicas

scale.Spec.Replicas = hpa.Status.CurrentReplicas
_, err := r.ScaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to try to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas, err)
return err
}

klog.V(4).Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas)
}

return nil
}
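
The hpaPredicate at the top of this file is what keeps the controller idle until an HPA actually reports a new replica count. A minimal test sketch, not part of this commit and assumed to live in the same hpareplicassyncer package so it can reach the unexported hpaPredicate, could exercise that behavior:

package hpareplicassyncer

import (
	"testing"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	"sigs.k8s.io/controller-runtime/pkg/event"
)

// TestHPAPredicate checks that only changes to status.currentReplicas pass the filter.
func TestHPAPredicate(t *testing.T) {
	oldHPA := &autoscalingv2.HorizontalPodAutoscaler{
		Status: autoscalingv2.HorizontalPodAutoscalerStatus{CurrentReplicas: 2},
	}
	newHPA := oldHPA.DeepCopy()

	// Same replica count: the update is filtered out.
	if hpaPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Fatal("expected update without a replica change to be ignored")
	}

	// Replica count changed: the update triggers reconciliation.
	newHPA.Status.CurrentReplicas = 5
	if !hpaPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Fatal("expected update with a replica change to be reconciled")
	}

	// Create and delete events never trigger reconciliation.
	if hpaPredicate.Create(event.CreateEvent{Object: newHPA}) || hpaPredicate.Delete(event.DeleteEvent{Object: newHPA}) {
		t.Fatal("expected create/delete events to be ignored")
	}
}

Gating on status.currentReplicas means the syncer only touches the workload's scale subresource when the replica count observed in member clusters has genuinely changed, rather than on every HPA status refresh.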