From 59d50c1c4a84057a2a9c8fef08ee23bb7c9a187c Mon Sep 17 00:00:00 2001 From: PiotrProkop Date: Tue, 3 Jan 2023 13:23:46 +0100 Subject: [PATCH] Add NRT garbage collector Signed-off-by: PiotrProkop --- cmd/nfd-topology-updater/main.go | 4 + .../kustomization.yaml | 9 + .../topologyupdater-gc-clusterrole.yaml | 25 ++ ...topologyupdater-gc-clusterrolebinding.yaml | 12 + .../topologyupdater-gc-serviceaccount.yaml | 4 + .../topologyupdater-gc/kustomization.yaml | 7 + .../topologyupdater-gc.yaml | 25 ++ .../kustomization.yaml | 2 + pkg/nfd-topology-updater/nfd-nrt-gc.go | 175 ++++++++++++++ pkg/nfd-topology-updater/nfd-nrt-gc_test.go | 224 ++++++++++++++++++ .../nfd-topology-updater.go | 24 ++ 11 files changed, 511 insertions(+) create mode 100644 deployment/base/rbac-topologyupdater-gc/kustomization.yaml create mode 100644 deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrole.yaml create mode 100644 deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrolebinding.yaml create mode 100644 deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-serviceaccount.yaml create mode 100644 deployment/base/topologyupdater-gc/kustomization.yaml create mode 100644 deployment/base/topologyupdater-gc/topologyupdater-gc.yaml create mode 100644 pkg/nfd-topology-updater/nfd-nrt-gc.go create mode 100644 pkg/nfd-topology-updater/nfd-nrt-gc_test.go diff --git a/cmd/nfd-topology-updater/main.go b/cmd/nfd-topology-updater/main.go index 514c985530..30f8a4f8cb 100644 --- a/cmd/nfd-topology-updater/main.go +++ b/cmd/nfd-topology-updater/main.go @@ -125,6 +125,10 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) { args := &topology.Args{} resourcemonitorArgs := &resourcemonitor.Args{} + flagset.BoolVar(&args.GCEnabled, "gc-enabled", false, + "Run NodeResourceTopology Garbage Collector") + flagset.DurationVar(&args.GCPeriod, "gc-interval", time.Duration(10)*time.Minute, + "Interval between which Garbage Collector will try to cleanup 
any missed but already obsolete NodeResourceTopology. [Default: 10m]") flagset.BoolVar(&args.Oneshot, "oneshot", false, "Update once and exit") flagset.BoolVar(&args.NoPublish, "no-publish", false, diff --git a/deployment/base/rbac-topologyupdater-gc/kustomization.yaml b/deployment/base/rbac-topologyupdater-gc/kustomization.yaml new file mode 100644 index 0000000000..a06374ca77 --- /dev/null +++ b/deployment/base/rbac-topologyupdater-gc/kustomization.yaml @@ -0,0 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: node-feature-discovery + +resources: +- topologyupdater-gc-clusterrole.yaml +- topologyupdater-gc-clusterrolebinding.yaml +- topologyupdater-gc-serviceaccount.yaml diff --git a/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrole.yaml b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrole.yaml new file mode 100644 index 0000000000..c0f4314447 --- /dev/null +++ b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrole.yaml @@ -0,0 +1,25 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: nfd-topology-gc +rules: +- apiGroups: + - "" + resources: + - nodes + verbs: + - list + - watch +- apiGroups: + - "" + resources: + - nodes/proxy + verbs: + - get +- apiGroups: + - topology.node.k8s.io + resources: + - noderesourcetopologies + verbs: + - delete + - list diff --git a/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrolebinding.yaml b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrolebinding.yaml new file mode 100644 index 0000000000..b8615d63c0 --- /dev/null +++ b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-clusterrolebinding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: nfd-topology-gc +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: nfd-topology-gc +subjects: +- kind: ServiceAccount + name: 
nfd-topology-gc + namespace: default diff --git a/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-serviceaccount.yaml b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-serviceaccount.yaml new file mode 100644 index 0000000000..e56f7bbefd --- /dev/null +++ b/deployment/base/rbac-topologyupdater-gc/topologyupdater-gc-serviceaccount.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: nfd-topology-gc diff --git a/deployment/base/topologyupdater-gc/kustomization.yaml b/deployment/base/topologyupdater-gc/kustomization.yaml new file mode 100644 index 0000000000..bc57c4b7d8 --- /dev/null +++ b/deployment/base/topologyupdater-gc/kustomization.yaml @@ -0,0 +1,7 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: node-feature-discovery + +resources: +- topologyupdater-gc.yaml diff --git a/deployment/base/topologyupdater-gc/topologyupdater-gc.yaml b/deployment/base/topologyupdater-gc/topologyupdater-gc.yaml new file mode 100644 index 0000000000..76b95fb74e --- /dev/null +++ b/deployment/base/topologyupdater-gc/topologyupdater-gc.yaml @@ -0,0 +1,25 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: nfd + name: nfd-topology-gc +spec: + selector: + matchLabels: + app: nfd-topology-gc + template: + metadata: + labels: + app: nfd-topology-gc + spec: + dnsPolicy: ClusterFirstWithHostNet + serviceAccount: nfd-topology-gc + containers: + - name: nfd-topology-gc + image: gcr.io/k8s-staging-nfd/node-feature-discovery:master + imagePullPolicy: Always + command: + - "nfd-topology-updater" + - "-gc-enabled" + - "-gc-interval=10s" diff --git a/deployment/overlays/master-worker-topologyupdater/kustomization.yaml b/deployment/overlays/master-worker-topologyupdater/kustomization.yaml index ffcbab6581..32ff0d5475 100644 --- a/deployment/overlays/master-worker-topologyupdater/kustomization.yaml +++ b/deployment/overlays/master-worker-topologyupdater/kustomization.yaml @@ -6,11 +6,13 @@ namespace: 
node-feature-discovery bases: - ../../base/rbac - ../../base/rbac-topologyupdater +- ../../base/rbac-topologyupdater-gc - ../../base/nfd-crds - ../../base/master - ../../base/worker-daemonset - ../../base/noderesourcetopologies-crd - ../../base/topologyupdater-daemonset +- ../../base/topologyupdater-gc resources: - namespace.yaml diff --git a/pkg/nfd-topology-updater/nfd-nrt-gc.go b/pkg/nfd-topology-updater/nfd-nrt-gc.go new file mode 100644 index 0000000000..8c2e11508c --- /dev/null +++ b/pkg/nfd-topology-updater/nfd-nrt-gc.go @@ -0,0 +1,175 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package nfdtopologyupdater
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+
+	"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
+)
+
+// nrtGarbageCollector deletes NodeResourceTopology (NRT) objects that no
+// longer have a matching node. It reacts to node delete events and also
+// runs a periodic pass every gcPeriod.
+type nrtGarbageCollector struct {
+	stopChan   chan struct{}
+	k8sClient  kubernetes.Interface
+	topoClient topologyclientset.Interface
+	gcPeriod   time.Duration
+}
+
+// newNRTGarbageCollector builds a garbage collector from the given REST
+// config. stop is the channel used to shut the collector down and gcPeriod
+// is the interval between periodic passes. It returns an error if either
+// client cannot be created.
+func newNRTGarbageCollector(config *restclient.Config, stop chan struct{}, gcPeriod time.Duration) (*nrtGarbageCollector, error) {
+	helper := apihelper.K8sHelpers{Kubeconfig: config}
+	cli, err := helper.GetTopologyClient()
+	if err != nil {
+		return nil, err
+	}
+
+	// Use NewForConfig rather than NewForConfigOrDie: this constructor
+	// already reports failures through its error return, so it must not
+	// panic on a bad config.
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &nrtGarbageCollector{
+		k8sClient:  clientset,
+		topoClient: cli,
+		stopChan:   stop,
+		gcPeriod:   gcPeriod,
+	}, nil
+}
+
+// deleteNRT deletes the NodeResourceTopology with the given name and logs
+// the outcome. A "not found" answer is not treated as a failure.
+func (n *nrtGarbageCollector) deleteNRT(name string) {
+	err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().Delete(context.TODO(), name, metav1.DeleteOptions{})
+	switch {
+	case err == nil:
+		klog.Infof("NodeResourceTopology for node %s has been deleted", name)
+	case errors.IsNotFound(err):
+		klog.V(2).Infof("NodeResourceTopology for node %s not found, omitting deletion", name)
+	default:
+		klog.Warningf("failed to delete NodeResourceTopology for node %s: %s", name, err.Error())
+	}
+}
+
+// deleteNodeHandler is the informer delete callback. It removes the
+// NodeResourceTopology named after the deleted node.
+func (n *nrtGarbageCollector) deleteNodeHandler(object interface{}) {
+	// handle a case when we are starting up and need to clear stale NRT resources
+	obj := object
+	if tombstone, ok := object.(cache.DeletedFinalStateUnknown); ok {
+		// Log the key, not the tombstone struct itself.
+		klog.V(2).Infof("found stale NodeResourceTopology for node: %s ", tombstone.Key)
+		obj = tombstone.Obj
+	}
+
+	node, ok := obj.(*corev1.Node)
+	if !ok {
+		klog.Errorf("cannot convert %v to v1.Node", object)
+		return
+	}
+	n.deleteNRT(node.GetName())
+}
+
+// populateNodeIndexer populates cache with NRTs so we know which one to delete on first synchronization.
+// For every existing NRT a placeholder node carrying the same name is added
+// to the indexer.
+func (n *nrtGarbageCollector) populateNodeIndexer(indexer cache.Indexer) error {
+	nrts, err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	for i := range nrts.Items {
+		node := &corev1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: nrts.Items[i].GetName(),
+			},
+		}
+		klog.V(2).Infof("adding node %s to indexer", node.GetName())
+		if err := indexer.Add(node); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// runGC performs a single garbage collection pass: every NRT whose key is
+// not present among the nodes known to the indexer is deleted.
+func (n *nrtGarbageCollector) runGC(indexer cache.Indexer) {
+	klog.Infof("Running periodic GC")
+
+	objects := indexer.List()
+	nodes := make(map[string]struct{}, len(objects))
+	for _, object := range objects {
+		key, err := cache.MetaNamespaceKeyFunc(object)
+		if err != nil {
+			klog.Warningf("cannot create key for %v: %s", object, err.Error())
+			continue
+		}
+		nodes[key] = struct{}{}
+	}
+
+	nrts, err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		klog.Warningf("cannot list NRTs %s", err.Error())
+		return
+	}
+
+	for i := range nrts.Items {
+		// Index into the slice instead of taking the address of a range variable.
+		nrt := &nrts.Items[i]
+		key, err := cache.MetaNamespaceKeyFunc(nrt)
+		if err != nil {
+			klog.Warningf("cannot create key for %v: %s", *nrt, err.Error())
+			continue
+		}
+		if _, ok := nodes[key]; !ok {
+			n.deleteNRT(nrt.GetName())
+		}
+	}
+}
+
+// periodicGC runs garbage collector at every gcPeriod to make sure we haven't missed any node.
+// It blocks until stopChan is signalled or closed. gcPeriod must be greater
+// than zero, otherwise time.NewTicker panics.
+func (n *nrtGarbageCollector) periodicGC(gcPeriod time.Duration, indexer cache.Indexer) {
+	gcTrigger := time.NewTicker(gcPeriod)
+	// Release the ticker once the collector shuts down.
+	defer gcTrigger.Stop()
+
+	for {
+		select {
+		case <-gcTrigger.C:
+			n.runGC(indexer)
+		case <-n.stopChan:
+			klog.Infof("shutting down periodic Garbage Collector")
+			return
+		}
+	}
+}
+
+// start registers the node delete handler, seeds the informer cache with the
+// existing NRTs, starts the informer and launches the periodic GC goroutine.
+// It returns an error if the GC period is not positive, if the existing NRTs
+// cannot be listed, or if the informer cache does not sync.
+func (n *nrtGarbageCollector) start() error {
+	// Reject a non-positive period up front; it would otherwise panic
+	// inside the periodicGC goroutine.
+	if n.gcPeriod <= 0 {
+		return fmt.Errorf("invalid GC period %s: must be greater than zero", n.gcPeriod)
+	}
+
+	factory := informers.NewSharedInformerFactory(n.k8sClient, 5*time.Minute)
+	nodeInformer := factory.Core().V1().Nodes().Informer()
+
+	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: n.deleteNodeHandler,
+	})
+
+	if err := n.populateNodeIndexer(nodeInformer.GetIndexer()); err != nil {
+		return err
+	}
+
+	factory.Start(n.stopChan)
+	// do initial cleanup
+	for informerType, synced := range factory.WaitForCacheSync(n.stopChan) {
+		if !synced {
+			return fmt.Errorf("failed to sync informer cache for %v", informerType)
+		}
+	}
+
+	go n.periodicGC(n.gcPeriod, nodeInformer.GetIndexer())
+
+	return nil
+}
diff --git a/pkg/nfd-topology-updater/nfd-nrt-gc_test.go b/pkg/nfd-topology-updater/nfd-nrt-gc_test.go new file mode 100644 index 0000000000..f1ed340030 --- /dev/null +++ b/pkg/nfd-topology-updater/nfd-nrt-gc_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package nfdtopologyupdater
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	nrtapi "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
+	faketopologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	fakek8sclientset "k8s.io/client-go/kubernetes/fake"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+const (
+	// nrtPollInterval is how often waitForNRTCount re-lists the NRTs.
+	nrtPollInterval = 100 * time.Millisecond
+	// nrtPollTimeout bounds how long a test waits for the GC to act.
+	nrtPollTimeout = 10 * time.Second
+)
+
+// newTestNode returns a Node object carrying only the given name.
+func newTestNode(name string) *corev1.Node {
+	return &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
+}
+
+// newTestNRT returns a NodeResourceTopology object carrying only the given name.
+func newTestNRT(name string) *nrtapi.NodeResourceTopology {
+	return &nrtapi.NodeResourceTopology{ObjectMeta: metav1.ObjectMeta{Name: name}}
+}
+
+// newTestGC wires a garbage collector to the fake clients. The caller is
+// expected to close stopChan so the goroutines started by start() exit.
+func newTestGC(k8sClient *fakek8sclientset.Clientset, topoClient *faketopologyv1alpha1.Clientset, gcPeriod time.Duration) *nrtGarbageCollector {
+	return &nrtGarbageCollector{
+		k8sClient:  k8sClient,
+		topoClient: topoClient,
+		stopChan:   make(chan struct{}),
+		gcPeriod:   gcPeriod,
+	}
+}
+
+// waitForNRTCount polls the fake topology client until it holds exactly want
+// NRTs. It returns an error if listing fails or nrtPollTimeout elapses first.
+// Polling is needed because the GC deletes objects asynchronously.
+func waitForNRTCount(client *faketopologyv1alpha1.Clientset, want int) error {
+	deadline := time.Now().Add(nrtPollTimeout)
+	for {
+		nrts, err := client.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+		if err != nil {
+			return err
+		}
+		if len(nrts.Items) == want {
+			return nil
+		}
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timed out waiting for %d NodeResourceTopologies, found %d", want, len(nrts.Items))
+		}
+		time.Sleep(nrtPollInterval)
+	}
+}
+
+// TestNRTGC covers the startup cleanup, the node delete event handling and
+// the periodic pass of the NRT garbage collector against fake clients.
+func TestNRTGC(t *testing.T) {
+	Convey("When theres is old NRT ", t, func() {
+		k8sClient := fakek8sclientset.NewSimpleClientset()
+		fakeClient := faketopologyv1alpha1.NewSimpleClientset(newTestNRT("node1"))
+
+		nrtGC := newTestGC(k8sClient, fakeClient, 10*time.Minute)
+		// Stop the informer and the periodic GC goroutine when the case ends.
+		defer close(nrtGC.stopChan)
+
+		err := nrtGC.start()
+		So(err, ShouldBeNil)
+
+		So(waitForNRTCount(fakeClient, 0), ShouldBeNil)
+	})
+	Convey("When theres is one old NRT and one up to date", t, func() {
+		k8sClient := fakek8sclientset.NewSimpleClientset(newTestNode("node1"))
+		fakeClient := faketopologyv1alpha1.NewSimpleClientset(
+			newTestNRT("node1"),
+			newTestNRT("node2"),
+		)
+
+		nrtGC := newTestGC(k8sClient, fakeClient, 10*time.Minute)
+		defer close(nrtGC.stopChan)
+
+		err := nrtGC.start()
+		So(err, ShouldBeNil)
+
+		So(waitForNRTCount(fakeClient, 1), ShouldBeNil)
+
+		nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+		So(err, ShouldBeNil)
+		So(nrts.Items, ShouldHaveLength, 1)
+		So(nrts.Items[0].GetName(), ShouldEqual, "node1")
+	})
+	Convey("Should react to delete event", t, func() {
+		k8sClient := fakek8sclientset.NewSimpleClientset(
+			newTestNode("node1"),
+			newTestNode("node2"),
+		)
+		fakeClient := faketopologyv1alpha1.NewSimpleClientset(
+			newTestNRT("node1"),
+			newTestNRT("node2"),
+		)
+
+		nrtGC := newTestGC(k8sClient, fakeClient, 10*time.Minute)
+		defer close(nrtGC.stopChan)
+
+		err := nrtGC.start()
+		So(err, ShouldBeNil)
+
+		nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+		So(err, ShouldBeNil)
+		So(nrts.Items, ShouldHaveLength, 2)
+
+		err = k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
+		So(err, ShouldBeNil)
+
+		// Poll until the informer has picked up the event and the delete
+		// handler has removed the NRT.
+		So(waitForNRTCount(fakeClient, 1), ShouldBeNil)
+	})
+	Convey("periodic GC should remove obsolete NRT", t, func() {
+		k8sClient := fakek8sclientset.NewSimpleClientset(
+			newTestNode("node1"),
+			newTestNode("node2"),
+		)
+		fakeClient := faketopologyv1alpha1.NewSimpleClientset(
+			newTestNRT("node1"),
+			newTestNRT("node2"),
+		)
+
+		nrtGC := newTestGC(k8sClient, fakeClient, time.Second)
+		defer close(nrtGC.stopChan)
+
+		err := nrtGC.start()
+		So(err, ShouldBeNil)
+
+		nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
+		So(err, ShouldBeNil)
+		So(nrts.Items, ShouldHaveLength, 2)
+
+		_, err = fakeClient.TopologyV1alpha1().NodeResourceTopologies().Create(context.TODO(), newTestNRT("not-existing"), metav1.CreateOptions{})
+		So(err, ShouldBeNil)
+
+		// Poll until a periodic pass has removed the NRT without a node.
+		So(waitForNRTCount(fakeClient, 2), ShouldBeNil)
+	})
+}
diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index a5b3e26393..046bed352d 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -43,6 +43,8 @@ type Args struct { Oneshot bool KubeConfigFile string ConfigFile string + GCEnabled bool + GCPeriod time.Duration Klog map[string]*utils.KlogFlagVal } @@ -94,6 +96,28 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol // one request if OneShot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error { klog.Infof("Node Feature Discovery Topology Updater %s", version.Get()) + if w.args.GCEnabled { + klog.Info("Running NodeResourceTopology Garbage Collector") + kubeconfig, err := apihelper.GetKubeconfig(w.args.KubeConfigFile) + if err != nil { + return err + } + + nrtGC, err := newNRTGarbageCollector(kubeconfig, w.stop, w.args.GCPeriod) + if err != nil { + return fmt.Errorf("failed to create NRT garbage collector: %w", err) + } + + if err := nrtGC.start(); err != nil { + return fmt.Errorf("failed to start NRT garbage collector: %w", err) + } + + for range w.stop { + klog.Info("shutting down") + } + return nil + } + klog.Infof("NodeName: '%s'", w.nodeInfo.nodeName) podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)