diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f2aede876..439cc42625 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,12 @@ This release introduces the following breaking changes: - feat(sumologicextension): use hostname as default collector name [#918] +### Fixed + +- fix(k8sprocessor): race condition when getting Pod data [#938] + [#918]: https://github.com/SumoLogic/sumologic-otel-collector/pull/918 +[#938]: https://github.com/SumoLogic/sumologic-otel-collector/pull/938 [Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.70.0-sumo-0...main ## [v0.70.0-sumo-1] diff --git a/pkg/processor/k8sprocessor/client_test.go b/pkg/processor/k8sprocessor/client_test.go index 64d4758eee..519f7592f2 100644 --- a/pkg/processor/k8sprocessor/client_test.go +++ b/pkg/processor/k8sprocessor/client_test.go @@ -70,11 +70,12 @@ func newFakeClient( }, nil } -// GetPod looks up FakeClient.Pods map by the provided string, -// which might represent either IP address or Pod UID. -func (f *fakeClient) GetPod(identifier kube.PodIdentifier) (*kube.Pod, bool) { +func (f *fakeClient) GetPodAttributes(identifier kube.PodIdentifier) (map[string]string, bool) { p, ok := f.Pods[identifier] - return p, ok + if !ok { + return map[string]string{}, ok + } + return p.Attributes, ok } // Start is a noop for FakeClient. diff --git a/pkg/processor/k8sprocessor/kube/client.go b/pkg/processor/k8sprocessor/kube/client.go index 0d62ee3eef..d5b0b65953 100644 --- a/pkg/processor/k8sprocessor/kube/client.go +++ b/pkg/processor/k8sprocessor/kube/client.go @@ -247,21 +247,42 @@ func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Durati } } -// GetPod takes an IP address or Pod UID and returns the pod the identifier is associated with. -func (c *WatchClient) GetPod(identifier PodIdentifier) (*Pod, bool) { +// getPod takes an IP address or Pod UID and returns the pod the identifier is associated with. +func (c *WatchClient) getPod(identifier PodIdentifier) (*Pod, bool) { c.m.RLock() defer c.m.RUnlock() pod, ok := c.Pods[identifier] - if ok { - if pod.Ignore { - return nil, false - } + if !ok { + observability.RecordIPLookupMiss() + return nil, ok + } + if pod.Ignore { + return nil, false + } + return pod, true +} - c.updatePodOwnerMetadata(pod) - return pod, ok +// GetPodAttributes takes an IP address or Pod UID and returns the metadata attributes of the Pod the +// identifier is associated with +func (c *WatchClient) GetPodAttributes(identifier PodIdentifier) (map[string]string, bool) { + pod, ok := c.getPod(identifier) + if !ok { + return nil, false } - observability.RecordIPLookupMiss() - return nil, false + ownerAttributes := c.getPodOwnerMetadataAttributes(pod) + + // we need to take a lock here because pod.Attributes may be modified concurrently + // TODO: clean up the locking in this function and the ones it calls + c.m.RLock() + defer c.m.RUnlock() + attributes := make(map[string]string, len(pod.Attributes)) + for key, value := range pod.Attributes { + attributes[key] = value + } + for key, value := range ownerAttributes { + attributes[key] = value + } + return attributes, ok } func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { @@ -340,7 +361,10 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { return tags } -func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) { +func (c *WatchClient) getPodOwnerMetadataAttributes(pod *Pod) map[string]string { + c.m.RLock() + defer c.m.RUnlock() + attributes := map[string]string{} if c.Rules.OwnerLookupEnabled { c.logger.Debug("pod owner lookup", zap.String("pod.Name", pod.Name), @@ -352,27 +376,27 @@ func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) { switch owner.kind { case "DaemonSet": if c.Rules.DaemonSetName { - pod.Attributes[c.Rules.Tags.DaemonSetName] = owner.name + attributes[c.Rules.Tags.DaemonSetName] = owner.name } case "Deployment": if c.Rules.DeploymentName { - pod.Attributes[c.Rules.Tags.DeploymentName] = owner.name + attributes[c.Rules.Tags.DeploymentName] = owner.name } case "ReplicaSet": if c.Rules.ReplicaSetName { - pod.Attributes[c.Rules.Tags.ReplicaSetName] = owner.name + attributes[c.Rules.Tags.ReplicaSetName] = owner.name } case "StatefulSet": if c.Rules.StatefulSetName { - pod.Attributes[c.Rules.Tags.StatefulSetName] = owner.name + attributes[c.Rules.Tags.StatefulSetName] = owner.name } case "Job": if c.Rules.JobName { - pod.Attributes[c.Rules.Tags.JobName] = owner.name + attributes[c.Rules.Tags.JobName] = owner.name } case "CronJob": if c.Rules.CronJobName { - pod.Attributes[c.Rules.Tags.CronJobName] = owner.name + attributes[c.Rules.Tags.CronJobName] = owner.name } default: @@ -382,9 +406,10 @@ func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) { if c.Rules.ServiceName { services := c.op.GetServices(pod.Name) - pod.Attributes[c.Rules.Tags.ServiceName] = strings.Join(services, c.delimiter) + attributes[c.Rules.Tags.ServiceName] = strings.Join(services, c.delimiter) } } + return attributes } // This function removes all data from the Pod except what is required by extraction rules @@ -533,18 +558,18 @@ func generatePodIDFromName(p Namer) PodIdentifier { } func (c *WatchClient) forgetPod(pod *api_v1.Pod) { - p, ok := c.GetPod(PodIdentifier(pod.Status.PodIP)) + p, ok := c.getPod(PodIdentifier(pod.Status.PodIP)) if ok && p.Name == pod.Name { c.appendDeleteQueue(PodIdentifier(pod.Status.PodIP), pod.Name) } - p, ok = c.GetPod(PodIdentifier(pod.UID)) + p, ok = c.getPod(PodIdentifier(pod.UID)) if ok && p.Name == pod.Name { c.appendDeleteQueue(PodIdentifier(pod.UID), pod.Name) } id := generatePodIDFromName(pod) - p, ok = c.GetPod(id) + p, ok = c.getPod(id) if ok && p.Name == pod.Name { c.appendDeleteQueue(id, pod.Name) } diff --git a/pkg/processor/k8sprocessor/kube/client_test.go b/pkg/processor/k8sprocessor/kube/client_test.go index 4dfeed63bd..0facf5c396 100644 --- a/pkg/processor/k8sprocessor/kube/client_test.go +++ b/pkg/processor/k8sprocessor/kube/client_test.go @@ -19,6 +19,7 @@ import ( "fmt" "reflect" "regexp" + "sync" "testing" "time" @@ -390,7 +391,7 @@ func TestGetIgnoredPod(t *testing.T) { pod.Status.PodIP = "1.1.1.1" c.handlePodAdd(pod) c.Pods[PodIdentifier(pod.Status.PodIP)].Ignore = true - got, ok := c.GetPod(PodIdentifier(pod.Status.PodIP)) + got, ok := c.getPod(PodIdentifier(pod.Status.PodIP)) assert.Nil(t, got) assert.False(t, ok) } @@ -422,19 +423,99 @@ func TestGetPod(t *testing.T) { OwnerReferences: &pod.OwnerReferences, } - got, ok := c.GetPod(PodIdentifier("1.1.1.1")) + got, ok := c.getPod(PodIdentifier("1.1.1.1")) assert.Equal(t, got, expected) assert.True(t, ok) - got, ok = c.GetPod(PodIdentifier("1234")) + got, ok = c.getPod(PodIdentifier("1234")) assert.Equal(t, got, expected) assert.True(t, ok) - got, ok = c.GetPod(PodIdentifier("pod_name.namespace_name")) + got, ok = c.getPod(PodIdentifier("pod_name.namespace_name")) assert.Equal(t, got, expected) assert.True(t, ok) } +func TestGetPodConcurrent(t *testing.T) { + c, _ := newTestClient(t) + + pod := &api_v1.Pod{} + pod.Status.PodIP = "1.1.1.1" + pod.UID = "1234" + pod.Name = "pod_name" + pod.Namespace = "namespace_name" + pod.OwnerReferences = []meta_v1.OwnerReference{ + { + Kind: "StatefulSet", + Name: "snug-sts", + UID: "f15f0585-a0bc-43a3-96e4-dd2eace75391", + }, + } + c.handlePodAdd(pod) + + expected := &Pod{ + Name: "pod_name", + Namespace: "namespace_name", + Address: "1.1.1.1", + PodUID: "1234", + Attributes: map[string]string{}, + OwnerReferences: &pod.OwnerReferences, + } + + numThreads := 2 + wg := sync.WaitGroup{} + for i := 0; i < numThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + got, ok := c.getPod(PodIdentifier("1.1.1.1")) + assert.Equal(t, got, expected) + assert.True(t, ok) + }() + } + wg.Wait() +} + +func TestGetPodOwnerAttributesConcurrent(t *testing.T) { + rules := ExtractionRules{ + OwnerLookupEnabled: true, + StatefulSetName: true, + Tags: NewExtractionFieldTags(), + } + c, _ := newTestClientWithRulesAndFilters(t, rules, Filters{}) + + pod := &api_v1.Pod{} + pod.Status.PodIP = "1.1.1.1" + pod.UID = "1234" + pod.Name = "pod_name" + pod.Namespace = "namespace_name" + pod.OwnerReferences = []meta_v1.OwnerReference{ + { + Kind: "StatefulSet", + Name: "snug-sts", + UID: "f15f0585-a0bc-43a3-96e4-dd2eace75391", + }, + } + c.handlePodAdd(pod) + + expected := map[string]string{ + rules.Tags.StatefulSetName: "snug-sts", + } + + numThreads := 2 + wg := sync.WaitGroup{} + for i := 0; i < numThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + got, ok := c.GetPodAttributes(PodIdentifier("1.1.1.1")) + assert.Equal(t, got, expected) + assert.True(t, ok) + }() + } + wg.Wait() +} + func TestGetPodWhenNamespaceInExtractedMetadata(t *testing.T) { c, _ := newTestClient(t) @@ -459,15 +540,15 @@ func TestGetPodWhenNamespaceInExtractedMetadata(t *testing.T) { OwnerReferences: &pod.OwnerReferences, } - got, ok := c.GetPod(PodIdentifier("1.1.1.1")) + got, ok := c.getPod(PodIdentifier("1.1.1.1")) assert.Equal(t, got, expected) assert.True(t, ok) - got, ok = c.GetPod(PodIdentifier("1234")) + got, ok = c.getPod(PodIdentifier("1234")) assert.Equal(t, got, expected) assert.True(t, ok) - got, ok = c.GetPod(PodIdentifier("pod_name.namespace_name")) + got, ok = c.getPod(PodIdentifier("pod_name.namespace_name")) assert.Equal(t, got, expected) assert.True(t, ok) } @@ -505,7 +586,7 @@ func TestNoHostnameExtractionRules(t *testing.T) { } c.handlePodAdd(pod) - p, _ := c.GetPod(PodIdentifier(pod.Status.PodIP)) + p, _ := c.getPod(PodIdentifier(pod.Status.PodIP)) assert.Equal(t, p.Attributes["k8s.pod.hostname"], podName) } @@ -784,12 +865,12 @@ func TestExtractionRules(t *testing.T) { // normally the informer does this, but fully emulating the informer in this test is annoying transformedPod := removeUnnecessaryPodData(pod, c.Rules) c.handlePodAdd(transformedPod) - p, ok := c.GetPod(PodIdentifier(pod.Status.PodIP)) + attributes, ok := c.GetPodAttributes(PodIdentifier(pod.Status.PodIP)) require.True(t, ok) - assert.Equal(t, len(tc.attributes), len(p.Attributes)) + assert.Equal(t, len(tc.attributes), len(attributes)) for k, v := range tc.attributes { - got, ok := p.Attributes[k] + got, ok := attributes[k] if assert.True(t, ok, "Attribute '%s' not found.", k) { assert.Equal(t, v, got, "Value of '%s' is incorrect", k) } @@ -1223,11 +1304,11 @@ func TestServiceInfoArrivesLate(t *testing.T) { client.handlePodAdd(pod) - podResult, ok := client.GetPod(PodIdentifier(podUID)) + attributes, ok := client.GetPodAttributes(PodIdentifier(podUID)) assert.True(t, ok) - logger.Debug("pod: ", zap.Any("pod", podResult)) - serviceName, ok := podResult.Attributes["ServiceName"] + logger.Debug("pod attributes: ", zap.Any("attributes", attributes)) + serviceName, ok := attributes["ServiceName"] assert.True(t, ok) // After PodAdd, there are two services: @@ -1235,11 +1316,11 @@ func TestServiceInfoArrivesLate(t *testing.T) { cache.podServices["pod"] = []string{"firstService", "secondService", "thirdService"} - podResult, ok = client.GetPod(PodIdentifier(podUID)) + attributes, ok = client.GetPodAttributes(PodIdentifier(podUID)) assert.True(t, ok) - logger.Debug("pod: ", zap.Any("pod", podResult)) - serviceName, ok = podResult.Attributes["ServiceName"] + logger.Debug("pod attributes: ", zap.Any("attributes", attributes)) + serviceName, ok = attributes["ServiceName"] assert.True(t, ok) // Desired behavior: we get all three service names in response: diff --git a/pkg/processor/k8sprocessor/kube/kube.go b/pkg/processor/k8sprocessor/kube/kube.go index 1ed1d3f347..91712cfa45 100644 --- a/pkg/processor/k8sprocessor/kube/kube.go +++ b/pkg/processor/k8sprocessor/kube/kube.go @@ -57,7 +57,7 @@ const ( // Client defines the main interface that allows querying pods by metadata. type Client interface { - GetPod(PodIdentifier) (*Pod, bool) + GetPodAttributes(PodIdentifier) (map[string]string, bool) Start() Stop() } diff --git a/pkg/processor/k8sprocessor/processor.go b/pkg/processor/k8sprocessor/processor.go index fd7c22b8b4..55b9a43b29 100644 --- a/pkg/processor/k8sprocessor/processor.go +++ b/pkg/processor/k8sprocessor/processor.go @@ -143,10 +143,10 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } func (kp *kubernetesprocessor) getAttributesForPod(identifier kube.PodIdentifier) map[string]string { - pod, ok := kp.kc.GetPod(identifier) + attributes, ok := kp.kc.GetPodAttributes(identifier) if !ok { kp.logger.Debug("No pod with given id found", zap.Any("pod_id", identifier)) return nil } - return pod.Attributes + return attributes }