Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ksm] make time based metrics comparable. #11394

Merged
merged 3 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ func (k *KSMCheck) Run() error {
}
}

currentTime := time.Now()
for _, stores := range k.allStores {
for _, store := range stores {
metrics := store.(*ksmstore.MetricsStore).Push(ksmstore.GetAllFamilies, ksmstore.GetAllMetrics)
k.processMetrics(sender, metrics, labelJoiner)
k.processMetrics(sender, metrics, labelJoiner, currentTime)
k.processTelemetry(metrics)
}
}
Expand All @@ -343,7 +344,7 @@ func (k *KSMCheck) Cancel() {
}

// processMetrics attaches tags and forwards metrics to the aggregator
func (k *KSMCheck) processMetrics(sender aggregator.Sender, metrics map[string][]ksmstore.DDMetricsFam, labelJoiner *labelJoiner) {
func (k *KSMCheck) processMetrics(sender aggregator.Sender, metrics map[string][]ksmstore.DDMetricsFam, labelJoiner *labelJoiner, now time.Time) {
for _, metricsList := range metrics {
for _, metricFamily := range metricsList {
// First check for aggregator, because the check use _labels metrics to aggregate values.
Expand All @@ -358,7 +359,7 @@ func (k *KSMCheck) processMetrics(sender aggregator.Sender, metrics map[string][
lMapperOverride := labelsMapperOverride(metricFamily.Name)
for _, m := range metricFamily.ListMetrics {
hostname, tags := k.hostnameAndTags(m.Labels, labelJoiner, lMapperOverride)
transform(sender, metricFamily.Name, m, hostname, tags)
transform(sender, metricFamily.Name, m, hostname, tags, now)
}
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package ksm
import (
"reflect"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/mocksender"
Expand Down Expand Up @@ -252,7 +253,7 @@ func TestProcessMetrics(t *testing.T) {
},
metricsToGet: []ksmstore.DDMetricsFam{},
metricTransformers: map[string]metricTransformerFunc{
"kube_pod_status_phase": func(s aggregator.Sender, n string, m ksmstore.DDMetric, h string, t []string) {
"kube_pod_status_phase": func(s aggregator.Sender, n string, m ksmstore.DDMetric, h string, t []string, c time.Time) {
s.Gauge("kube_pod_status_phase_transformed", 1, "", []string{"transformed:tag"})
},
},
Expand Down Expand Up @@ -457,7 +458,7 @@ func TestProcessMetrics(t *testing.T) {
for _, metricFam := range test.metricsToGet {
labelJoiner.insertFamily(metricFam)
}
kubeStateMetricsSCheck.processMetrics(mocked, test.metricsToProcess, labelJoiner)
kubeStateMetricsSCheck.processMetrics(mocked, test.metricsToProcess, labelJoiner, time.Now())
t.Run(test.name, func(t *testing.T) {
for _, expectMetric := range test.expected {
mocked.AssertMetric(t, "Gauge", expectMetric.name, expectMetric.val, expectMetric.hostname, expectMetric.tags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// metricTransformerFunc is used to tweak or generate new metrics from a given KSM metric
// For name translation only please use metricNamesMapper instead
type metricTransformerFunc = func(aggregator.Sender, string, ksmstore.DDMetric, string, []string)
type metricTransformerFunc = func(aggregator.Sender, string, ksmstore.DDMetric, string, []string, time.Time)

var (
// metricTransformers contains KSM metric names and their corresponding transformer functions
Expand Down Expand Up @@ -54,12 +54,9 @@ var (
}
)

// now allows testing
var now = time.Now

// nodeConditionTransformer generates service checks based on the metric kube_node_status_condition
// It also submit the metric kubernetes_state.node.by_condition
func nodeConditionTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func nodeConditionTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
if metric.Val != 1.0 {
// Only consider active metrics
return
Expand Down Expand Up @@ -139,7 +136,7 @@ func statusForCondition(status string, positiveEvent bool) metrics.ServiceCheckS

// nodeUnschedulableTransformer reports whether a node can schedule new pods
// It adds a tag 'status' that can be either 'schedulable' or 'unschedulable' and always report the metric value '1'
func nodeUnschedulableTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func nodeUnschedulableTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
status := ""
switch metric.Val {
case 0.0:
Expand All @@ -156,27 +153,27 @@ func nodeUnschedulableTransformer(s aggregator.Sender, name string, metric ksmst

// submitAge generates a resource age or uptime metric based on a given timestamp
// The metric value must correspond to a timestamp
func submitAge(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
s.Gauge(name, float64(now().Unix())-metric.Val, hostname, tags)
func submitAge(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
s.Gauge(name, float64(currentTime.Unix())-metric.Val, hostname, tags)
}

// nodeCreationTransformer generates the node age metric based on the creation timestamp
func nodeCreationTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
submitAge(s, ksmMetricPrefix+"node.age", metric, hostname, tags)
func nodeCreationTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
submitAge(s, ksmMetricPrefix+"node.age", metric, hostname, tags, currentTime)
}

// podCreationTransformer generates the pod age metric based on the creation timestamp
func podCreationTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
submitAge(s, ksmMetricPrefix+"pod.age", metric, hostname, tags)
func podCreationTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
submitAge(s, ksmMetricPrefix+"pod.age", metric, hostname, tags, currentTime)
}

// podStartTimeTransformer generates the pod uptime metric based on the start time timestamp
func podStartTimeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
submitAge(s, ksmMetricPrefix+"pod.uptime", metric, hostname, tags)
func podStartTimeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
submitAge(s, ksmMetricPrefix+"pod.uptime", metric, hostname, tags, currentTime)
}

// podPhaseTransformer sends status phase metrics for pods, the tag phase has the pod status
func podPhaseTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func podPhaseTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitActiveMetric(s, ksmMetricPrefix+"pod.status_phase", metric, hostname, tags)
}

Expand All @@ -188,7 +185,7 @@ var allowedWaitingReasons = map[string]struct{}{
}

// containerWaitingReasonTransformer validates the container waiting reasons for metric kube_pod_container_status_waiting_reason
func containerWaitingReasonTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func containerWaitingReasonTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
if reason, found := metric.Labels["reason"]; found {
// Filtering according to the reason here is paramount to limit cardinality
if _, allowed := allowedWaitingReasons[strings.ToLower(reason)]; allowed {
Expand All @@ -204,7 +201,7 @@ var allowedTerminatedReasons = map[string]struct{}{
}

// containerTerminatedReasonTransformer validates the container waiting reasons for metric kube_pod_container_status_terminated_reason
func containerTerminatedReasonTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func containerTerminatedReasonTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
if reason, found := metric.Labels["reason"]; found {
// Filtering according to the reason here is paramount to limit cardinality
if _, allowed := allowedTerminatedReasons[strings.ToLower(reason)]; allowed {
Expand All @@ -214,12 +211,12 @@ func containerTerminatedReasonTransformer(s aggregator.Sender, name string, metr
}

// containerResourceRequestsTransformer transforms the generic ksm resource request metrics into resource-specific metrics
func containerResourceRequestsTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func containerResourceRequestsTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitContainerResourceMetric(s, name, metric, hostname, tags, "requested")
}

// containerResourceLimitsTransformer transforms the generic ksm resource limit metrics into resource-specific metrics
func containerResourceLimitsTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func containerResourceLimitsTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitContainerResourceMetric(s, name, metric, hostname, tags, "limit")
}

Expand All @@ -245,12 +242,12 @@ func submitContainerResourceMetric(s aggregator.Sender, name string, metric ksms
}

// nodeAllocatableTransformer transforms the generic ksm node allocatable metrics into resource-specific metrics
func nodeAllocatableTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func nodeAllocatableTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitNodeResourceMetric(s, name, metric, hostname, tags, "allocatable")
}

// nodeCapacityTransformer transforms the generic ksm node capacity metrics into resource-specific metrics
func nodeCapacityTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func nodeCapacityTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitNodeResourceMetric(s, name, metric, hostname, tags, "capacity")
}

Expand Down Expand Up @@ -282,10 +279,10 @@ func submitNodeResourceMetric(s aggregator.Sender, name string, metric ksmstore.
}

// cronJobNextScheduleTransformer sends a service check to alert if the cronjob's next schedule is in the past
func cronJobNextScheduleTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func cronJobNextScheduleTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
message := ""
var status metrics.ServiceCheckStatus
timeDiff := int64(metric.Val) - now().Unix()
timeDiff := int64(metric.Val) - currentTime.Unix()
if timeDiff >= 0 {
status = metrics.ServiceCheckOK
} else {
Expand All @@ -296,12 +293,12 @@ func cronJobNextScheduleTransformer(s aggregator.Sender, name string, metric ksm
}

// cronJobLastScheduleTransformer sends the duration since the last time the cronjob was scheduled
func cronJobLastScheduleTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
s.Gauge(ksmMetricPrefix+"cronjob.duration_since_last_schedule", float64(now().Unix())-metric.Val, hostname, tags)
func cronJobLastScheduleTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, currentTime time.Time) {
s.Gauge(ksmMetricPrefix+"cronjob.duration_since_last_schedule", float64(currentTime.Unix())-metric.Val, hostname, tags)
}

// jobCompleteTransformer sends a metric and a service check based on kube_job_complete
func jobCompleteTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func jobCompleteTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
for i, tag := range tags {
if tag == "condition:true" {
jobMetric(s, metric, ksmMetricPrefix+"job.completion.succeeded", hostname, append(tags[:i], tags[i+1:]...))
Expand All @@ -312,7 +309,7 @@ func jobCompleteTransformer(s aggregator.Sender, name string, metric ksmstore.DD
}

// jobFailedTransformer sends a metric and a service check based on kube_job_failed
func jobFailedTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func jobFailedTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
for i, tag := range tags {
if tag == "condition:true" {
jobMetric(s, metric, ksmMetricPrefix+"job.completion.failed", hostname, append(tags[:i], tags[i+1:]...))
Expand Down Expand Up @@ -363,7 +360,7 @@ func jobServiceCheck(s aggregator.Sender, metric ksmstore.DDMetric, status metri
}

// jobStatusSucceededTransformer sends a metric based on kube_job_status_succeeded
func jobStatusSucceededTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func jobStatusSucceededTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
jobMetric(s, metric, ksmMetricPrefix+"job.succeeded", hostname, tags)
}

Expand All @@ -377,7 +374,7 @@ func jobStatusSucceededTransformer(s aggregator.Sender, name string, metric ksms
//
// In order to reduce the cardinality, we are here removing the `reason` tag.
// The resulting datadog metric is 0 if there are no failed pods and 1 otherwise.
func jobStatusFailedTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func jobStatusFailedTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
// Remove the `reason` tag to reduce the cardinality
reasonTagIndex := -1
for idx, tag := range tags {
Expand Down Expand Up @@ -405,7 +402,7 @@ func jobMetric(s aggregator.Sender, metric ksmstore.DDMetric, metricName string,
}

// resourcequotaTransformer generates dedicated metrics per resource per type from the kube_resourcequota metric
func resourcequotaTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func resourcequotaTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
resource, found := metric.Labels["resource"]
if !found {
log.Debugf("Couldn't find 'resource' label, ignoring metric '%s'", name)
Expand Down Expand Up @@ -433,7 +430,7 @@ var constraintsMapper = map[string]string{
}

// limitrangeTransformer generates dedicated metrics per resource per type from the kube_limitrange metric
func limitrangeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func limitrangeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
constraintLabel, found := metric.Labels["constraint"]
if !found {
log.Debugf("Couldn't find 'constraint' label, ignoring metric '%s'", name)
Expand Down Expand Up @@ -463,11 +460,11 @@ func submitActiveMetric(s aggregator.Sender, metricName string, metric ksmstore.
}

// pvPhaseTransformer generates metrics per persistentvolume and per phase from the kube_persistentvolume_status_phase metric
func pvPhaseTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func pvPhaseTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitActiveMetric(s, ksmMetricPrefix+"persistentvolume.by_phase", metric, hostname, tags)
}

// serviceTypeTransformer generates metrics per service, namespace, and type from the kube_service_spec_type metric
func serviceTypeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string) {
func serviceTypeTransformer(s aggregator.Sender, name string, metric ksmstore.DDMetric, hostname string, tags []string, _ time.Time) {
submitActiveMetric(s, ksmMetricPrefix+"service.type", metric, hostname, tags)
}
Loading