Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some informer optimizations #1360

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/kubecache/envtest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,64 @@ func TestAsynchronousStartup(t *testing.T) {
assert.LessOrEqual(t, int32(createdPods), cl3.syncSignalOnMessage.Load())
}

func TestIgnoreHeadlessServices(t *testing.T) {
svcClient := serviceClient{
Address: fmt.Sprintf("127.0.0.1:%d", freePort),
Messages: make(chan *informer.Event, 10),
}
test.Eventually(t, timeout, func(t require.TestingT) {
svcClient.Start(ctx, t)
})
// wait for the service to have sent the initial snapshot of entities
// (at the end, will send the "SYNC_FINISHED" event)
test.Eventually(t, timeout, func(t require.TestingT) {
event := ReadChannel(t, svcClient.Messages, timeout)
require.Equal(t, informer.EventType_SYNC_FINISHED, event.Type)
})

// WHEN services are created
require.NoError(t, k8sClient.Create(ctx, &corev1.Service{
ObjectMeta: v1.ObjectMeta{Name: "service1", Namespace: "default"},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{Name: "foo", Port: 8080}},
ClusterIP: "10.0.0.101", ClusterIPs: []string{"10.0.0.101"},
},
}))
require.NoError(t, k8sClient.Create(ctx, &corev1.Service{
ObjectMeta: v1.ObjectMeta{Name: "headless", Namespace: "default"},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{Name: "foo", Port: 8080}},
ClusterIP: "None",
},
}))
require.NoError(t, k8sClient.Create(ctx, &corev1.Service{
ObjectMeta: v1.ObjectMeta{Name: "service2", Namespace: "default"},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{Name: "foo", Port: 8080}},
ClusterIP: "10.0.0.102", ClusterIPs: []string{"10.0.0.102"},
},
}))

// THEN the informer cache receives the services with an IP
// AND ignores headless services (without ClusterIP)
event := ReadChannel(t, svcClient.Messages, timeout)
require.NotNil(t, event.Resource)
assert.Equal(t, "service1", event.Resource.Name)
assert.NotEmpty(t, event.Resource.Ips)

event = ReadChannel(t, svcClient.Messages, timeout)
require.NotNil(t, event.Resource)
assert.Equal(t, "service2", event.Resource.Name)
assert.NotEmpty(t, event.Resource.Ips)

select {
case event := <-svcClient.Messages:
assert.Failf(t, "did not expect more informer updates. Got %s", event.String())
default:
// ok!
}
}

