Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datadog Connector Component #25065

Merged
merged 13 commits into from
Aug 14, 2023
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ connector/exceptionsconnector/ @open-telemetry/collect
connector/routingconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @mwear
connector/servicegraphconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @mapno
connector/spanmetricsconnector/ @open-telemetry/collector-contrib-approvers @albertteoh
connector/datadogconnector/ @open-telemetry/collector-contrib-approvers @mx-psi @gbbr @dineshg13


examples/demo/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers

Expand Down
1 change: 1 addition & 0 deletions connector/datadogconnector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
85 changes: 85 additions & 0 deletions connector/datadogconnector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Datadog Connector

## Description

The Datadog Connector is a connector component that computes Datadog APM Stats pre-sampling in the event that your traces pipeline is sampled using components such as the tailsamplingprocessor or probabilisticsamplerprocessor.

The connector is most applicable when using the sampling components such as the [tailsamplingprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor#tail-sampling-processor), or the [probabilisticsamplerprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/probabilisticsamplerprocessor) in one of your pipelines. The sampled pipeline should be duplicated and the `datadog` connector should be added to the the pipeline that is not being sampled to ensure that Datadog APM Stats are accurate in the backend.

## Usage

To use the Datadog Connector, add the connector to one set of the duplicated pipelines while sampling the other. The Datadog Connector will compute APM Stats on all spans that it sees. Here is an example on how to add it to a pipeline using the [probabilisticsampler]:

<table>
<tr>
<td> Before </td> <td> After </td>
</tr>
<tr>
<td valign="top">

```yaml
# ...
processors:
# ...
probabilistic_sampler:
sampling_percentage: 20
# add the "datadog" processor definition
datadog:

exporters:
datadog:
api:
key: ${env:DD_API_KEY}

service:
pipelines:
traces:
receivers: [otlp]
# prepend it to the sampler in your pipeline:
processors: [batch, datadog, probabilistic_sampler]
exporters: [datadog]

metrics:
receivers: [otlp]
processors: [batch]
exporters: [datadog]
```

</td><td valign="top">

```yaml
# ...
processors:
probabilistic_sampler:
sampling_percentage: 20

connectors:
# add the "datadog" connector definition and further configurations
datadog/connector:

exporters:
datadog:
api:
key: ${env:DD_API_KEY}

service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [datadog/connector]

traces/2: # this pipeline uses sampling
receivers: [otlp]
processors: [batch, probabilistic_sampler]
exporters: [datadog]

metrics:
receivers: [datadog/connector]
processors: [batch]
exporters: [datadog]
```
</tr></table>

Here we have two traces pipelines that ingest the same data but one is being sampled. The one that is sampled has its data sent to the datadog backend for you to see the sampled subset of the total traces sent across. The other non-sampled pipeline of traces sends its data to the metrics pipeline to be used in the APM stats. This unsampled pipeline gives the full picture of how much data the application emits in traces.

115 changes: 115 additions & 0 deletions connector/datadogconnector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"

import (
"context"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)

// connectorImp is the schema for connector
type connectorImp struct {
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest data after connector
logger *zap.Logger
started bool
gord02 marked this conversation as resolved.
Show resolved Hide resolved

// agent specifies the agent used to ingest traces and output APM Stats.
// It is implemented by the traceagent structure; replaced in tests.
agent datadog.Ingester

// translator specifies the translator used to transform APM Stats Payloads
// from the agent to OTLP Metrics.
translator *metrics.Translator

// in specifies the channel through which the agent will output Stats Payloads
// resulting from ingested traces.
in chan *pb.StatsPayload

// exit specifies the exit channel, which will be closed upon shutdown.
exit chan struct{}
}

// function to create a new connector
func newConnector(logger *zap.Logger, _ component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) {
logger.Info("Building datadog connector")

in := make(chan *pb.StatsPayload, 100)
trans, err := metrics.NewTranslator(logger)

ctx := context.Background()
if err != nil {
return nil, err
}
return &connectorImp{
logger: logger,
agent: datadog.NewAgent(ctx, in),
translator: trans,
in: in,
metricsConsumer: nextConsumer,
exit: make(chan struct{}),
}, nil
}

// Start implements the component.Component interface.
func (c *connectorImp) Start(_ context.Context, _ component.Host) error {
c.logger.Info("Starting datadogconnector")
c.started = true
c.agent.Start()
go c.run()
return nil
}

// Shutdown implements the component.Component interface.
func (c *connectorImp) Shutdown(context.Context) error {
c.logger.Info("Shutting down datadog connector")
c.started = false
c.agent.Stop()
c.exit <- struct{}{} // signal exit
<-c.exit // wait for close
return nil
}

// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the processor does modify the data
func (c *connectorImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
c.agent.Ingest(ctx, traces)
return nil
}

// run awaits incoming stats resulting from the agent's ingestion, converts them
// to metrics and flushes them using the configured metrics exporter.
func (c *connectorImp) run() {
defer close(c.exit)
for {
select {
case stats := <-c.in:
if len(stats.Stats) == 0 {
continue
}
// APM stats as metrics
mx := c.translator.StatsPayloadToMetrics(stats)
ctx := context.TODO()

// send metrics to the consumer or next component in pipeline
if err := c.metricsConsumer.ConsumeMetrics(ctx, mx); err != nil {
c.logger.Error("Failed ConsumeMetrics", zap.Error(err))
return
}
case <-c.exit:
return
}
}
}
32 changes: 32 additions & 0 deletions connector/datadogconnector/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogconnector

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer/consumertest"
)

