Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor: add datadogprocessor #16853

Merged
merged 8 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pkg/experimentalmetricmetadata/ @open-telemetry/collector-c

processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth
processor/datadogprocessor/ @open-telemetry/collector-contrib-approvers @mx-psi @gbbr @dineshg13
processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers
Expand Down
1 change: 1 addition & 0 deletions processor/datadogprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
27 changes: 27 additions & 0 deletions processor/datadogprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Datadog Processor

| Status | |
|--------------------------|---------------|
| Stability | [development] |
| Supported pipeline types | traces |
| Distributions | [contrib] |
gbbr marked this conversation as resolved.
Show resolved Hide resolved

## Description

gbbr marked this conversation as resolved.
Show resolved Hide resolved
The Datadog Processor can be used to compute Datadog APM Stats pre-sampling. For example, when using the [tailsamplingprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor#tail-sampling-processor) or [probabilisticsamplerprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/probabilisticsamplerprocessor) components, the `datadogprocessor` can be prepended into the pipeline to ensure that Datadog APM Stats are accurate and include the dropped traces.

## Configuration

By default, when used in conjunction with the Datadog Exporter, the processor should detect its presence (as long as it is configured within a pipeline), and use it to export the Datadog APM Stats.

If using within a gateway deployment or alongside the Datadog Agent, where the Datadog Exporter is not present, and, for example, an OTLP exporter might be, you need to specify the metrics exporter to the processor:

```yaml
processors:
datadog:
metrics_exporter: otlp
```

The default value for `metrics_exporter` is `datadog`. Any configured metrics exporter must exist as part of a metrics pipeline.

When using in conjunction with the Datadog Agent's OTLP Ingest, the minimum required Datadog Agent version that supports this processor is 7.42.0.
184 changes: 184 additions & 0 deletions processor/datadogprocessor/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"context"
"net/http"
"runtime"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/otlp/model/translator"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/datadog-agent/pkg/trace/api"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// traceagent specifies a minimal trace agent instance that is able to process traces and output stats.
type traceagent struct {
*agent.Agent

// pchan specifies the channel that will be used to output Datadog Trace Agent API Payloads
// resulting from ingested OpenTelemetry spans.
pchan chan *api.Payload

// wg waits for all goroutines to exit.
wg sync.WaitGroup

// exit signals the agent to shut down.
exit chan struct{}
}

// newAgent creates a new unstarted traceagent using the given context. Call Start to start the traceagent.
// The out channel will receive outoing stats payloads resulting from spans ingested using the Ingest method.
func newAgent(ctx context.Context, out chan pb.StatsPayload) *traceagent {
return newAgentWithConfig(ctx, traceconfig.New(), out)
}

// newAgentWithConfig creates a new traceagent with the given config cfg. Used in tests; use newAgent instead.
func newAgentWithConfig(ctx context.Context, cfg *traceconfig.AgentConfig, out chan pb.StatsPayload) *traceagent {
// disable the HTTP receiver
cfg.ReceiverPort = 0
// set the API key to succeed startup; it is never used nor needed
cfg.Endpoints[0].APIKey = "skip_check"
// set the default hostname to the translator's placeholder; in the case where no hostname
// can be deduced from incoming traces, we don't know the default hostname (because it is set
// in the exporter). In order to avoid duplicating the hostname setting in the processor and
// exporter, we use a placeholder and fill it in later (in the Datadog Exporter or Agent OTLP
// Ingest). This gives a better user experience.
cfg.Hostname = translator.UnsetHostnamePlaceholder
pchan := make(chan *api.Payload, 1000)
a := agent.NewAgent(ctx, cfg)
// replace the Concentrator (the component which computes and flushes APM Stats from incoming
// traces) with our own, which uses the 'out' channel.
a.Concentrator = stats.NewConcentrator(cfg, out, time.Now())
// ...and the same for the ClientStatsAggregator; we don't use it here, but it is also a source
// of stats which should be available to us.
a.ClientStatsAggregator = stats.NewClientStatsAggregator(cfg, out)
// lastly, start the OTLP receiver, which will be used to introduce ResourceSpans into the traceagent,
// so that we can transform them to Datadog spans and receive stats.
a.OTLPReceiver = api.NewOTLPReceiver(pchan, cfg)
return &traceagent{
Agent: a,
exit: make(chan struct{}),
pchan: pchan,
}
}

// Start starts the traceagent, making it ready to ingest spans.
func (p *traceagent) Start() {
// we don't need to start the full agent, so we only start a set of minimal
// components needed to compute stats:
for _, starter := range []interface{ Start() }{
p.Concentrator,
p.ClientStatsAggregator,
// we don't need the samplers' nor the processor's functionalities;
// but they are used by the agent nevertheless, so they need to be
// active and functioning.
p.PrioritySampler,
p.ErrorsSampler,
p.NoPrioritySampler,
p.EventProcessor,
} {
starter.Start()
}

p.goDrain()
p.goProcess()
}

// Stop stops the traceagent, making it unable to ingest spans. Do not call Ingest after Stop.
func (p *traceagent) Stop() {
for _, stopper := range []interface{ Stop() }{
p.Concentrator,
p.ClientStatsAggregator,
p.PrioritySampler,
p.ErrorsSampler,
p.NoPrioritySampler,
p.EventProcessor,
} {
stopper.Stop()
}
close(p.exit)
p.wg.Wait()
}

// goDrain drains the TraceWriter channel, ensuring it won't block. We don't need the traces,
// nor do we have a running TraceWrite. We just want the outgoing stats.
func (p *traceagent) goDrain() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-p.TraceWriter.In:
// we don't write these traces anywhere; drain the channel
case <-p.exit:
return
}
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
}
}()
}

