Skip to content

Commit

Permalink
fix(k8sprocessor): race condition when getting Pod data
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Świątek committed Feb 7, 2023
1 parent 3086c9f commit 7909c61
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 45 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ This release introduces the following breaking changes:

- feat(sumologicextension): use hostname as default collector name [#918]

### Fixed

- fix(k8sprocessor): race condition when getting Pod data [#938]

[#918]: https://github.com/SumoLogic/sumologic-otel-collector/pull/918
[#938]: https://github.com/SumoLogic/sumologic-otel-collector/pull/938
[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.70.0-sumo-0...main

## [v0.70.0-sumo-1]
Expand Down
9 changes: 5 additions & 4 deletions pkg/processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ func newFakeClient(
}, nil
}

// GetPod looks up FakeClient.Pods map by the provided string,
// which might represent either IP address or Pod UID.
func (f *fakeClient) GetPod(identifier kube.PodIdentifier) (*kube.Pod, bool) {
func (f *fakeClient) GetPodAttributes(identifier kube.PodIdentifier) (map[string]string, bool) {
p, ok := f.Pods[identifier]
return p, ok
if !ok {
return map[string]string{}, ok
}
return p.Attributes, ok
}

// Start is a noop for FakeClient.
Expand Down
67 changes: 46 additions & 21 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,21 +247,42 @@ func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Durati
}
}

// GetPod takes an IP address or Pod UID and returns the pod the identifier is associated with.
func (c *WatchClient) GetPod(identifier PodIdentifier) (*Pod, bool) {
// getPod takes an IP address or Pod UID and returns the pod the identifier is associated with.
func (c *WatchClient) getPod(identifier PodIdentifier) (*Pod, bool) {
c.m.RLock()
defer c.m.RUnlock()
pod, ok := c.Pods[identifier]
if ok {
if pod.Ignore {
return nil, false
}
if !ok {
observability.RecordIPLookupMiss()
return nil, ok
}
if pod.Ignore {
return nil, false
}
return pod, true
}

c.updatePodOwnerMetadata(pod)
return pod, ok
// GetPodAttributes takes an IP address or Pod UID and returns the metadata attributes of the Pod the
// identifier is associated with
func (c *WatchClient) GetPodAttributes(identifier PodIdentifier) (map[string]string, bool) {
pod, ok := c.getPod(identifier)
if !ok {
return nil, false
}
observability.RecordIPLookupMiss()
return nil, false
ownerAttributes := c.getPodOwnerMetadataAttributes(pod)

// we need to take a lock here because pod.Attributes may be modified concurrently
// TODO: clean up the locking in this function and the ones it calls
c.m.RLock()
defer c.m.RUnlock()
attributes := make(map[string]string, len(pod.Attributes))
for key, value := range pod.Attributes {
attributes[key] = value
}
for key, value := range ownerAttributes {
attributes[key] = value
}
return attributes, ok
}

func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
Expand Down Expand Up @@ -340,7 +361,10 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
return tags
}

func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) {
func (c *WatchClient) getPodOwnerMetadataAttributes(pod *Pod) map[string]string {
c.m.RLock()
defer c.m.RUnlock()
attributes := map[string]string{}
if c.Rules.OwnerLookupEnabled {
c.logger.Debug("pod owner lookup",
zap.String("pod.Name", pod.Name),
Expand All @@ -352,27 +376,27 @@ func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) {
switch owner.kind {
case "DaemonSet":
if c.Rules.DaemonSetName {
pod.Attributes[c.Rules.Tags.DaemonSetName] = owner.name
attributes[c.Rules.Tags.DaemonSetName] = owner.name
}
case "Deployment":
if c.Rules.DeploymentName {
pod.Attributes[c.Rules.Tags.DeploymentName] = owner.name
attributes[c.Rules.Tags.DeploymentName] = owner.name
}
case "ReplicaSet":
if c.Rules.ReplicaSetName {
pod.Attributes[c.Rules.Tags.ReplicaSetName] = owner.name
attributes[c.Rules.Tags.ReplicaSetName] = owner.name
}
case "StatefulSet":
if c.Rules.StatefulSetName {
pod.Attributes[c.Rules.Tags.StatefulSetName] = owner.name
attributes[c.Rules.Tags.StatefulSetName] = owner.name
}
case "Job":
if c.Rules.JobName {
pod.Attributes[c.Rules.Tags.JobName] = owner.name
attributes[c.Rules.Tags.JobName] = owner.name
}
case "CronJob":
if c.Rules.CronJobName {
pod.Attributes[c.Rules.Tags.CronJobName] = owner.name
attributes[c.Rules.Tags.CronJobName] = owner.name
}

default:
Expand All @@ -382,9 +406,10 @@ func (c *WatchClient) updatePodOwnerMetadata(pod *Pod) {

if c.Rules.ServiceName {
services := c.op.GetServices(pod.Name)
pod.Attributes[c.Rules.Tags.ServiceName] = strings.Join(services, c.delimiter)
attributes[c.Rules.Tags.ServiceName] = strings.Join(services, c.delimiter)
}
}
return attributes
}

// This function removes all data from the Pod except what is required by extraction rules
Expand Down Expand Up @@ -533,18 +558,18 @@ func generatePodIDFromName(p Namer) PodIdentifier {
}

func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
p, ok := c.GetPod(PodIdentifier(pod.Status.PodIP))
p, ok := c.getPod(PodIdentifier(pod.Status.PodIP))
if ok && p.Name == pod.Name {
c.appendDeleteQueue(PodIdentifier(pod.Status.PodIP), pod.Name)
}

p, ok = c.GetPod(PodIdentifier(pod.UID))
p, ok = c.getPod(PodIdentifier(pod.UID))
if ok && p.Name == pod.Name {
c.appendDeleteQueue(PodIdentifier(pod.UID), pod.Name)
}

id := generatePodIDFromName(pod)
p, ok = c.GetPod(id)
p, ok = c.getPod(id)
if ok && p.Name == pod.Name {
c.appendDeleteQueue(id, pod.Name)
}
Expand Down
115 changes: 98 additions & 17 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"regexp"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -390,7 +391,7 @@ func TestGetIgnoredPod(t *testing.T) {
pod.Status.PodIP = "1.1.1.1"
c.handlePodAdd(pod)
c.Pods[PodIdentifier(pod.Status.PodIP)].Ignore = true
got, ok := c.GetPod(PodIdentifier(pod.Status.PodIP))
got, ok := c.getPod(PodIdentifier(pod.Status.PodIP))
assert.Nil(t, got)
assert.False(t, ok)
}
Expand Down Expand Up @@ -422,19 +423,99 @@ func TestGetPod(t *testing.T) {
OwnerReferences: &pod.OwnerReferences,
}

got, ok := c.GetPod(PodIdentifier("1.1.1.1"))
got, ok := c.getPod(PodIdentifier("1.1.1.1"))
assert.Equal(t, got, expected)
assert.True(t, ok)

got, ok = c.GetPod(PodIdentifier("1234"))
got, ok = c.getPod(PodIdentifier("1234"))
assert.Equal(t, got, expected)
assert.True(t, ok)

got, ok = c.GetPod(PodIdentifier("pod_name.namespace_name"))
got, ok = c.getPod(PodIdentifier("pod_name.namespace_name"))
assert.Equal(t, got, expected)
assert.True(t, ok)
}

func TestGetPodConcurrent(t *testing.T) {
c, _ := newTestClient(t)

pod := &api_v1.Pod{}
pod.Status.PodIP = "1.1.1.1"
pod.UID = "1234"
pod.Name = "pod_name"
pod.Namespace = "namespace_name"
pod.OwnerReferences = []meta_v1.OwnerReference{
{
Kind: "StatefulSet",
Name: "snug-sts",
UID: "f15f0585-a0bc-43a3-96e4-dd2eace75391",
},
}
c.handlePodAdd(pod)

expected := &Pod{
Name: "pod_name",
Namespace: "namespace_name",
Address: "1.1.1.1",
PodUID: "1234",
Attributes: map[string]string{},
OwnerReferences: &pod.OwnerReferences,
}

numThreads := 2
wg := sync.WaitGroup{}
for i := 0; i < numThreads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
got, ok := c.getPod(PodIdentifier("1.1.1.1"))
assert.Equal(t, got, expected)
assert.True(t, ok)
}()
}
wg.Wait()
}

func TestGetPodOwnerAttributesConcurrent(t *testing.T) {
rules := ExtractionRules{
OwnerLookupEnabled: true,
StatefulSetName: true,
Tags: NewExtractionFieldTags(),
}
c, _ := newTestClientWithRulesAndFilters(t, rules, Filters{})

pod := &api_v1.Pod{}
pod.Status.PodIP = "1.1.1.1"
pod.UID = "1234"
pod.Name = "pod_name"
pod.Namespace = "namespace_name"
pod.OwnerReferences = []meta_v1.OwnerReference{
{
Kind: "StatefulSet",
Name: "snug-sts",
UID: "f15f0585-a0bc-43a3-96e4-dd2eace75391",
},
}
c.handlePodAdd(pod)

expected := map[string]string{
rules.Tags.StatefulSetName: "snug-sts",
}

numThreads := 2
wg := sync.WaitGroup{}
for i := 0; i < numThreads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
got, ok := c.GetPodAttributes(PodIdentifier("1.1.1.1"))
assert.Equal(t, got, expected)
assert.True(t, ok)
}()
}
wg.Wait()
}

func TestGetPodWhenNamespaceInExtractedMetadata(t *testing.T) {
c, _ := newTestClient(t)

Expand All @@ -459,15 +540,15 @@ func TestGetPodWhenNamespaceInExtractedMetadata(t *testing.T) {
OwnerReferences: &pod.OwnerReferences,
}

got, ok := c.GetPod(PodIdentifier("1.1.1.1"))
got, ok := c.getPod(PodIdentifier("1.1.1.1"))
assert.Equal(t, got, expected)
assert.True(t, ok)

got, ok = c.GetPod(PodIdentifier("1234"))
got, ok = c.getPod(PodIdentifier("1234"))
assert.Equal(t, got, expected)
assert.True(t, ok)

got, ok = c.GetPod(PodIdentifier("pod_name.namespace_name"))
got, ok = c.getPod(PodIdentifier("pod_name.namespace_name"))
assert.Equal(t, got, expected)
assert.True(t, ok)
}
Expand Down Expand Up @@ -505,7 +586,7 @@ func TestNoHostnameExtractionRules(t *testing.T) {
}

c.handlePodAdd(pod)
p, _ := c.GetPod(PodIdentifier(pod.Status.PodIP))
p, _ := c.getPod(PodIdentifier(pod.Status.PodIP))
assert.Equal(t, p.Attributes["k8s.pod.hostname"], podName)
}

Expand Down Expand Up @@ -784,12 +865,12 @@ func TestExtractionRules(t *testing.T) {
// normally the informer does this, but fully emulating the informer in this test is annoying
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
c.handlePodAdd(transformedPod)
p, ok := c.GetPod(PodIdentifier(pod.Status.PodIP))
attributes, ok := c.GetPodAttributes(PodIdentifier(pod.Status.PodIP))
require.True(t, ok)

assert.Equal(t, len(tc.attributes), len(p.Attributes))
assert.Equal(t, len(tc.attributes), len(attributes))
for k, v := range tc.attributes {
got, ok := p.Attributes[k]
got, ok := attributes[k]
if assert.True(t, ok, "Attribute '%s' not found.", k) {
assert.Equal(t, v, got, "Value of '%s' is incorrect", k)
}
Expand Down Expand Up @@ -1223,23 +1304,23 @@ func TestServiceInfoArrivesLate(t *testing.T) {

client.handlePodAdd(pod)

podResult, ok := client.GetPod(PodIdentifier(podUID))
attributes, ok := client.GetPodAttributes(PodIdentifier(podUID))
assert.True(t, ok)

logger.Debug("pod: ", zap.Any("pod", podResult))
serviceName, ok := podResult.Attributes["ServiceName"]
logger.Debug("pod attributes: ", zap.Any("attributes", attributes))
serviceName, ok := attributes["ServiceName"]
assert.True(t, ok)

// After PodAdd, there are two services:
assert.Equal(t, "firstService, secondService", serviceName)

cache.podServices["pod"] = []string{"firstService", "secondService", "thirdService"}

podResult, ok = client.GetPod(PodIdentifier(podUID))
attributes, ok = client.GetPodAttributes(PodIdentifier(podUID))
assert.True(t, ok)

logger.Debug("pod: ", zap.Any("pod", podResult))
serviceName, ok = podResult.Attributes["ServiceName"]
logger.Debug("pod attributes: ", zap.Any("attributes", attributes))
serviceName, ok = attributes["ServiceName"]
assert.True(t, ok)

// Desired behavior: we get all three service names in response:
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/k8sprocessor/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (

// Client defines the main interface that allows querying pods by metadata.
type Client interface {
GetPod(PodIdentifier) (*Pod, bool)
GetPodAttributes(PodIdentifier) (map[string]string, bool)
Start()
Stop()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}

func (kp *kubernetesprocessor) getAttributesForPod(identifier kube.PodIdentifier) map[string]string {
pod, ok := kp.kc.GetPod(identifier)
attributes, ok := kp.kc.GetPodAttributes(identifier)
if !ok {
kp.logger.Debug("No pod with given id found", zap.Any("pod_id", identifier))
return nil
}
return pod.Attributes
return attributes
}

0 comments on commit 7909c61

Please sign in to comment.