func ReadChannel[T any](t require.TestingT, inCh <-chan T, timeout time.Duration) T {
var item T
select {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubecache/meta/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (inf *Informers) Subscribe(observer Observer) {
}
if !inf.config.disableServices {
for _, service := range inf.services.GetStore().List() {
// ignore headless services from being added
if headlessService(service.(*indexableEntity).EncodedMeta) {
return
}
if err := observer.On(&informer.Event{
Type: informer.EventType_CREATED,
Resource: service.(*indexableEntity).EncodedMeta,
Expand Down
68 changes: 33 additions & 35 deletions pkg/kubecache/meta/informers_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ func loadKubeconfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

// the transformed objects that are stored in the Informers' cache require to embed an ObjectMeta
// instances. Since the informer's cache is only used to list the stored objects, we just need
// something that is unique. We can get rid of many fields for memory saving in big clusters with
// millions of pods
func minimalIndex(om *metav1.ObjectMeta) metav1.ObjectMeta {
return metav1.ObjectMeta{
Name: om.Name,
Namespace: om.Namespace,
UID: om.UID,
}
}

func (inf *Informers) initPodInformer(informerFactory informers.SharedInformerFactory) error {
pods := informerFactory.Core().V1().Pods().Informer()

Expand Down Expand Up @@ -254,7 +266,7 @@ func (inf *Informers) initPodInformer(informerFactory informers.SharedInformerFa

startTime := pod.GetCreationTimestamp().String()
return &indexableEntity{
ObjectMeta: pod.ObjectMeta,
ObjectMeta: minimalIndex(&pod.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Expand All @@ -275,33 +287,7 @@ func (inf *Informers) initPodInformer(informerFactory informers.SharedInformerFa
return fmt.Errorf("can't set pods transform: %w", err)
}

_, err := pods.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
inf.Notify(&informer.Event{
Type: informer.EventType_CREATED,
Resource: obj.(*indexableEntity).EncodedMeta,
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
if cmp.Equal(
oldObj.(*indexableEntity).EncodedMeta,
newObj.(*indexableEntity).EncodedMeta,
protoCmpTransform,
) {
return
}
inf.Notify(&informer.Event{
Type: informer.EventType_UPDATED,
Resource: newObj.(*indexableEntity).EncodedMeta,
})
},
DeleteFunc: func(obj interface{}) {
inf.Notify(&informer.Event{
Type: informer.EventType_DELETED,
Resource: obj.(*indexableEntity).EncodedMeta,
})
},
})
_, err := pods.AddEventHandler(inf.ipInfoEventHandler())
if err != nil {
return fmt.Errorf("can't register Pod event handler in the K8s informer: %w", err)
}
Expand Down Expand Up @@ -365,7 +351,7 @@ func (inf *Informers) initNodeIPInformer(informerFactory informers.SharedInforme
ips = cni.AddOvnIPs(ips, node)

return &indexableEntity{
ObjectMeta: node.ObjectMeta,
ObjectMeta: minimalIndex(&node.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: node.Name,
Namespace: node.Namespace,
Expand Down Expand Up @@ -401,18 +387,17 @@ func (inf *Informers) initServiceIPInformer(informerFactory informers.SharedInfo
}
return nil, fmt.Errorf("was expecting a *v1.Service. Got: %T", i)
}
if svc.Spec.ClusterIP == v1.ClusterIPNone {
// this will be normal for headless services
inf.log.Debug("Service doesn't have any ClusterIP. Beyla won't decorate their flows",
"namespace", svc.Namespace, "name", svc.Name)
var ips []string
if svc.Spec.ClusterIP != v1.ClusterIPNone {
ips = svc.Spec.ClusterIPs
}
return &indexableEntity{
ObjectMeta: svc.ObjectMeta,
ObjectMeta: minimalIndex(&svc.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: svc.Name,
Namespace: svc.Namespace,
Labels: svc.Labels,
Ips: svc.Spec.ClusterIPs,
Ips: ips,
Kind: typeService,
},
}, nil
Expand All @@ -429,15 +414,28 @@ func (inf *Informers) initServiceIPInformer(informerFactory informers.SharedInfo
return nil
}

func headlessService(om *informer.ObjectMeta) bool {
return len(om.Ips) == 0 && om.Kind == "Service"
}

func (inf *Informers) ipInfoEventHandler() *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// ignore headless services from being added
if headlessService(obj.(*indexableEntity).EncodedMeta) {
return
}
inf.Notify(&informer.Event{
Type: informer.EventType_CREATED,
Resource: obj.(*indexableEntity).EncodedMeta,
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
// ignore headless services from being added
if headlessService(newObj.(*indexableEntity).EncodedMeta) &&
headlessService(oldObj.(*indexableEntity).EncodedMeta) {
return
}
if cmp.Equal(
oldObj.(*indexableEntity).EncodedMeta,
newObj.(*indexableEntity).EncodedMeta,
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubecache/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

const defaultSendTimeout = 10 * time.Second
const barrierBufferLen = 10

// InformersCache configures and starts the gRPC service
type InformersCache struct {
Expand Down Expand Up @@ -125,7 +126,7 @@ func (ic *InformersCache) Subscribe(_ *informer.SubscribeMessage, server informe
id: connectionID,
server: server,
sendTimeout: ic.SendTimeout,
barrier: make(chan struct{}, 1),
barrier: make(chan struct{}, barrierBufferLen),
}
o.log.Info("client subscribed")

Expand Down
Loading