Skip to content

Commit

Permalink
[receiver/k8s_cluster] Do not store unused data in k8s API cache (par…
Browse files Browse the repository at this point in the history
…t 2) (open-telemetry#23432)

Do not store unused data for deployments, statefulsets and daemonsets in
k8s API cache to reduce RAM usage.

Updates
open-telemetry#23433
  • Loading branch information
dmitryax authored and fchikwekwe committed Jun 23, 2023
1 parent 2f082c5 commit c698120
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .chloggen/k8scluster-dont-store-unused-data-cache.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ component: receiver/k8s_cluster
note: Do not store unused data in the k8s API cache to reduce RAM usage

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23417]
issues: [23433]
9 changes: 9 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
)

// transformObject transforms the k8s object by removing the data that is not utilized by the receiver.
Expand All @@ -26,6 +29,12 @@ func transformObject(object interface{}) (interface{}, error) {
return replicaset.Transform(o), nil
case *batchv1.Job:
return jobs.Transform(o), nil
case *appsv1.Deployment:
return deployment.Transform(o), nil
case *appsv1.DaemonSet:
return demonset.Transform(o), nil
case *appsv1.StatefulSet:
return statefulset.Transform(o), nil
}
return object, nil
}
35 changes: 35 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)
Expand Down Expand Up @@ -58,6 +60,39 @@ func TestTransformObject(t *testing.T) {
want: testutils.NewJob("1"),
same: false,
},
{
name: "deployment",
object: testutils.NewDeployment("1"),
want: testutils.NewDeployment("1"),
same: false,
},
{
name: "daemonset",
object: testutils.NewDaemonset("1"),
want: testutils.NewDaemonset("1"),
same: false,
},
{
name: "statefulset",
object: &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Replicas: func() *int32 { i := int32(3); return &i }(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
},
},
},
},
},
want: &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Replicas: func() *int32 { i := int32(3); return &i }(),
},
},
same: false,
},
{
// This is a case where we don't transform the object.
name: "hpa",
Expand Down
14 changes: 14 additions & 0 deletions receiver/k8sclusterreceiver/internal/demonset/daemonsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ var daemonSetReadyMetric = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function before using new daemonset fields.
func Transform(ds *appsv1.DaemonSet) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metadata.TransformObjectMeta(ds.ObjectMeta),
Status: appsv1.DaemonSetStatus{
CurrentNumberScheduled: ds.Status.CurrentNumberScheduled,
DesiredNumberScheduled: ds.Status.DesiredNumberScheduled,
NumberMisscheduled: ds.Status.NumberMisscheduled,
NumberReady: ds.Status.NumberReady,
},
}
}

func GetMetrics(ds *appsv1.DaemonSet) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
Expand Down
74 changes: 74 additions & 0 deletions receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
Expand Down Expand Up @@ -42,3 +46,73 @@ func TestDaemonsetMetrics(t *testing.T) {
testutils.AssertMetricsInt(t, rm.Metrics[3], "k8s.daemonset.ready_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

func TestTransform(t *testing.T) {
originalDS := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "my-daemonset",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "my-app",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "my-container",
Image: "nginx:latest",
ImagePullPolicy: corev1.PullAlways,
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: 80,
Protocol: corev1.ProtocolTCP,
},
},
},
},
},
},
},
Status: appsv1.DaemonSetStatus{
CurrentNumberScheduled: 3,
NumberReady: 3,
DesiredNumberScheduled: 3,
NumberMisscheduled: 0,
Conditions: []appsv1.DaemonSetCondition{
{
Type: "Available",
Status: corev1.ConditionTrue,
},
},
},
}
wantDS := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "my-daemonset",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Status: appsv1.DaemonSetStatus{
CurrentNumberScheduled: 3,
NumberReady: 3,
DesiredNumberScheduled: 3,
NumberMisscheduled: 0,
},
}
assert.Equal(t, wantDS, Transform(originalDS))
}
14 changes: 14 additions & 0 deletions receiver/k8sclusterreceiver/internal/deployment/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
)

// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function before using new deployment fields.
func Transform(deployment *appsv1.Deployment) *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metadata.TransformObjectMeta(deployment.ObjectMeta),
Spec: appsv1.DeploymentSpec{
Replicas: deployment.Spec.Replicas,
},
Status: appsv1.DeploymentStatus{
AvailableReplicas: deployment.Status.AvailableReplicas,
},
}
}

func GetMetrics(set receiver.CreateSettings, dep *appsv1.Deployment) pmetric.Metrics {
mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set)
ts := pcommon.NewTimestampFromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
Expand Down Expand Up @@ -61,3 +64,75 @@ func TestGoldenFile(t *testing.T) {
),
)
}

func TestTransform(t *testing.T) {
origDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
UID: "my-deployment-uid",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: appsv1.DeploymentSpec{
Replicas: func() *int32 { replicas := int32(3); return &replicas }(),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "my-app",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "my-container",
Image: "nginx:latest",
ImagePullPolicy: v1.PullAlways,
Ports: []v1.ContainerPort{
{
Name: "http",
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
},
},
Status: appsv1.DeploymentStatus{
Replicas: 3,
ReadyReplicas: 3,
AvailableReplicas: 3,
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: v1.ConditionTrue,
},
},
},
}
wantDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
UID: "my-deployment-uid",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: appsv1.DeploymentSpec{
Replicas: func() *int32 { replicas := int32(3); return &replicas }(),
},
Status: appsv1.DeploymentStatus{
AvailableReplicas: 3,
},
}
assert.Equal(t, wantDeployment, Transform(origDeployment))
}
10 changes: 2 additions & 8 deletions receiver/k8sclusterreceiver/internal/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
Expand Down Expand Up @@ -53,15 +52,10 @@ var podsSuccessfulMetric = &metricspb.MetricDescriptor{
}

// Transform transforms the job to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function when using a new job fields.
// IMPORTANT: Make sure to update this function before using new job fields.
func Transform(job *batchv1.Job) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name,
Namespace: job.ObjectMeta.Namespace,
UID: job.ObjectMeta.UID,
Labels: job.ObjectMeta.Labels,
},
ObjectMeta: metadata.TransformObjectMeta(job.ObjectMeta),
Spec: batchv1.JobSpec{
Completions: job.Spec.Completions,
Parallelism: job.Spec.Parallelism,
Expand Down
18 changes: 18 additions & 0 deletions receiver/k8sclusterreceiver/internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ type KubernetesMetadata struct {
Metadata map[string]string
}

func TransformObjectMeta(om v1.ObjectMeta) v1.ObjectMeta {
newOM := v1.ObjectMeta{
Name: om.Name,
Namespace: om.Namespace,
UID: om.UID,
CreationTimestamp: om.CreationTimestamp,
Labels: om.Labels,
}
for _, or := range om.OwnerReferences {
newOM.OwnerReferences = append(newOM.OwnerReferences, v1.OwnerReference{
Kind: or.Kind,
Name: or.Name,
UID: or.UID,
})
}
return newOM
}

// GetGenericMetadata is responsible for collecting metadata from K8s resources that
// live on v1.ObjectMeta.
func GetGenericMetadata(om *v1.ObjectMeta, resourceType string) *KubernetesMetadata {
Expand Down
39 changes: 39 additions & 0 deletions receiver/k8sclusterreceiver/internal/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,42 @@ func TestGetMetadataUpdate(t *testing.T) {
})
}
}

func TestTransformObjectMeta(t *testing.T) {
in := v1.ObjectMeta{
Name: "my-pod",
UID: "12345678-1234-1234-1234-123456789011",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
Annotations: map[string]string{
"version": "1.0",
"description": "Sample resource",
},
OwnerReferences: []v1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: "my-replicaset-1",
UID: "12345678-1234-1234-1234-123456789012",
},
},
}
want := v1.ObjectMeta{
Name: "my-pod",
UID: "12345678-1234-1234-1234-123456789011",
Namespace: "default",
Labels: map[string]string{
"app": "my-app",
},
OwnerReferences: []v1.OwnerReference{
{
Kind: "ReplicaSet",
Name: "my-replicaset-1",
UID: "12345678-1234-1234-1234-123456789012",
},
},
}
assert.Equal(t, want, TransformObjectMeta(in))
}
Loading

0 comments on commit c698120

Please sign in to comment.