Skip to content

Commit 66e1d27

Browse files
authored
Merge pull request #104300 from wojtek-t/converting_informer
Create TransformingInformer
2 parents 3b2b23c + 56ffb4e commit 66e1d27

File tree

2 files changed

+194
-9
lines changed

2 files changed

+194
-9
lines changed

staging/src/k8s.io/client-go/tools/cache/controller.go

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func NewInformer(
322322
// This will hold the client state, as we know it.
323323
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
324324

325-
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
325+
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
326326
}
327327

328328
// NewIndexerInformer returns an Indexer and a Controller for populating the index
@@ -351,7 +351,59 @@ func NewIndexerInformer(
351351
// This will hold the client state, as we know it.
352352
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
353353

354-
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
354+
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
355+
}
356+
357+
// TransformFunc allows for transforming an object before it will be processed
358+
// and put into the controller cache and before the corresponding handlers will
359+
// be called on it.
360+
// TransformFunc (similarly to ResourceEventHandler functions) should be able
361+
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
362+
//
363+
// The most common usage pattern is to clean-up some parts of the object to
364+
// reduce component memory usage if a given component doesn't care about them.
365+
// given controller doesn't care for them
366+
type TransformFunc func(interface{}) (interface{}, error)
367+
368+
// NewTransformingInformer returns a Store and a controller for populating
369+
// the store while also providing event notifications. You should only used
370+
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
371+
// the event notifications to be faulty.
372+
// The given transform function will be called on all objects before they will
373+
// put put into the Store and corresponding Add/Modify/Delete handlers will
374+
// be invokved for them.
375+
func NewTransformingInformer(
376+
lw ListerWatcher,
377+
objType runtime.Object,
378+
resyncPeriod time.Duration,
379+
h ResourceEventHandler,
380+
transformer TransformFunc,
381+
) (Store, Controller) {
382+
// This will hold the client state, as we know it.
383+
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
384+
385+
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
386+
}
387+
388+
// NewTransformingIndexerInformer returns an Indexer and a controller for
389+
// populating the index while also providing event notifications. You should
390+
// only used the returned Index for Get/List operations; Add/Modify/Deletes
391+
// will cause the event notifications to be faulty.
392+
// The given transform function will be called on all objects before they will
393+
// be put into the Index and corresponding Add/Modify/Delete handlers will
394+
// be invoked for them.
395+
func NewTransformingIndexerInformer(
396+
lw ListerWatcher,
397+
objType runtime.Object,
398+
resyncPeriod time.Duration,
399+
h ResourceEventHandler,
400+
indexers Indexers,
401+
transformer TransformFunc,
402+
) (Indexer, Controller) {
403+
// This will hold the client state, as we know it.
404+
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
405+
406+
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
355407
}
356408

