diff --git a/.chloggen/feat_k8sattributes_set-transform.yaml b/.chloggen/feat_k8sattributes_set-transform.yaml new file mode 100755 index 000000000000..4c0d7075b4d8 --- /dev/null +++ b/.chloggen/feat_k8sattributes_set-transform.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Store only necessary Pod data + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23226] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 5673511864d4..0a3506393b31 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -108,6 +109,17 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, } c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector) + err = c.informer.SetTransform( + func(object interface{}) (interface{}, error) { + originalPod, success := object.(*api_v1.Pod) + if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing + return object, nil + } + + return removeUnnecessaryPodData(originalPod, c.Rules), nil + }, + ) + if c.extractNamespaceLabelsAnnotations() { c.namespaceInformer = newNamespaceInformer(c.kc) } else { @@ -398,6 +410,100 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { return tags } +// This function removes all data from the Pod except what is required by extraction rules and pod association +func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Pod { + + // name, namespace, uid, start time and ip are needed for identifying Pods + // there's room to optimize this further, it's kept this way for simplicity + transformedPod := api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + UID: pod.GetUID(), + }, + Status: api_v1.PodStatus{ + PodIP: pod.Status.PodIP, + StartTime: pod.Status.StartTime, + }, + Spec: api_v1.PodSpec{ + HostNetwork: pod.Spec.HostNetwork, + }, + } + + if rules.StartTime { + transformedPod.SetCreationTimestamp(pod.GetCreationTimestamp()) + } + + if rules.PodUID { + transformedPod.SetUID(pod.GetUID()) + } + + if rules.Node { + transformedPod.Spec.NodeName = pod.Spec.NodeName + } + + if rules.PodHostName { + transformedPod.Spec.Hostname = pod.Spec.Hostname + } + + if needContainerAttributes(rules) { + for _, containerStatus := range pod.Status.ContainerStatuses { + transformedPod.Status.ContainerStatuses = append( + transformedPod.Status.ContainerStatuses, + api_v1.ContainerStatus{ + Name: containerStatus.Name, + ContainerID: containerStatus.ContainerID, + RestartCount: containerStatus.RestartCount, + }, + ) + } + for _, containerStatus := range pod.Status.InitContainerStatuses { + transformedPod.Status.InitContainerStatuses = append( + transformedPod.Status.InitContainerStatuses, + api_v1.ContainerStatus{ + Name: containerStatus.Name, + ContainerID: containerStatus.ContainerID, + RestartCount: containerStatus.RestartCount, + }, + ) + } + + removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container { + transformedContainer := api_v1.Container{} + transformedContainer.Name = c.Name // we always need the name, it's used for identification + if rules.ContainerImageName || rules.ContainerImageTag { + transformedContainer.Image = c.Image + } + return transformedContainer + } + + for _, container := range pod.Spec.Containers { + transformedPod.Spec.Containers = append( + transformedPod.Spec.Containers, removeUnnecessaryContainerData(container), + ) + } + for _, container := range pod.Spec.InitContainers { + transformedPod.Spec.InitContainers = append( + transformedPod.Spec.InitContainers, removeUnnecessaryContainerData(container), + ) + } + } + + if len(rules.Labels) > 0 { + transformedPod.Labels = pod.Labels + } + + if len(rules.Annotations) > 0 { + transformedPod.Annotations = pod.Annotations + } + + if rules.IncludesOwnerMetadata() { + transformedPod.SetOwnerReferences(pod.GetOwnerReferences()) + } + + return &transformedPod +} + func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers { containers := PodContainers{ ByID: map[string]*Container{}, diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index b803bc2a9f16..b27d68969cb8 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -846,8 +846,12 @@ func TestExtractionRules(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { c.Rules = tc.rules + + // manually call the data removal function here + // normally the informer does this, but fully emulating the informer in this test is annoying + transformedPod := removeUnnecessaryPodData(pod, c.Rules) c.handleReplicaSetAdd(replicaset) - c.handlePodAdd(pod) + c.handlePodAdd(transformedPod) p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) require.True(t, ok) @@ -996,8 +1000,12 @@ func TestReplicaSetExtractionRules(t *testing.T) { t.Run(tc.name, func(t *testing.T) { c.Rules = tc.rules replicaset.OwnerReferences = tc.ownerReferences + + // manually call the data removal function here + // normally the informer does this, but fully emulating the informer in this test is annoying + transformedPod := removeUnnecessaryPodData(pod, c.Rules) c.handleReplicaSetAdd(replicaset) - c.handlePodAdd(pod) + c.handlePodAdd(transformedPod) p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) require.True(t, ok) @@ -1462,7 +1470,10 @@ func Test_extractPodContainersAttributes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := WatchClient{Rules: tt.rules} - assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod)) + // manually call the data removal function here + // normally the informer does this, but fully emulating the informer in this test is annoying + transformedPod := removeUnnecessaryPodData(&tt.pod, c.Rules) + assert.Equal(t, tt.want, c.extractPodContainersAttributes(transformedPod)) }) } } diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index b7e24951b6c2..202d8ea36339 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -208,6 +208,29 @@ type ExtractionRules struct { Labels []FieldExtractionRule } +// IncludesOwnerMetadata determines whether the ExtractionRules include metadata about Pod Owners +func (rules *ExtractionRules) IncludesOwnerMetadata() bool { + rulesNeedingOwnerMetadata := []bool{ + rules.CronJobName, + rules.DeploymentName, + rules.DeploymentUID, + rules.DaemonSetUID, + rules.DaemonSetName, + rules.JobName, + rules.JobUID, + rules.ReplicaSetID, + rules.ReplicaSetName, + rules.StatefulSetUID, + rules.StatefulSetName, + } + for _, ruleEnabled := range rulesNeedingOwnerMetadata { + if ruleEnabled { + return true + } + } + return false +} + // FieldExtractionRule is used to specify which fields to extract from pod fields // and inject into spans as attributes. type FieldExtractionRule struct {