Skip to content

Commit

Permalink
[receiver/k8s_cluster] Do not store unused service data in k8s API ca…
Browse files Browse the repository at this point in the history
…che (open-telemetry#23434)

To reduce RAM utilization.

Resolves
open-telemetry#23433

This is the last PR to reduce the memory footprint of k8s API informers.
Other objects are more static and usually don't have a lot of instances
in user clusters, so there is no need to reduce their size.
  • Loading branch information
dmitryax authored Jun 17, 2023
1 parent 1c84fb2 commit b2238b9
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 18 deletions.
3 changes: 3 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
)

Expand All @@ -35,6 +36,8 @@ func transformObject(object interface{}) (interface{}, error) {
return demonset.Transform(o), nil
case *appsv1.StatefulSet:
return statefulset.Transform(o), nil
case *corev1.Service:
return service.Transform(o), nil
}
return object, nil
}
19 changes: 19 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ func TestTransformObject(t *testing.T) {
},
same: false,
},
{
name: "service",
object: &corev1.Service{
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
Type: corev1.ServiceTypeClusterIP,
},
},
want: &corev1.Service{
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
},
},
same: false,
},
{
// This is a case where we don't transform the object.
name: "hpa",
Expand Down
20 changes: 2 additions & 18 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"

import (
"fmt"
"strings"
"time"

Expand All @@ -17,7 +16,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

Expand All @@ -26,6 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
)

Expand Down Expand Up @@ -184,7 +183,7 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex
}

if mc.Services != nil {
meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, mc.Services))
meta = maps.MergeStringMaps(meta, service.GetPodServiceTags(pod, mc.Services))
}

if mc.Jobs != nil {
Expand Down Expand Up @@ -268,21 +267,6 @@ func logError(err error, ref *v1.OwnerReference, podUID types.UID, logger *zap.L
)
}

// getPodServiceTags returns a set of services associated with the pod.
func getPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string {
properties := map[string]string{}

for _, ser := range services.List() {
serObj := ser.(*corev1.Service)
if serObj.Namespace == pod.Namespace &&
labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) {
properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = ""
}
}

return properties
}

// getWorkloadProperties returns workload metadata for provided owner reference.
func getWorkloadProperties(ref *v1.OwnerReference, labelKey string) map[string]string {
uidKey := metadata.GetOTelUIDFromKind(strings.ToLower(ref.Kind))
Expand Down
40 changes: 40 additions & 0 deletions receiver/k8sclusterreceiver/internal/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function before using new service fields.
func Transform(service *corev1.Service) *corev1.Service {
return &corev1.Service{
ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta),
Spec: corev1.ServiceSpec{
Selector: service.Spec.Selector,
},
}
}

// GetPodServiceTags returns a set of services associated with the pod.
func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string {
properties := map[string]string{}

for _, ser := range services.List() {
serObj := ser.(*corev1.Service)
if serObj.Namespace == pod.Namespace &&
labels.Set(serObj.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) {
properties[fmt.Sprintf("%s%s", constants.K8sServicePrefix, serObj.Name)] = ""
}
}

return properties
}
55 changes: 55 additions & 0 deletions receiver/k8sclusterreceiver/internal/service/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestTransform(t *testing.T) {
originalService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-service",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
Annotations: map[string]string{
"annotation1": "value1",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeClusterIP,
},
}
wantService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-service",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "my-app",
},
},
}
assert.EqualValues(t, wantService, Transform(originalService))
}

0 comments on commit b2238b9

Please sign in to comment.