357409
// newInformer returns a controller for populating the store while also
@@ -374,6 +426,7 @@ func newInformer(
374426
resyncPeriod time.Duration,
375427
h ResourceEventHandler,
376428
clientState Store,
429+
transformer TransformFunc,
377430
) Controller {
378431
// This will hold incoming changes. Note how we pass clientState in as a
379432
// KeyLister, that way resync operations will result in the correct set
@@ -393,24 +446,33 @@ func newInformer(
393446
Process: func(obj interface{}) error {
394447
// from oldest to newest
395448
for _, d := range obj.(Deltas) {
449+
obj := d.Object
450+
if transformer != nil {
451+
var err error
452+
obj, err = transformer(obj)
453+
if err != nil {
454+
return err
455+
}
456+
}
457+
396458
switch d.Type {
397459
case Sync, Replaced, Added, Updated:
398-
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
399-
if err := clientState.Update(d.Object); err != nil {
460+
if old, exists, err := clientState.Get(obj); err == nil && exists {
461+
if err := clientState.Update(obj); err != nil {
400462
return err
401463
}
402-
h.OnUpdate(old, d.Object)
464+
h.OnUpdate(old, obj)
403465
} else {
404-
if err := clientState.Add(d.Object); err != nil {
466+
if err := clientState.Add(obj); err != nil {
405467
return err
406468
}
407-
h.OnAdd(d.Object)
469+
h.OnAdd(obj)
408470
}
409471
case Deleted:
410-
if err := clientState.Delete(d.Object); err != nil {
472+
if err := clientState.Delete(obj); err != nil {
411473
return err
412474
}
413-
h.OnDelete(d.Object)
475+
h.OnDelete(obj)
414476
}
415477
}
416478
return nil

staging/src/k8s.io/client-go/tools/cache/controller_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"k8s.io/api/core/v1"
27+
apiequality "k8s.io/apimachinery/pkg/api/equality"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/runtime"
2930
"k8s.io/apimachinery/pkg/util/sets"
@@ -451,3 +452,125 @@ func TestPanicPropagated(t *testing.T) {
451452
t.Errorf("timeout: the panic failed to propagate from the controller run method!")
452453
}
453454
}
455+
456+
func TestTransformingInformer(t *testing.T) {
457+
// source simulates an apiserver object endpoint.
458+
source := fcache.NewFakeControllerSource()
459+
460+
makePod := func(name, generation string) *v1.Pod {
461+
return &v1.Pod{
462+
ObjectMeta: metav1.ObjectMeta{
463+
Name: name,
464+
Namespace: "namespace",
465+
Labels: map[string]string{"generation": generation},
466+
},
467+
Spec: v1.PodSpec{
468+
Hostname: "hostname",
469+
Subdomain: "subdomain",
470+
},
471+
}
472+
}
473+
expectedPod := func(name, generation string) *v1.Pod {
474+
pod := makePod(name, generation)
475+
pod.Spec.Hostname = "new-hostname"
476+
pod.Spec.Subdomain = ""
477+
pod.Spec.NodeName = "nodename"
478+
return pod
479+
}
480+
481+
source.Add(makePod("pod1", "1"))
482+
source.Modify(makePod("pod1", "2"))
483+
484+
type event struct {
485+
eventType watch.EventType
486+
previous interface{}
487+
current interface{}
488+
}
489+
events := make(chan event, 10)
490+
recordEvent := func(eventType watch.EventType, previous, current interface{}) {
491+
events <- event{eventType: eventType, previous: previous, current: current}
492+
}
493+
verifyEvent := func(eventType watch.EventType, previous, current interface{}) {
494+
select {
495+
case event := <-events:
496+
if event.eventType != eventType {
497+
t.Errorf("expected type %v, got %v", eventType, event.eventType)
498+
}
499+
if !apiequality.Semantic.DeepEqual(event.previous, previous) {
500+
t.Errorf("expected previous object %#v, got %#v", previous, event.previous)
501+
}
502+
if !apiequality.Semantic.DeepEqual(event.current, current) {
503+
t.Errorf("expected object %#v, got %#v", current, event.current)
504+
}
505+
case <-time.After(wait.ForeverTestTimeout):
506+
t.Errorf("failed to get event")
507+
}
508+
}
509+
510+
podTransformer := func(obj interface{}) (interface{}, error) {
511+
pod, ok := obj.(*v1.Pod)
512+
if !ok {
513+
return nil, fmt.Errorf("unexpected object type: %T", obj)
514+
}
515+
pod.Spec.Hostname = "new-hostname"
516+
pod.Spec.Subdomain = ""
517+
pod.Spec.NodeName = "nodename"
518+
519+
// Clear out ResourceVersion to simplify comparisons.
520+
pod.ResourceVersion = ""
521+
522+
return pod, nil
523+
}
524+
525+
store, controller := NewTransformingInformer(
526+
source,
527+
&v1.Pod{},
528+
0,
529+
ResourceEventHandlerFuncs{
530+
AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) },
531+
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
532+
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
533+
},
534+
podTransformer,
535+
)
536+
537+
verifyStore := func(expectedItems []interface{}) {
538+
items := store.List()
539+
if len(items) != len(expectedItems) {
540+
t.Errorf("unexpected items %v, expected %v", items, expectedItems)
541+
}
542+
for _, expectedItem := range expectedItems {
543+
found := false
544+
for _, item := range items {
545+
if apiequality.Semantic.DeepEqual(item, expectedItem) {
546+
found = true
547+
}
548+
}
549+
if !found {
550+
t.Errorf("expected item %v not found in %v", expectedItem, items)
551+
}
552+
}
553+
}
554+
555+
stopCh := make(chan struct{})
556+
go controller.Run(stopCh)
557+
558+
verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
559+
verifyStore([]interface{}{expectedPod("pod1", "2")})
560+
561+
source.Add(makePod("pod2", "1"))
562+
verifyEvent(watch.Added, nil, expectedPod("pod2", "1"))
563+
verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")})
564+
565+
source.Add(makePod("pod3", "1"))
566+
verifyEvent(watch.Added, nil, expectedPod("pod3", "1"))
567+
568+
source.Modify(makePod("pod2", "2"))
569+
verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2"))
570+
571+
source.Delete(makePod("pod1", "2"))
572+
verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
573+
verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
574+
575+
close(stopCh)
576+
}

0 commit comments

Comments
 (0)