var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface
gord02 marked this conversation as resolved.
Show resolved Hide resolved

// create test to create a connector, check that basic code compiles
func TestNewConnector(t *testing.T) {

factory := NewFactory()

creationParams := connectortest.NewNopCreateSettings()
cfg := factory.CreateDefaultConfig().(*Config)

// Test
traceConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop())
smc := traceConnector.(*connectorImp)
gord02 marked this conversation as resolved.
Show resolved Hide resolved

assert.Nil(t, err)
gord02 marked this conversation as resolved.
Show resolved Hide resolved
assert.NotNil(t, smc) // checks if the created connector implements the connectorImp struct
}
48 changes: 48 additions & 0 deletions connector/datadogconnector/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
)

const (
// this is the name used to refer to the connector in the config.yaml
typeStr = "datadog"
)

// NewFactory creates a factory for tailtracer connector.
func NewFactory() connector.Factory {
// OTel connector factory to make a factory for connectors
return connector.NewFactory(
// metadata.Type,
typeStr,
createDefaultConfig,
// connector.WithTracesToMetrics(createTracesToMetricsConnector, metadata.TracesStability))
connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this, you should create a metadata.yaml file for the component. You can take a look at this https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/connector/spanmetricsconnector/metadata.yaml as an example.

Then add, this go generate directive at the top of this file:

}

var _ component.Config = (*Config)(nil)

type Config struct{}

func createDefaultConfig() component.Config {
return &Config{}
}

// defines the consumer type of the connector
// we want to consume traces and export metrics therefore define nextConsumer as metrics, consumer is the next component in the pipeline
func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.Logger, cfg, nextConsumer)
if err != nil {
return nil, err
}
return c, nil
}
96 changes: 96 additions & 0 deletions connector/datadogconnector/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector

go 1.19

require (
github.com/DataDog/datadog-agent/pkg/proto v0.48.0-beta.1
songy23 marked this conversation as resolved.
Show resolved Hide resolved
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.7.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.82.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.82.0
go.opentelemetry.io/collector/connector v0.82.0
go.opentelemetry.io/collector/consumer v0.82.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
go.uber.org/zap v1.25.0
)

require (
github.com/DataDog/datadog-agent/pkg/obfuscate v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/trace v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/cgroups v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/log v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/pointer v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/scrubber v0.48.0-beta.1 // indirect
github.com/DataDog/datadog-go/v5 v5.1.1 // indirect
github.com/DataDog/go-tuf v1.0.1-0.5.2 // indirect
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.7.0 // indirect
github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.7.0 // indirect
github.com/DataDog/sketches-go v1.4.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.1.0-rc.3 // indirect
github.com/outcaste-io/ristretto v0.2.1 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.7 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/collector v0.82.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.82.0 // indirect
go.opentelemetry.io/collector/confmap v0.82.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect
go.opentelemetry.io/collector/semconv v0.82.0 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

gord02 marked this conversation as resolved.
Show resolved Hide resolved
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog => ../../internal/datadog

replace github.com/DataDog/datadog-agent/pkg/proto => github.com/DataDog/datadog-agent/pkg/proto v0.48.0-beta.1

replace github.com/DataDog/datadog-agent/pkg/trace => github.com/DataDog/datadog-agent/pkg/trace v0.48.0-beta.1
Loading
Loading