Skip to content

Commit

Permalink
[processor/k8sattributes] Only store necessary Pod data (open-telemet…
Browse files Browse the repository at this point in the history
…ry#23272)

Only store Pod data we actually use for attributes in the informer store. By default, informers store all data about K8s objects, as they're primarily intended to act as a local cache for the API Server. For our use case, most of that data is unnecessary, and it eats a significant amount of memory in larger clusters. This PR uses a transform function to remove the unnecessary data from the informer store.

I've measured the gains synthetically, and we use nearly 70% less memory per stored Pod. I haven't included the benchmark function in this PR, as it's a bit complicated and hacky, and I'm not convinced there's value in adding it to the codebase permanently.
  • Loading branch information
Mikołaj Świątek authored Jun 13, 2023
1 parent ab42d69 commit 061eb3e
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 3 deletions.
20 changes: 20 additions & 0 deletions .chloggen/feat_k8sattributes_set-transform.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Store only necessary Pod data

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23226]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
106 changes: 106 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"
apps_v1 "k8s.io/api/apps/v1"
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
Expand Down Expand Up @@ -108,6 +109,17 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
}

c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
// Register a transform so the informer store keeps only the Pod fields this
// processor reads, instead of the complete API object.
err = c.informer.SetTransform(
	func(object interface{}) (interface{}, error) {
		originalPod, success := object.(*api_v1.Pod)
		if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
			return object, nil
		}

		return removeUnnecessaryPodData(originalPod, c.Rules), nil
	},
)
if err != nil {
	// Surface the failure to the caller rather than silently continuing
	// without the transform in place.
	return nil, err
}

if c.extractNamespaceLabelsAnnotations() {
c.namespaceInformer = newNamespaceInformer(c.kc)
} else {
Expand Down Expand Up @@ -398,6 +410,100 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
return tags
}

// removeUnnecessaryPodData returns a new Pod that holds only the fields required
// by pod association and by the enabled extraction rules. The input Pod is not
// modified. The Labels and Annotations maps are shared with the input, not copied.
func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Pod {
	// name, namespace, uid, start time and ip are needed for identifying Pods
	// there's room to optimize this further, it's kept this way for simplicity.
	// Because the UID is always kept, rules.PodUID needs no separate handling.
	transformedPod := api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      pod.GetName(),
			Namespace: pod.GetNamespace(),
			UID:       pod.GetUID(),
		},
		Status: api_v1.PodStatus{
			PodIP:     pod.Status.PodIP,
			StartTime: pod.Status.StartTime,
		},
		Spec: api_v1.PodSpec{
			HostNetwork: pod.Spec.HostNetwork,
		},
	}

	if rules.StartTime {
		transformedPod.SetCreationTimestamp(pod.GetCreationTimestamp())
	}

	if rules.Node {
		transformedPod.Spec.NodeName = pod.Spec.NodeName
	}

	if rules.PodHostName {
		transformedPod.Spec.Hostname = pod.Spec.Hostname
	}

	if needContainerAttributes(rules) {
		// trimStatuses keeps only the name, container ID and restart count of
		// each status. It returns nil for empty input and otherwise allocates
		// exactly len(statuses) elements, so the stored copy carries no spare
		// capacity. Indexing avoids copying each full ContainerStatus.
		trimStatuses := func(statuses []api_v1.ContainerStatus) []api_v1.ContainerStatus {
			if len(statuses) == 0 {
				return nil
			}
			trimmed := make([]api_v1.ContainerStatus, len(statuses))
			for i := range statuses {
				trimmed[i] = api_v1.ContainerStatus{
					Name:         statuses[i].Name,
					ContainerID:  statuses[i].ContainerID,
					RestartCount: statuses[i].RestartCount,
				}
			}
			return trimmed
		}

		// The image string is only needed when an image name or tag rule is enabled.
		keepImage := rules.ContainerImageName || rules.ContainerImageTag

		// trimContainers keeps the container name (always needed, it's used for
		// identification) and, when keepImage is set, the image. Same sizing
		// and nil-for-empty behavior as trimStatuses.
		trimContainers := func(containers []api_v1.Container) []api_v1.Container {
			if len(containers) == 0 {
				return nil
			}
			trimmed := make([]api_v1.Container, len(containers))
			for i := range containers {
				trimmed[i].Name = containers[i].Name
				if keepImage {
					trimmed[i].Image = containers[i].Image
				}
			}
			return trimmed
		}

		transformedPod.Status.ContainerStatuses = trimStatuses(pod.Status.ContainerStatuses)
		transformedPod.Status.InitContainerStatuses = trimStatuses(pod.Status.InitContainerStatuses)
		transformedPod.Spec.Containers = trimContainers(pod.Spec.Containers)
		transformedPod.Spec.InitContainers = trimContainers(pod.Spec.InitContainers)
	}

	if len(rules.Labels) > 0 {
		transformedPod.Labels = pod.Labels
	}

	if len(rules.Annotations) > 0 {
		transformedPod.Annotations = pod.Annotations
	}

	if rules.IncludesOwnerMetadata() {
		transformedPod.SetOwnerReferences(pod.GetOwnerReferences())
	}

	return &transformedPod
}

func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers {
containers := PodContainers{
ByID: map[string]*Container{},
Expand Down
17 changes: 14 additions & 3 deletions processor/k8sattributesprocessor/internal/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,12 @@ func TestExtractionRules(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c.Rules = tc.rules

// manually call the data removal function here
// normally the informer does this, but fully emulating the informer in this test is annoying
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
c.handleReplicaSetAdd(replicaset)
c.handlePodAdd(pod)
c.handlePodAdd(transformedPod)
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
require.True(t, ok)

Expand Down Expand Up @@ -996,8 +1000,12 @@ func TestReplicaSetExtractionRules(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
c.Rules = tc.rules
replicaset.OwnerReferences = tc.ownerReferences

// manually call the data removal function here
// normally the informer does this, but fully emulating the informer in this test is annoying
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
c.handleReplicaSetAdd(replicaset)
c.handlePodAdd(pod)
c.handlePodAdd(transformedPod)
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
require.True(t, ok)

Expand Down Expand Up @@ -1462,7 +1470,10 @@ func Test_extractPodContainersAttributes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := WatchClient{Rules: tt.rules}
assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod))
// manually call the data removal function here
// normally the informer does this, but fully emulating the informer in this test is annoying
transformedPod := removeUnnecessaryPodData(&tt.pod, c.Rules)
assert.Equal(t, tt.want, c.extractPodContainersAttributes(transformedPod))
})
}
}
Expand Down
23 changes: 23 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,29 @@ type ExtractionRules struct {
Labels []FieldExtractionRule
}

// IncludesOwnerMetadata reports whether at least one enabled extraction rule
// refers to a Pod owner: the CronJob, Deployment, DaemonSet, Job, ReplicaSet or
// StatefulSet name/identifier rules.
func (rules *ExtractionRules) IncludesOwnerMetadata() bool {
	return rules.CronJobName ||
		rules.DeploymentName || rules.DeploymentUID ||
		rules.DaemonSetUID || rules.DaemonSetName ||
		rules.JobName || rules.JobUID ||
		rules.ReplicaSetID || rules.ReplicaSetName ||
		rules.StatefulSetUID || rules.StatefulSetName
}

// FieldExtractionRule is used to specify which fields to extract from pod fields
// and inject into spans as attributes.
type FieldExtractionRule struct {
Expand Down

0 comments on commit 061eb3e

Please sign in to comment.