From c438a9042f4ec7168a319a60ec98ebf54afdab8f Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Fri, 16 Jun 2023 23:08:30 -0700 Subject: [PATCH] [receiver/k8s_cluster] Do not store unused service data in k8s API cache (#23434) To reduce RAM utilization. Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23433 This is the last PR to reduce the memory footprint of k8s API informers. Other objects are more static and usually don't have a lot of instances in user clusters, so there is no need to reduce their size. --- .../k8sclusterreceiver/informer_transform.go | 3 + .../informer_transform_test.go | 19 +++++++ .../k8sclusterreceiver/internal/pod/pods.go | 20 +------ .../internal/service/service.go | 40 ++++++++++++++ .../internal/service/service_test.go | 55 +++++++++++++++++++ 5 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 receiver/k8sclusterreceiver/internal/service/service.go create mode 100644 receiver/k8sclusterreceiver/internal/service/service_test.go diff --git a/receiver/k8sclusterreceiver/informer_transform.go b/receiver/k8sclusterreceiver/informer_transform.go index d89b79314147..4790c6a43370 100644 --- a/receiver/k8sclusterreceiver/informer_transform.go +++ b/receiver/k8sclusterreceiver/informer_transform.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset" ) @@ -35,6 +36,8 @@ func transformObject(object interface{}) (interface{}, error) { return demonset.Transform(o), nil case *appsv1.StatefulSet: return statefulset.Transform(o), nil + case *corev1.Service: + return service.Transform(o), nil } return object, nil } diff --git a/receiver/k8sclusterreceiver/informer_transform_test.go b/receiver/k8sclusterreceiver/informer_transform_test.go index fa87b4281a17..6dc9f3218f2a 100644 --- a/receiver/k8sclusterreceiver/informer_transform_test.go +++ b/receiver/k8sclusterreceiver/informer_transform_test.go @@ -93,6 +93,25 @@ func TestTransformObject(t *testing.T) { }, same: false, }, + { + name: "service", + object: &corev1.Service{ + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "my-app", + }, + Type: corev1.ServiceTypeClusterIP, + }, + }, + want: &corev1.Service{ + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "my-app", + }, + }, + }, + same: false, + }, { // This is a case where we don't transform the object. name: "hpa", diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index fff0b278047a..5801a0f29bb3 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -4,7 +4,6 @@ package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" import ( - "fmt" "strings" "time" @@ -17,7 +16,6 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -26,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -184,7 +183,7 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex } if mc.Services != nil { - meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, mc.Services)) + meta = maps.MergeStringMaps(meta, service.GetPodServiceTags(pod, mc.Services)) } if mc.Jobs != nil { @@ -268,21 +267,6 @@ func logError(err error, ref *v1.OwnerReference, podUID types.UID, logger *zap.L ) } -// getPodServiceTags returns a set of services associated with the pod. -func getPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string { - properties := map[string]string{} - - for _, ser := range services.List() { - serObj := ser.(*corev1.Service) - if serObj.Namespace == pod.Namespace && - labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { - properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = "" - } - } - - return properties -} - // getWorkloadProperties returns workload metadata for provided owner reference. func getWorkloadProperties(ref *v1.OwnerReference, labelKey string) map[string]string { uidKey := metadata.GetOTelUIDFromKind(strings.ToLower(ref.Kind)) diff --git a/receiver/k8sclusterreceiver/internal/service/service.go b/receiver/k8sclusterreceiver/internal/service/service.go new file mode 100644 index 000000000000..58792c71af5d --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/service/service.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service" +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" +) + +// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization. +// IMPORTANT: Make sure to update this function before using new service fields. +func Transform(service *corev1.Service) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta), + Spec: corev1.ServiceSpec{ + Selector: service.Spec.Selector, + }, + } +} + +// GetPodServiceTags returns a set of services associated with the pod. +func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string { + properties := map[string]string{} + + for _, ser := range services.List() { + serObj := ser.(*corev1.Service) + if serObj.Namespace == pod.Namespace && + labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { + properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = "" + } + } + + return properties +} diff --git a/receiver/k8sclusterreceiver/internal/service/service_test.go b/receiver/k8sclusterreceiver/internal/service/service_test.go new file mode 100644 index 000000000000..0089602b6889 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/service/service_test.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package service + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestTransform(t *testing.T) { + originalService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-service", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + }, + Annotations: map[string]string{ + "annotation1": "value1", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "my-app", + }, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + Type: corev1.ServiceTypeClusterIP, + }, + } + wantService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-service", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "my-app", + }, + }, + } + assert.EqualValues(t, wantService, Transform(originalService)) +}