Skip to content

Commit

Permalink
[node-agent] Use linear mapping to distribute OperatingSystemConfig
Browse files Browse the repository at this point in the history
… reconciliation delays evenly (gardener#8885)

* Implement linear mapping approach for OSC reconciliation delay

* Do not restrict `corev1.Node` cache on manager

Now that the `OperatingSystemConfig` controller needs to list all nodes to compute the reconciliation delay, we cannot restrict the node cache anymore. However, given that all controllers working with nodes use metadata-only requests, the impact should be negligible.

* Address PR review feedback

* Address PR review feedback
  • Loading branch information
rfranzke authored Dec 5, 2023
1 parent 70419bb commit c8c174d
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 48 deletions.
6 changes: 1 addition & 5 deletions cmd/gardener-node-agent/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/rest"
"k8s.io/component-base/version/verflag"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -190,9 +189,6 @@ func run(ctx context.Context, cancel context.CancelFunc, log logr.Logger, cfg *c
Namespaces: map[string]cache.Config{metav1.NamespaceSystem: {}},
Field: fields.SelectorFromSet(fields.Set{metav1.ObjectNameField: cfg.Controllers.OperatingSystemConfig.SecretName}),
},
&corev1.Node{}: {
Label: labels.SelectorFromSet(labels.Set{corev1.LabelHostname: hostName}),
},
&coordinationv1.Lease{}: leaseCacheOptions,
}},
LeaderElection: false,
Expand Down Expand Up @@ -229,7 +225,7 @@ func run(ctx context.Context, cancel context.CancelFunc, log logr.Logger, cfg *c
&bootstrappers.KubeletBootstrapKubeconfig{Log: log.WithName("kubelet-bootstrap-kubeconfig-creator"), FS: fs, APIServerConfig: cfg.APIServer},
},
ActualRunnables: []manager.Runnable{
manager.RunnableFunc(func(_ context.Context) error { return controller.AddToManager(cancel, mgr, cfg, hostName) }),
manager.RunnableFunc(func(ctx context.Context) error { return controller.AddToManager(ctx, cancel, mgr, cfg, hostName) }),
&bootstrappers.CloudConfigDownloaderCleaner{Log: log.WithName("legacy-cloud-config-downloader-cleaner"), FS: fs, DBus: dbus},
},
}); err != nil {
Expand Down
16 changes: 12 additions & 4 deletions pkg/nodeagent/controller/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/gardener/gardener/pkg/nodeagent/apis/config"
"github.com/gardener/gardener/pkg/nodeagent/controller/lease"
Expand All @@ -28,16 +31,21 @@ import (
)

// AddToManager adds all controllers to the given manager.
func AddToManager(cancel context.CancelFunc, mgr manager.Manager, cfg *config.NodeAgentConfiguration, hostName string) error {
if err := (&node.Reconciler{}).AddToManager(mgr); err != nil {
func AddToManager(ctx context.Context, cancel context.CancelFunc, mgr manager.Manager, cfg *config.NodeAgentConfiguration, hostName string) error {
nodePredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: map[string]string{corev1.LabelHostname: hostName}})
if err != nil {
return fmt.Errorf("failed computing label selector predicate for node: %w", err)
}

if err := (&node.Reconciler{}).AddToManager(mgr, nodePredicate); err != nil {
return fmt.Errorf("failed adding node controller: %w", err)
}

if err := (&operatingsystemconfig.Reconciler{
Config: cfg.Controllers.OperatingSystemConfig,
HostName: hostName,
CancelContext: cancel,
}).AddToManager(mgr); err != nil {
}).AddToManager(ctx, mgr); err != nil {
return fmt.Errorf("failed adding operating system config controller: %w", err)
}

Expand All @@ -47,7 +55,7 @@ func AddToManager(cancel context.CancelFunc, mgr manager.Manager, cfg *config.No
return fmt.Errorf("failed adding token controller: %w", err)
}

if err := (&lease.Reconciler{}).AddToManager(mgr); err != nil {
if err := (&lease.Reconciler{}).AddToManager(mgr, nodePredicate); err != nil {
return fmt.Errorf("failed adding lease controller: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/nodeagent/controller/lease/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

predicateutils "github.com/gardener/gardener/pkg/controllerutils/predicate"
)
Expand All @@ -29,7 +30,7 @@ import (
const ControllerName = "lease"

// AddToManager adds the lease controller with the default Options to the manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
func (r *Reconciler) AddToManager(mgr manager.Manager, nodePredicate predicate.Predicate) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}
Expand All @@ -49,7 +50,7 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {
return builder.
ControllerManagedBy(mgr).
Named(ControllerName).
For(node, builder.WithPredicates(predicateutils.ForEventTypes(predicateutils.Create))).
For(node, builder.WithPredicates(nodePredicate, predicateutils.ForEventTypes(predicateutils.Create))).
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
Complete(r)
}
4 changes: 2 additions & 2 deletions pkg/nodeagent/controller/node/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
const ControllerName = "node"

// AddToManager adds Reconciler to the given manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
func (r *Reconciler) AddToManager(mgr manager.Manager, nodePredicate predicate.Predicate) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}
Expand All @@ -47,7 +47,7 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {
return builder.
ControllerManagedBy(mgr).
Named(ControllerName).
For(node, builder.WithPredicates(r.NodePredicate())).
For(node, builder.WithPredicates(r.NodePredicate(), nodePredicate)).
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
Complete(r)
}
Expand Down
60 changes: 52 additions & 8 deletions pkg/nodeagent/controller/operatingsystemconfig/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package operatingsystemconfig
import (
"bytes"
"context"
"slices"
"time"

"github.com/go-logr/logr"
"github.com/spf13/afero"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -37,14 +40,14 @@ import (
nodeagentv1alpha1 "github.com/gardener/gardener/pkg/nodeagent/apis/config/v1alpha1"
"github.com/gardener/gardener/pkg/nodeagent/dbus"
"github.com/gardener/gardener/pkg/nodeagent/registry"
"github.com/gardener/gardener/pkg/utils"
kubernetesutils "github.com/gardener/gardener/pkg/utils/kubernetes"
)

// ControllerName is the name of this controller.
const ControllerName = "operatingsystemconfig"

// AddToManager adds Reconciler to the given manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
func (r *Reconciler) AddToManager(ctx context.Context, mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}
Expand All @@ -66,7 +69,7 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {
Named(ControllerName).
WatchesRawSource(
source.Kind(mgr.GetCache(), &corev1.Secret{}),
r.EnqueueWithJitterDelay(mgr.GetLogger().WithValues("controller", ControllerName).WithName("jitterEventHandler")),
r.EnqueueWithJitterDelay(ctx, mgr.GetLogger().WithValues("controller", ControllerName).WithName("reconciliation-delayer")),
builder.WithPredicates(
r.SecretPredicate(),
predicateutils.ForEventTypes(predicateutils.Create, predicateutils.Update),
Expand Down Expand Up @@ -105,12 +108,16 @@ func reconcileRequest(obj client.Object) reconcile.Request {
}}
}

// RandomDurationWithMetaDuration is an alias for `utils.RandomDurationWithMetaDuration`. Exposed for unit tests.
var RandomDurationWithMetaDuration = utils.RandomDurationWithMetaDuration

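// EnqueueWithJitterDelay returns handler.Funcs which enqueues the object for 'update' events after a delay that is
// computed from the position of this node in the list of all nodes (bounded by the configured sync jitter period).
// 'Create' events are enqueued immediately.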
func (r *Reconciler) EnqueueWithJitterDelay(log logr.Logger) handler.EventHandler {
func (r *Reconciler) EnqueueWithJitterDelay(ctx context.Context, log logr.Logger) handler.EventHandler {
delay := delayer{
log: log,
client: r.Client,
minDelaySeconds: 0,
maxDelaySeconds: int(r.Config.SyncJitterPeriod.Duration.Seconds()),
}

return &handler.Funcs{
CreateFunc: func(_ context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
Expand All @@ -130,10 +137,47 @@ func (r *Reconciler) EnqueueWithJitterDelay(log logr.Logger) handler.EventHandle
}

if !bytes.Equal(oldSecret.Data[nodeagentv1alpha1.DataKeyOperatingSystemConfig], newSecret.Data[nodeagentv1alpha1.DataKeyOperatingSystemConfig]) {
duration := RandomDurationWithMetaDuration(r.Config.SyncJitterPeriod)
duration := delay.compute(ctx, r.NodeName)
log.Info("Enqueued secret with operating system config with a jitter period", "duration", duration)
q.AddAfter(reconcileRequest(evt.ObjectNew), duration)
}
},
}
}

// delayer computes reconciliation delays for the node this gardener-node-agent instance is responsible for. It keeps
// the most recently listed nodes so that a delay can still be computed when a later list call fails.
type delayer struct {
// log is used to report failures when listing nodes.
log logr.Logger
// client is used to list the nodes (as metadata-only objects) of the cluster.
client client.Client

// minDelaySeconds is the lower bound (in seconds) of the range the computed delay is mapped into.
minDelaySeconds int
// maxDelaySeconds is the upper bound (in seconds) of the range the computed delay is mapped into.
maxDelaySeconds int

// nodeList is the last successfully listed set of nodes, sorted by name. It is nil until the first list call has
// succeeded and serves as fallback when a subsequent list call fails.
nodeList *metav1.PartialObjectMetadataList
}

// compute computes a time.Duration that can be used to delay reconciliations by using a simple linear mapping approach
// based on the index of the node this instance of gardener-node-agent is responsible for in the list of all nodes in
// the cluster. This way, the delays of all instances of gardener-node-agent are distributed evenly.
//
// The nodes are listed on every call. If listing fails, the previously listed nodes are used instead. The returned
// delay is
//   - 0 if nodeName is empty,
//   - the minimum delay if no node list is available at all (the very first list call failed) or if the node is not
//     part of the list (this also covers an empty list),
//   - minDelay + index * (maxDelay - minDelay) / len(nodes) otherwise, i.e., always in [minDelay, maxDelay).
func (d *delayer) compute(ctx context.Context, nodeName string) time.Duration {
	if nodeName == "" {
		return 0
	}

	nodeList := &metav1.PartialObjectMetadataList{}
	nodeList.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("NodeList"))
	if err := d.client.List(ctx, nodeList); err != nil {
		d.log.Error(err, "Failed to list nodes when computing reconciliation delay", "nodeName", nodeName)
		// fall back to previously computed list of nodes
	} else {
		kubernetesutils.ByName().Sort(nodeList)
		d.nodeList = nodeList
	}

	minDelay := time.Duration(d.minDelaySeconds) * time.Second

	// The first list call may have failed, in which case there is no previous list to fall back to. Dereferencing
	// d.nodeList would panic, so use the minimum delay instead.
	if d.nodeList == nil {
		d.log.Info("No list of nodes available, falling back to minimum reconciliation delay", "nodeName", nodeName)
		return minDelay
	}

	index := slices.IndexFunc(d.nodeList.Items, func(node metav1.PartialObjectMetadata) bool {
		return node.GetName() == nodeName
	})
	// A node that is missing from the list (or an empty list) yields index -1, which would result in a negative delay
	// and, for an empty list, in a division by zero below.
	if index < 0 {
		d.log.Info("Node not found in list of nodes, falling back to minimum reconciliation delay", "nodeName", nodeName)
		return minDelay
	}

	rangeSize := float64(d.maxDelaySeconds-d.minDelaySeconds) / float64(len(d.nodeList.Items))
	delaySeconds := float64(d.minDelaySeconds) + float64(index)*rangeSize
	return time.Duration(delaySeconds * float64(time.Second))
}
Loading

0 comments on commit c8c174d

Please sign in to comment.