[connector/datadogconnector] Feature Gate to resolve Memory issues (open-telemetry#30085)

**Description:**
Adds the feature gate `connector.datadogconnector.performance`, which can
optionally be used to reduce the memory footprint of the Datadog connector.

**Link to tracking Issue:**
open-telemetry#29755

**Testing:**
- Tested internally using client data.

**Documentation:** <Describe the documentation added.>
dineshg13 authored Jan 4, 2024
1 parent 6ab41fa commit 05f9ff4
Showing 11 changed files with 179 additions and 63 deletions.
18 changes: 18 additions & 0 deletions .chloggen/datadog-connector-memory-issue.yaml
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "bug_fix"
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "datadogconnector"
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add feature flag to address memory issue with Datadog Connector
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29755]
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 6 additions & 1 deletion connector/datadogconnector/README.md
@@ -104,4 +104,9 @@ service:
```
</tr></table>
Here we have two traces pipelines that ingest the same data but one is being sampled. The one that is sampled has its data sent to the datadog backend for you to see the sampled subset of the total traces sent across. The other non-sampled pipeline of traces sends its data to the metrics pipeline to be used in the APM stats. This unsampled pipeline gives the full picture of how much data the application emits in traces.
## Feature Gate for Performance
If you are experiencing high memory usage with the Datadog Connector, similar to [issue #29755](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29755), enable the feature gate `connector.datadogconnector.performance`. With the feature gate enabled, the Datadog Connector takes OTLP traces and produces an OTLP metric named `dd.internal.stats.payload`. This metric has an attribute, also named `dd.internal.stats.payload`, that contains the serialized bytes of the StatsPayload. With the feature gate enabled, the Datadog Connector can only be used in conjunction with the Datadog Exporter. Enable the feature gate only if you need it for performance reasons or higher throughput. Enable it on all Collectors (especially in a gateway deployment) in the pipeline that sends data to Datadog. We plan to refactor this component in the future so that the signals it produces are usable in any metrics pipeline.
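
The paragraph above describes carrying a serialized stats payload inside a single metric attribute and decoding it again on the exporter side. A minimal sketch of that round trip follows. It is not the connector's code: `statsPayload` and `metric` are hypothetical stand-ins for `pb.StatsPayload` and an OTLP metric, and `encoding/gob` stands in for protobuf so the example needs only the standard library.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// payloadKey is the metric name and attribute key named in the README.
const payloadKey = "dd.internal.stats.payload"

// statsPayload is a hypothetical stand-in for Datadog's pb.StatsPayload.
type statsPayload struct {
	Hostname string
	Hits     uint64
}

// metric is a hypothetical stand-in for an OTLP metric with byte attributes.
type metric struct {
	Name       string
	Attributes map[string][]byte
}

// encode serializes the payload and wraps it in one carrier metric.
// (gob is an assumption of this sketch; the real payload is protobuf.)
func encode(sp statsPayload) (metric, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(sp); err != nil {
		return metric{}, err
	}
	return metric{
		Name:       payloadKey,
		Attributes: map[string][]byte{payloadKey: buf.Bytes()},
	}, nil
}

// decode recovers the payload from the carrier metric.
func decode(m metric) (statsPayload, error) {
	var sp statsPayload
	raw, ok := m.Attributes[payloadKey]
	if !ok {
		return sp, fmt.Errorf("metric %q carries no %q attribute", m.Name, payloadKey)
	}
	err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&sp)
	return sp, err
}

func main() {
	m, err := encode(statsPayload{Hostname: "gateway-1", Hits: 10})
	if err != nil {
		panic(err)
	}
	sp, err := decode(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Name, sp.Hostname, sp.Hits)
}
```

Because the attribute is opaque to every other component, only a consumer that knows the encoding can use it, which is consistent with the README's note that the gated connector works only together with the Datadog Exporter.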

14 changes: 13 additions & 1 deletion connector/datadogconnector/connector.go
@@ -12,6 +12,7 @@ import (
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
@@ -113,8 +114,19 @@ func (c *connectorImp) run() {
			if len(stats.Stats) == 0 {
				continue
			}
			var mx pmetric.Metrics
			var err error
			if datadog.ConnectorPerformanceFeatureGate.IsEnabled() {
				c.logger.Debug("Received stats payload", zap.Any("stats", stats))
				mx, err = c.translator.StatsToMetrics(stats)
				if err != nil {
					c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
					continue
				}
			} else {
				mx = c.translator.StatsPayloadToMetrics(stats)
			}
			// APM stats as metrics
			ctx := context.TODO()

			// send metrics to the consumer or next component in pipeline
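
The control flow in this hunk (skip empty payloads, choose a translation path by feature gate, and skip a payload when the gated path fails) can be sketched in isolation. The sketch below uses only the standard library; `performanceGate`, `translateCompact`, and `translateExpanded` are hypothetical names standing in for the Collector feature gate and the two translator methods, and the failure condition is invented purely for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// performanceGate is a hypothetical stand-in for a Collector feature gate.
var performanceGate atomic.Bool

// translateCompact mimics the gated path, which can return an error.
// The size limit is an invented failure condition for this sketch.
func translateCompact(stats []int) (string, error) {
	if len(stats) > 3 {
		return "", errors.New("payload too large")
	}
	return fmt.Sprintf("compact(%d)", len(stats)), nil
}

// translateExpanded mimics the default path, which cannot fail.
func translateExpanded(stats []int) string {
	return fmt.Sprintf("expanded(%d)", len(stats))
}

// process applies the hunk's control flow to a list of payloads.
func process(batches [][]int) []string {
	var out []string
	for _, stats := range batches {
		if len(stats) == 0 {
			continue // nothing to translate
		}
		var mx string
		var err error
		if performanceGate.Load() {
			mx, err = translateCompact(stats)
			if err != nil {
				fmt.Println("failed to convert stats:", err)
				continue // drop this payload, keep the loop alive
			}
		} else {
			mx = translateExpanded(stats)
		}
		out = append(out, mx)
	}
	return out
}

func main() {
	batches := [][]int{{1}, {}, {1, 2, 3, 4}}
	fmt.Println(process(batches)) // gate disabled
	performanceGate.Store(true)
	fmt.Println(process(batches)) // gate enabled
}
```

Declaring `mx` and `err` before the branch lets both paths assign to the same variable, which is why a later `mx :=` declaration would no longer be valid.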
43 changes: 40 additions & 3 deletions exporter/datadogexporter/factory.go
@@ -6,9 +6,11 @@ package datadogexporter // import "github.com/open-telemetry/opentelemetry-colle
import (
"context"
"fmt"
"runtime"
"sync"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
@@ -25,9 +27,11 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/hostmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

@@ -232,14 +236,39 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config {
return cfg
}

// consumeStatsPayload starts one goroutine per CPU. Each goroutine reads
// serialized stats payloads from out, unmarshals them, and forwards every
// client stats entry to the trace agent until ctx is cancelled.
func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, traceagent *agent.Agent, tracerVersion string, logger *zap.Logger) {
	for i := 0; i < runtime.NumCPU(); i++ {
		f.wg.Add(1)
		go func() {
			defer f.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg := <-out:
					sp := &pb.StatsPayload{}

					err := proto.Unmarshal(msg, sp)
					if err != nil {
						logger.Error("failed to unmarshal stats payload", zap.Error(err))
						continue
					}
					for _, sc := range sp.Stats {
						traceagent.ProcessStats(sc, "", tracerVersion)
					}
				}
			}
		}()
	}
}

// createMetricsExporter creates a metrics exporter based on this config.
func (f *factory) createMetricsExporter(
	ctx context.Context,
	set exporter.CreateSettings,
	c component.Config,
) (exporter.Metrics, error) {
	cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)
	hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
	if err != nil {
		return nil, fmt.Errorf("failed to build hostname provider: %w", err)
@@ -253,7 +282,12 @@ func (f *factory) createMetricsExporter(
		cancel()
		return nil, fmt.Errorf("failed to start trace-agent: %w", err)
	}
	var statsOut chan []byte
	if datadog.ConnectorPerformanceFeatureGate.IsEnabled() {
		statsOut = make(chan []byte, 1000)
		statsv := set.BuildInfo.Command + set.BuildInfo.Version
		f.consumeStatsPayload(ctx, statsOut, traceagent, statsv, set.Logger)
	}
pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
@@ -286,7 +320,7 @@ func (f *factory) createMetricsExporter(
			return nil
		}
	} else {
		exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, attrsTranslator, hostProvider, traceagent, metadataReporter, statsOut)
		if metricsErr != nil {
			cancel()    // first cancel context
			f.wg.Wait() // then wait for shutdown
@@ -310,6 +344,9 @@ func (f *factory) createMetricsExporter(
		exporterhelper.WithShutdown(func(context.Context) error {
			cancel()
			f.StopReporter()
			if statsOut != nil {
				close(statsOut)
			}
			return nil
		}),
	)
2 changes: 1 addition & 1 deletion exporter/datadogexporter/go.mod
@@ -20,6 +20,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.91.0
@@ -185,7 +186,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.91.0 // indirect
4 changes: 2 additions & 2 deletions exporter/datadogexporter/integrationtest/go.mod
@@ -6,6 +6,7 @@ require (
github.com/DataDog/datadog-agent/pkg/proto v0.50.1
github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.91.0
github.com/stretchr/testify v1.8.4
github.com/tinylib/msgp v1.1.9
@@ -14,6 +15,7 @@ require (
go.opentelemetry.io/collector/connector v0.91.0
go.opentelemetry.io/collector/exporter v0.91.0
go.opentelemetry.io/collector/exporter/debugexporter v0.91.0
go.opentelemetry.io/collector/featuregate v1.0.0
go.opentelemetry.io/collector/otelcol v0.91.0
go.opentelemetry.io/collector/processor v0.91.0
go.opentelemetry.io/collector/processor/batchprocessor v0.91.0
@@ -113,7 +115,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.91.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.91.0 // indirect
@@ -157,7 +158,6 @@ require (
go.opentelemetry.io/collector/consumer v0.91.0 // indirect
go.opentelemetry.io/collector/extension v0.91.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.91.0 // indirect
go.opentelemetry.io/collector/pdata v1.0.0 // indirect
go.opentelemetry.io/collector/semconv v0.91.0 // indirect
go.opentelemetry.io/collector/service v0.91.0 // indirect
133 changes: 81 additions & 52 deletions exporter/datadogexporter/integrationtest/integration_test.go
@@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/collector/processor"
@@ -39,72 +40,97 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
)

func TestIntegration(t *testing.T) {
	tests := []struct {
		name               string
		featureGateEnabled bool
	}{
		{
			name:               "with feature gate enabled",
			featureGateEnabled: true,
		},
		{
			name:               "with feature gate disabled",
			featureGateEnabled: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// 1. Set up mock Datadog server
			// See also https://github.com/DataDog/datadog-agent/blob/49c16e0d4deab396626238fa1d572b684475a53f/cmd/trace-agent/test/backend.go
			apmstatsRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.APMStatsEndpoint, ReqChan: make(chan []byte)}
			tracesRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.TraceEndpoint, ReqChan: make(chan []byte)}
			server := testutil.DatadogServerMock(apmstatsRec.HandlerFunc, tracesRec.HandlerFunc)
			defer server.Close()

			// 2. Start in-process collector
			factories := getIntegrationTestComponents(t)
			app, confFilePath := getIntegrationTestCollector(t, server.URL, factories)
			if tt.featureGateEnabled {
				err := featuregate.GlobalRegistry().Set(datadog.ConnectorPerformanceFeatureGate.ID(), true)
				assert.NoError(t, err)
				defer func() {
					_ = featuregate.GlobalRegistry().Set(datadog.ConnectorPerformanceFeatureGate.ID(), false)
				}()
			}
			go func() {
				assert.NoError(t, app.Run(context.Background()))
			}()
			defer app.Shutdown()
			defer os.Remove(confFilePath)
			waitForReadiness(app)

			// 3. Generate and send traces
			sendTraces(t)

			// 4. Validate traces and APM stats from the mock server
			var spans []*pb.Span
			var stats []*pb.ClientGroupedStats

			// 5 sampled spans + APM stats on 10 spans are sent to datadog exporter
			for len(spans) < 5 || len(stats) < 10 {
				select {
				case tracesBytes := <-tracesRec.ReqChan:
					gz := getGzipReader(t, tracesBytes)
					slurp, err := io.ReadAll(gz)
					require.NoError(t, err)
					var traces pb.AgentPayload
					require.NoError(t, proto.Unmarshal(slurp, &traces))
					for _, tps := range traces.TracerPayloads {
						for _, chunks := range tps.Chunks {
							spans = append(spans, chunks.Spans...)
							for _, span := range chunks.Spans {
								assert.Equal(t, span.Meta["_dd.stats_computed"], "true")
							}
						}
					}

				case apmstatsBytes := <-apmstatsRec.ReqChan:
					gz := getGzipReader(t, apmstatsBytes)
					var spl pb.StatsPayload
					require.NoError(t, msgp.Decode(gz, &spl))
					for _, csps := range spl.Stats {
						for _, csbs := range csps.Stats {
							stats = append(stats, csbs.Stats...)
							for _, stat := range csbs.Stats {
								assert.True(t, strings.HasPrefix(stat.Resource, "TestSpan"))
								assert.Equal(t, stat.Hits, uint64(1))
								assert.Equal(t, stat.TopLevelHits, uint64(1))
							}
						}
					}
				}
			}

			// Verify we don't receive more than the expected numbers
			assert.Len(t, spans, 5)
			assert.Len(t, stats, 10)
		})
	}
}
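
The test above runs the same scenario twice, once per feature-gate state, and resets the gate afterwards. A stripped-down sketch of that table-driven toggle pattern follows, using only the standard library. `gateEnabled`, `withGate`, and `pipeline` are hypothetical names; the real test sets the gate through `featuregate.GlobalRegistry()`. Unlike the test, which resets the gate to `false`, this sketch restores whatever value was set before, which is a design choice of the sketch rather than something the commit does.

```go
package main

import "fmt"

// gateEnabled is a hypothetical global flag standing in for a
// registry-managed feature gate.
var gateEnabled bool

// withGate sets the flag for the duration of fn and then restores
// the value it had before the call.
func withGate(enabled bool, fn func()) {
	prev := gateEnabled
	gateEnabled = enabled
	defer func() { gateEnabled = prev }()
	fn()
}

// pipeline reports which code path the current gate state selects.
func pipeline() string {
	if gateEnabled {
		return "compact"
	}
	return "expanded"
}

func main() {
	tests := []struct {
		name    string
		enabled bool
		want    string
	}{
		{name: "with feature gate enabled", enabled: true, want: "compact"},
		{name: "with feature gate disabled", enabled: false, want: "expanded"},
	}
	for _, tt := range tests {
		withGate(tt.enabled, func() {
			fmt.Printf("%s: got %s, want %s\n", tt.name, pipeline(), tt.want)
		})
	}
	fmt.Println("gate after tests:", gateEnabled)
}
```

Restoring global state after each case keeps the cases independent of their order, which matters whenever a gate lives in a process-wide registry.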

func getIntegrationTestComponents(t *testing.T) otelcol.Factories {
@@ -185,6 +211,9 @@ exporters:
      endpoint: %q
service:
  telemetry:
    metrics:
      level: none
  pipelines:
    traces:
      receivers: [otlp]