Skip to content

Commit

Permalink
Add NRT garbage collector
Browse files Browse the repository at this point in the history
Signed-off-by: PiotrProkop <pprokop@nvidia.com>
  • Loading branch information
PiotrProkop committed Jan 4, 2023
1 parent 0159ab0 commit bf96421
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 12 deletions.
38 changes: 26 additions & 12 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ type NfdMaster interface {
type nfdMaster struct {
*nfdController

args Args
namespace string
nodeName string
server *grpc.Server
stop chan struct{}
ready chan bool
apihelper apihelper.APIHelpers
kubeconfig *restclient.Config
args Args
namespace string
nodeName string
server *grpc.Server
stop chan struct{}
ready chan bool
apihelper apihelper.APIHelpers
kubeconfig *restclient.Config
nrtGarbageCollector *nrtGarbageCollector
}

// NewNfdMaster creates a new NfdMaster server instance.
Expand Down Expand Up @@ -152,12 +153,23 @@ func (m *nfdMaster) Run() error {
if m.args.Prune {
return m.prune()
}
kubeconfig, err := m.getKubeconfig()
if err != nil {
return err
}
nrtGC, err := newNRTGarbageCollector(kubeconfig)
if err != nil {
return err
}

m.nrtGarbageCollector = nrtGC

klog.Info("starting nfd nrt garbage collector")
if err := m.nrtGarbageCollector.start(); err != nil {
return err
}

if m.args.CrdController {
kubeconfig, err := m.getKubeconfig()
if err != nil {
return err
}
klog.Info("starting nfd api controller")
m.nfdController, err = newNfdController(kubeconfig, !m.args.EnableNodeFeatureApi)
if err != nil {
Expand Down Expand Up @@ -300,6 +312,8 @@ func (m *nfdMaster) Stop() {
m.nfdController.stop()
}

m.nrtGarbageCollector.stop()

select {
case m.stop <- struct{}{}:
default:
Expand Down
131 changes: 131 additions & 0 deletions pkg/nfd-master/nfd-nrt-gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nfdmaster

import (
"context"
"time"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
)

type nrtGarbageCollector struct {
stopChan chan struct{}
k8sClient kubernetes.Interface
topoClient topologyclientset.Interface
}

func newNRTGarbageCollector(config *restclient.Config) (*nrtGarbageCollector, error) {
helper := apihelper.K8sHelpers{Kubeconfig: config}
cli, err := helper.GetTopologyClient()
if err != nil {
return nil, err
}

clientset := kubernetes.NewForConfigOrDie(config)
stopChan := make(chan struct{}, 1)

return &nrtGarbageCollector{
k8sClient: clientset,
topoClient: cli,
stopChan: stopChan,
}, nil
}

func (n *nrtGarbageCollector) deleteNodeHandler(object interface{}) {
// handle a case when we are starting up and need to clear stale NRT resources
obj := object
if deletedFinalStateUnknown, ok := object.(cache.DeletedFinalStateUnknown); ok {
klog.V(2).Infof("found stale NodeResourceTopology for node: %s ", object)
obj = deletedFinalStateUnknown.Obj
}

node, ok := obj.(*corev1.Node)
if !ok {
klog.Errorf("cannot convert %v to v1.Node", object)
return
}
klog.Infof("deleting NodeResourceTopology for node: %s", node.GetName())
if err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().Delete(context.TODO(), node.GetName(), metav1.DeleteOptions{}); err != nil {
if errors.IsNotFound(err) {
klog.V(2).Infof("NodeResourceTopology %s not found, omitting deletion", object)
return
} else {
klog.Warningf("failed to delete NodeResourceTopology for node %s: %s", node.GetName(), err.Error())
return
}
}
}

// populateNodeIndexer populates cache with NRTs so we know which one to delete on first synchronization
func (n *nrtGarbageCollector) populateNodeIndexer(indexer cache.Indexer) error {
nrts, err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

for _, nrt := range nrts.Items {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nrt.GetName(),
},
}
klog.V(2).Infof("adding node %s to indexer", node.GetName())
err := indexer.Add(node)
if err != nil {
return err
}

}

return nil
}

func (n *nrtGarbageCollector) start() error {
factory := informers.NewSharedInformerFactory(n.k8sClient, 5*time.Minute)
nodeInformer := factory.Core().V1().Nodes().Informer()

nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: n.deleteNodeHandler,
})

if err := n.populateNodeIndexer(nodeInformer.GetIndexer()); err != nil {
return err
}

factory.Start(n.stopChan)
factory.WaitForCacheSync(n.stopChan)

return nil
}

func (n *nrtGarbageCollector) stop() {
select {
case n.stopChan <- struct{}{}:
default:
}
}
154 changes: 154 additions & 0 deletions pkg/nfd-master/nfd-nrt-gc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nfdmaster

import (
"context"
"testing"
"time"

nrtapi "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
faketopologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakek8sclientset "k8s.io/client-go/kubernetes/fake"

. "github.com/smartystreets/goconvey/convey"
)

func TestInformer(t *testing.T) {
Convey("When theres is old NRT ", t, func() {
k8sClient := fakek8sclientset.NewSimpleClientset()

fakeClient := faketopologyv1alpha1.NewSimpleClientset(&nrtapi.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
})

stopChan := make(chan struct{}, 1)

nrtGC := &nrtGarbageCollector{
k8sClient: k8sClient,
topoClient: fakeClient,
stopChan: stopChan,
}

err := nrtGC.start()
So(err, ShouldBeNil)

nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)
So(nrts.Items, ShouldHaveLength, 0)
})
Convey("When theres is one old NRT and one up to date", t, func() {
k8sClient := fakek8sclientset.NewSimpleClientset(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
})

fakeClient := faketopologyv1alpha1.NewSimpleClientset(&nrtapi.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
},
&nrtapi.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
},
)

stopChan := make(chan struct{}, 1)

nrtGC := &nrtGarbageCollector{
k8sClient: k8sClient,
topoClient: fakeClient,
stopChan: stopChan,
}

err := nrtGC.start()
So(err, ShouldBeNil)

nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)
So(nrts.Items, ShouldHaveLength, 1)
So(nrts.Items[0].GetName(), ShouldEqual, "node1")

})
Convey("Should react to delete event", t, func() {
k8sClient := fakek8sclientset.NewSimpleClientset(
&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
},
&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
},
)

fakeClient := faketopologyv1alpha1.NewSimpleClientset(
&nrtapi.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
},
&nrtapi.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
},
)

stopChan := make(chan struct{}, 1)

nrtGC := &nrtGarbageCollector{
k8sClient: k8sClient,
topoClient: fakeClient,
stopChan: stopChan,
}

err := nrtGC.start()
So(err, ShouldBeNil)

nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)

So(nrts.Items, ShouldHaveLength, 2)

err = k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
So(err, ShouldBeNil)
// simple sleep with retry loop to make sure indexer will pick up event and trigger deleteNode Function
deleted := false
for i := 0; i < 5; i++ {
nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)

if len(nrts.Items) == 1 {
deleted = true
break
}
time.Sleep(time.Second)
}
So(deleted, ShouldBeTrue)
})

}

0 comments on commit bf96421

Please sign in to comment.