
[chore] [receiver/datadog] Add support for v2 series #34180

Merged: 5 commits, Aug 13, 2024

1 change: 1 addition & 0 deletions receiver/datadogreceiver/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datado
go 1.21.0

require (
github.com/DataDog/agent-payload/v5 v5.0.124
github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9
github.com/DataDog/datadog-api-client-go/v2 v2.28.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.106.1
2 changes: 2 additions & 0 deletions receiver/datadogreceiver/go.sum

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions receiver/datadogreceiver/internal/translator/batcher.go
@@ -48,14 +48,14 @@ var metricTypeMap = map[string]pmetric.MetricType{
}

func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions {
resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool)
attrs := tagsToAttributes(tags, host, stringPool)
return dimensions{
name: name,
metricType: metricTypeMap[metricType],
buildInfo: version,
resourceAttrs: resourceAttrs,
scopeAttrs: scopeAttrs,
dpAttrs: dpAttrs,
resourceAttrs: attrs.resource,
scopeAttrs: attrs.scope,
dpAttrs: attrs.dp,
}
}

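The call above now takes a single return value from tagsToAttributes instead of three separate maps. The struct behind it is defined elsewhere in this package and is not shown in this hunk; the sketch below is an assumption inferred from the .resource, .scope, and .dp accesses above and from the host.name and deployment.environment attributes asserted in the tests, not the actual implementation.

type attributes struct {
	resource pcommon.Map // attributes promoted to the Resource
	scope    pcommon.Map // attributes attached to the Scope
	dp       pcommon.Map // attributes attached to each data point
}

// Hypothetical shape of tagsToAttributes after the refactor; the real tag-to-attribute
// mapping lives in the existing translator code.
func tagsToAttributes(tags []string, host string, stringPool *StringPool) attributes {
	out := attributes{resource: pcommon.NewMap(), scope: pcommon.NewMap(), dp: pcommon.NewMap()}
	out.resource.PutStr("host.name", host)
	for _, tag := range tags {
		key, value, _ := strings.Cut(tag, ":")
		if key == "env" {
			out.resource.PutStr("deployment.environment", value)
			continue
		}
		// Simplification: treat every other tag as a data point attribute.
		out.dp.PutStr(key, value)
	}
	return out
}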
74 changes: 32 additions & 42 deletions receiver/datadogreceiver/internal/translator/batcher_test.go
@@ -47,24 +47,25 @@ func TestMetricBatcher(t *testing.T) {
},
expect: func(t *testing.T, result pmetric.Metrics) {
// Different hosts should result in different ResourceMetrics
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 2, result.ResourceMetrics().Len())
resource1 := result.ResourceMetrics().At(0)
resource2 := result.ResourceMetrics().At(1)
v, exists := resource1.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource2.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host2", v.AsString())

res1ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)

res2ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host2", newStringPool())
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)

require.Equal(t, 1, resource1.ScopeMetrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().Len())

require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
},
},
{
@@ -98,18 +99,19 @@ func TestMetricBatcher(t *testing.T) {
expect: func(t *testing.T, result pmetric.Metrics) {
// The different metrics will fall under the same ResourceMetric and ScopeMetric
// and there will be separate metrics under the ScopeMetric.Metrics()
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, "TestCount1", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount2", resource.ScopeMetrics().At(0).Metrics().At(1).Name())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestCount2", 1)
},
},
{
@@ -142,21 +144,16 @@ func TestMetricBatcher(t *testing.T) {
},
expect: func(t *testing.T, result pmetric.Metrics) {
// Differences in attribute values should result in different resourceMetrics
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 2, result.ResourceMetrics().Len())
resource1 := result.ResourceMetrics().At(0)
resource2 := result.ResourceMetrics().At(1)
v, exists := resource1.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource2.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource1.Resource().Attributes().Get("deployment.environment")
require.True(t, exists)
require.Equal(t, "dev", v.AsString())
v, exists = resource2.Resource().Attributes().Get("deployment.environment")
require.True(t, exists)
require.Equal(t, "prod", v.AsString())

res1ExpectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)

res2ExpectedAttrs := tagsToAttributes([]string{"env:prod", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)

require.Equal(t, 1, resource1.ScopeMetrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().Len())
@@ -167,8 +164,8 @@ func TestMetricBatcher(t *testing.T) {
require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
},
},
{
@@ -203,22 +200,20 @@ func TestMetricBatcher(t *testing.T) {
// The different metrics will fall under the same ResourceMetric and ScopeMetric
// and there will be separate metrics under the ScopeMetric.Metrics() due to the different
// data types
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(1).Name())

require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
require.Equal(t, pmetric.MetricTypeGauge, resource.ScopeMetrics().At(0).Metrics().At(1).Type())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 1)
requireGauge(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestMetric", 1)
},
},
{
@@ -253,21 +248,16 @@ func TestMetricBatcher(t *testing.T) {
// Same host, tags, and metric name but two different datapoints
// should result in a single resourceMetric, scopeMetric, and metric
// but two different datapoints under that metric
requireMetricAndDataPointCounts(t, result, 1, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 1, resource.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())

require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 2)
},
},
}
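The rewritten assertions above rely on shared test helpers (requireMetricAndDataPointCounts, requireResourceAttributes, requireSum, requireGauge) that are added elsewhere in this PR and are not visible in this hunk. A rough sketch of what they presumably check, with names and signatures inferred only from the call sites:

func requireMetricAndDataPointCounts(t *testing.T, result pmetric.Metrics, metrics, datapoints int) {
	require.Equal(t, metrics, result.MetricCount())
	require.Equal(t, datapoints, result.DataPointCount())
}

func requireResourceAttributes(t *testing.T, actual, expected pcommon.Map) {
	expected.Range(func(k string, v pcommon.Value) bool {
		got, ok := actual.Get(k)
		require.True(t, ok)
		require.Equal(t, v.AsString(), got.AsString())
		return true
	})
}

func requireSum(t *testing.T, m pmetric.Metric, name string, datapoints int) {
	require.Equal(t, name, m.Name())
	require.Equal(t, pmetric.MetricTypeSum, m.Type())
	require.Equal(t, datapoints, m.Sum().DataPoints().Len())
}

func requireGauge(t *testing.T, m pmetric.Metric, name string, datapoints int) {
	require.Equal(t, name, m.Name())
	require.Equal(t, pmetric.MetricTypeGauge, m.Type())
	require.Equal(t, datapoints, m.Gauge().DataPoints().Len())
}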
85 changes: 85 additions & 0 deletions receiver/datadogreceiver/internal/translator/series.go
@@ -4,8 +4,12 @@
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"io"
"net/http"
"strings"
"time"

"github.com/DataDog/agent-payload/v5/gogen"
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -23,6 +27,22 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

// TODO: add handling for JSON format in addition to protobuf?
func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) {
buf := GetBuffer()
defer PutBuffer(buf)
if _, err := io.Copy(buf, req.Body); err != nil {
return mp, err
}

pl := new(gogen.MetricPayload)
if err := pl.Unmarshal(buf.Bytes()); err != nil {
return mp, err
}

return pl.GetSeries(), nil
}
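
GetBuffer and PutBuffer are not part of this change; they presumably come from a bytes.Buffer pool that already exists in this package. A minimal sketch of such a pool, for context only (assumes the bytes and sync imports):

var bufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

func GetBuffer() *bytes.Buffer { return bufferPool.Get().(*bytes.Buffer) }

func PutBuffer(buf *bytes.Buffer) {
	buf.Reset() // drop the previous payload before returning the buffer to the pool
	bufferPool.Put(buf)
}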

func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics {
bt := newBatcher()

@@ -87,3 +107,68 @@ func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metric
}
return bt.Metrics
}

func (mt *MetricsTranslator) TranslateSeriesV2(series []*gogen.MetricPayload_MetricSeries) pmetric.Metrics {
bt := newBatcher()

for _, serie := range series {
var dps pmetric.NumberDataPointSlice

// The V2 payload stores the host name in the Resources field
resourceMap := getV2Resources(serie.Resources)
// TODO(jesus.vazquez) (Do this with string interning)
dimensions := parseSeriesProperties(serie.Metric, strings.ToLower(serie.Type.String()), serie.Tags, resourceMap["host"], mt.buildInfo.Version, mt.stringPool)
for k, v := range resourceMap {
if k == "host" {
continue // Host has already been added as a resource attribute in parseSeriesProperties(), so avoid duplicating that attribute
}
dimensions.resourceAttrs.PutStr(k, v)
}
dimensions.resourceAttrs.PutStr("source", serie.SourceTypeName) //TODO: check if this is correct handling of SourceTypeName field
metric, metricID := bt.Lookup(dimensions)

switch serie.Type {
case gogen.MetricPayload_COUNT:
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition
dps = metric.Sum().DataPoints()
case gogen.MetricPayload_GAUGE:
dps = metric.Gauge().DataPoints()
case gogen.MetricPayload_RATE:
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) //TODO: verify that this is always the case
dps = metric.Sum().DataPoints()
case gogen.MetricPayload_UNSPECIFIED:
// Type is unset/unspecified
continue
}

dps.EnsureCapacity(len(serie.Points))

for _, point := range serie.Points {
dp := dps.AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(point.Timestamp * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds
dimensions.dpAttrs.CopyTo(dp.Attributes()) // TODO(jesus.vazquez) Review this copy
val := point.Value
if serie.Type == gogen.MetricPayload_RATE && serie.Interval != 0 {
val *= float64(serie.Interval)
}
dp.SetDoubleValue(val)

stream := identity.OfStream(metricID, dp)
ts, ok := mt.streamHasTimestamp(stream)
if ok {
dp.SetStartTimestamp(ts)
}
mt.updateLastTsForStream(stream, dp.Timestamp())
}
}
return bt.Metrics
}

func getV2Resources(resources []*gogen.MetricPayload_Resource) map[string]string {
resourceMap := make(map[string]string)
for i := range resources {
resourceMap[resources[i].Type] = resources[i].Name
}
return resourceMap
}
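
For context, a rough sketch of how the two new pieces might be wired together in the receiver's HTTP handler. The receiver struct, field names, and handler name below are illustrative assumptions, not code from this PR; only HandleSeriesV2Payload and TranslateSeriesV2 come from the diff above.

// Hypothetical handler: parse the v2 protobuf payload, translate it, and forward it.
func (ddr *datadogReceiver) handleV2Series(w http.ResponseWriter, req *http.Request) {
	series, err := ddr.metricsTranslator.HandleSeriesV2Payload(req)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	metrics := ddr.metricsTranslator.TranslateSeriesV2(series)
	if err := ddr.nextMetricsConsumer.ConsumeMetrics(req.Context(), metrics); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}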