// Ingest processes the given spans within the traceagent and outputs stats through the output channel
// provided to newAgent. Do not call Ingest on an unstarted or stopped traceagent.
func (p *traceagent) Ingest(ctx context.Context, traces ptrace.Traces) {
rspanss := traces.ResourceSpans()
for i := 0; i < rspanss.Len(); i++ {
rspans := rspanss.At(i)
p.OTLPReceiver.ReceiveResourceSpans(ctx, rspans, http.Header{}, "datadogprocessor")
// ...the call transforms the OTLP Spans into a Datadog payload and sends the result
// down the p.pchan channel
}
}

// goProcesses runs the main loop which takes incoming payloads, processes them and generates stats.
// It then picks up those stats and converts them to metrics.
func (p *traceagent) goProcess() {
for i := 0; i < runtime.NumCPU(); i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case payload := <-p.pchan:
p.Process(payload)
// ...the call processes the payload and outputs stats via the 'out' channel
// provided to newAgent
case <-p.exit:
return
}
}
}()
}
}

var _ ingester = (*traceagent)(nil)

// An ingester is able to ingest traces. Implemented by traceagent.
type ingester interface {
// Start starts the ingester.
Start()

// Ingest ingests the set of traces.
Ingest(ctx context.Context, traces ptrace.Traces)

// Stop stops the ingester.
Stop()
}
92 changes: 92 additions & 0 deletions processor/datadogprocessor/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"context"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/otlp/model/translator"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/testutil"
"github.com/stretchr/testify/require"
)

func TestTraceAgentConfig(t *testing.T) {
cfg := traceconfig.New()
require.NotZero(t, cfg.ReceiverPort)

out := make(chan pb.StatsPayload)
agnt := newAgentWithConfig(context.Background(), cfg, out)
require.Zero(t, cfg.ReceiverPort)
require.NotEmpty(t, cfg.Endpoints[0].APIKey)
require.Equal(t, translator.UnsetHostnamePlaceholder, cfg.Hostname)
require.Equal(t, out, agnt.Concentrator.Out)
}

func TestTraceAgent(t *testing.T) {
cfg := traceconfig.New()
cfg.BucketInterval = 50 * time.Millisecond
out := make(chan pb.StatsPayload, 10)
ctx := context.Background()
a := newAgentWithConfig(ctx, cfg, out)
a.Start()
defer a.Stop()

rspanss := testutil.NewOTLPTracesRequest([]testutil.OTLPResourceSpan{
{
LibName: "libname",
LibVersion: "1.2",
Attributes: map[string]interface{}{},
Spans: []*testutil.OTLPSpan{
{Name: "1"},
{Name: "2"},
{Name: "3"},
},
},
{
LibName: "other-libname",
LibVersion: "2.1",
Attributes: map[string]interface{}{},
Spans: []*testutil.OTLPSpan{
{Name: "4", TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}},
{Name: "5", TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}},
},
},
}).Traces()

a.Ingest(ctx, rspanss)
var stats pb.StatsPayload
timeout := time.After(500 * time.Millisecond)
loop:
for {
select {
case stats = <-out:
if len(stats.Stats) != 0 {
break loop
}
case <-timeout:
t.Fatal("timed out")
}
}
require.Len(t, stats.Stats, 1)
require.Len(t, stats.Stats[0].Stats, 1)
// considering all spans in rspans have distinct aggregations, we should have an equal amount
// of groups
require.Len(t, stats.Stats[0].Stats[0].Stats, rspanss.SpanCount())
require.Len(t, a.TraceWriter.In, 0) // the trace writer channel should've been drained
}
35 changes: 35 additions & 0 deletions processor/datadogprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
)

// Config defines the configuration options for datadogprocessor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// MetricsExporter specifies the name of the metrics exporter to be used when
// exporting stats metrics.
MetricsExporter string `mapstructure:"metrics_exporter"`
gbbr marked this conversation as resolved.
Show resolved Hide resolved
}

func createDefaultConfig() component.Config {
return &Config{
MetricsExporter: "datadog",
}
}
28 changes: 28 additions & 0 deletions processor/datadogprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package datadogprocessor contains the Datadog Processor. The Datadog Processor is used in
// conjunction with the Collector's tail samplers (such as the tailsamplingprocessor or the
// probabilisticsamplerprocessor) to extract accurate APM Stats in situations when not all
// traces are seen by the Datadog Exporter.
//
// By default, the processor looks for an exporter named "datadog" in a metrics pipeline.
// It can use any other exporter (in case the name is different, a gateway deployment is used
// or the collector runs alongside the Datadog Agent) but this needs to be specified via config,
// such as for example:
//
// processor:
// datadog:
// metrics_exporter: otlp
package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"
Loading