-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
agent.go
190 lines (168 loc) · 6.55 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadog // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
import (
"context"
"net/http"
"runtime"
"sync"
"time"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/datadog-agent/pkg/trace/api"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/ptrace"
)
// keyStatsComputed is the resource attribute key that Ingest sets to true on a
// set of resource spans once they have been handed to the agent, marking that
// stats have been computed for them.
const (
	keyStatsComputed = "_dd.stats_computed"
)
// TraceAgent specifies a minimal trace agent instance that is able to process traces and output stats.
// It embeds *agent.Agent, so the agent's components (Concentrator, OTLPReceiver, TraceWriter,
// samplers, ...) are accessed directly as fields of TraceAgent.
type TraceAgent struct {
	*agent.Agent

	// pchan specifies the channel that will be used to output Datadog Trace Agent API Payloads
	// resulting from ingested OpenTelemetry spans. The OTLP receiver writes to it and the
	// goProcess workers read from it.
	pchan chan *api.Payload

	// wg waits for all goroutines to exit (the goDrain goroutine and the goProcess workers).
	wg sync.WaitGroup

	// exit signals the agent to shut down; it is closed by Stop.
	exit chan struct{}
}
// ConnectorPerformanceFeatureGate, when enabled, makes the Datadog Connector use optimized code paths.
// It is registered in the global featuregate registry when this package is initialized; as a
// Must-style call, MustRegister is expected to panic if registration fails (e.g. a duplicate ID).
// The gate is registered at alpha stage, which in the featuregate package presumably means it is
// disabled unless explicitly enabled — confirm against the featuregate docs.
// NOTE(review): the gate is not read in this file; it is presumably checked by the connector.
var ConnectorPerformanceFeatureGate = featuregate.GlobalRegistry().MustRegister(
	"connector.datadogconnector.performance",
	featuregate.StageAlpha,
	featuregate.WithRegisterDescription("Datadog Connector will use optimized code"),
)
// NewAgent creates a new unstarted traceagent using the given context and the default
// trace-agent configuration returned by traceconfig.New. Call Start to start the traceagent.
// The out channel will receive outgoing stats payloads resulting from spans ingested
// using the Ingest method.
func NewAgent(ctx context.Context, out chan *pb.StatsPayload) *TraceAgent {
	return NewAgentWithConfig(ctx, traceconfig.New(), out)
}
// NewAgentWithConfig creates a new unstarted traceagent that uses the given config cfg.
// Most callers should use NewAgent, which supplies a default configuration; this variant
// lets the configuration be provided explicitly (for example in tests).
//
// cfg is modified in place:
//   - the HTTP receiver is disabled;
//   - the API key of the first endpoint is overwritten with a placeholder (an empty endpoint
//     is created first if cfg has none);
//   - the default hostname is overwritten with the translator's placeholder.
//
// Stats payloads computed from spans passed to Ingest are emitted on out. Call Start before
// calling Ingest.
func NewAgentWithConfig(ctx context.Context, cfg *traceconfig.AgentConfig, out chan *pb.StatsPayload) *TraceAgent {
	// disable the HTTP receiver
	cfg.ReceiverPort = 0
	// Set the API key to succeed startup; it is never used nor needed. A config without
	// endpoints (or with a nil first endpoint) would otherwise panic on the index below.
	if len(cfg.Endpoints) == 0 {
		cfg.Endpoints = []*traceconfig.Endpoint{{}}
	} else if cfg.Endpoints[0] == nil {
		cfg.Endpoints[0] = &traceconfig.Endpoint{}
	}
	cfg.Endpoints[0].APIKey = "skip_check"
	// set the default hostname to the translator's placeholder; in the case where no hostname
	// can be deduced from incoming traces, we don't know the default hostname (because it is set
	// in the exporter). In order to avoid duplicating the hostname setting in the processor and
	// exporter, we use a placeholder and fill it in later (in the Datadog Exporter or Agent OTLP
	// Ingest). This gives a better user experience.
	cfg.Hostname = metrics.UnsetHostnamePlaceholder
	pchan := make(chan *api.Payload, 1000)
	a := agent.NewAgent(ctx, cfg, telemetry.NewNoopCollector())
	// replace the Concentrator (the component which computes and flushes APM Stats from incoming
	// traces) with our own, which uses the 'out' channel.
	a.Concentrator = stats.NewConcentrator(cfg, out, time.Now())
	// ...and the same for the ClientStatsAggregator; we don't use it here, but it is also a source
	// of stats which should be available to us.
	a.ClientStatsAggregator = stats.NewClientStatsAggregator(cfg, out)
	// lastly, start the OTLP receiver, which will be used to introduce ResourceSpans into the traceagent,
	// so that we can transform them to Datadog spans and receive stats.
	a.OTLPReceiver = api.NewOTLPReceiver(pchan, cfg)
	return &TraceAgent{
		Agent: a,
		exit:  make(chan struct{}),
		pchan: pchan,
	}
}
// Start makes the traceagent ready to ingest spans. Rather than starting the
// whole agent, it starts only the components that stats computation relies on,
// then launches the goroutine that drains the trace writer input and the
// workers that process incoming payloads.
func (p *TraceAgent) Start() {
	components := []interface{ Start() }{
		// stats producers
		p.Concentrator,
		p.ClientStatsAggregator,
		// The samplers and the event processor are not needed for what they do,
		// but the agent still uses them, so they must be running.
		p.PrioritySampler,
		p.ErrorsSampler,
		p.NoPrioritySampler,
		p.EventProcessor,
	}
	for i := range components {
		components[i].Start()
	}
	p.goDrain()
	p.goProcess()
}
// Stop stops the traceagent, making it unable to ingest spans. Do not call Ingest after Stop.
//
// Shutdown happens in two phases:
//  1. p.exit is closed and Stop waits for the drain goroutine and the processing workers to
//     return. While waiting, Stop itself keeps draining p.TraceWriter.In. A worker that is
//     still inside p.Process therefore cannot block on that channel after the goDrain
//     goroutine has already returned.
//  2. Only once every goroutine has exited are the components stopped. Payloads that were
//     already being processed therefore still reach a running Concentrator, presumably
//     ending up in its final flush, instead of hitting a stopped component.
func (p *TraceAgent) Stop() {
	close(p.exit)

	// Wait for the goroutines in the background so this goroutine can keep draining
	// the TraceWriter input channel in the meantime.
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	for finished := false; !finished; {
		select {
		case <-p.TraceWriter.In:
			// discard: these traces are not written anywhere
		case <-done:
			finished = true
		}
	}

	for _, stopper := range []interface{ Stop() }{
		p.Concentrator,
		p.ClientStatsAggregator,
		p.PrioritySampler,
		p.ErrorsSampler,
		p.NoPrioritySampler,
		p.EventProcessor,
	} {
		stopper.Stop()
	}
}
// goDrain starts a goroutine, tracked by p.wg, that receives and discards
// everything sent on p.TraceWriter.In until p.exit is closed. The trace writer
// is never started by this traceagent, so without this consumer, senders on
// that channel could block. Only the outgoing stats are of interest here.
func (p *TraceAgent) goDrain() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			select {
			case <-p.exit:
				return
			case <-p.TraceWriter.In:
				// discard: the traces are not written anywhere
			}
		}
	}()
}
// Ingest hands every ResourceSpans in traces to the embedded OTLP receiver. The
// receiver turns them into Datadog payloads and pushes them onto p.pchan, where the
// processing workers compute stats and emit them on the output channel given to
// NewAgent. Each ResourceSpans is then tagged with keyStatsComputed=true. The intent
// is that spans passing through again are not counted twice. Ingest only sets the
// attribute; it does not check it. Only call Ingest on a started, not yet stopped,
// traceagent.
func (p *TraceAgent) Ingest(ctx context.Context, traces ptrace.Traces) {
	all := traces.ResourceSpans()
	for idx := 0; idx < all.Len(); idx++ {
		rs := all.At(idx)
		p.OTLPReceiver.ReceiveResourceSpans(ctx, rs, http.Header{})
		rs.Resource().Attributes().PutBool(keyStatsComputed, true)
	}
}
// goProcess launches one worker goroutine per logical CPU, each tracked by p.wg.
// Every worker takes payloads from p.pchan and passes them to p.Process, which
// processes the payload and emits stats on the 'out' channel given to NewAgent.
// Workers return once p.exit is closed.
func (p *TraceAgent) goProcess() {
	worker := func() {
		defer p.wg.Done()
		for {
			select {
			case <-p.exit:
				return
			case payload := <-p.pchan:
				p.Process(payload)
			}
		}
	}
	for n := runtime.NumCPU(); n > 0; n-- {
		p.wg.Add(1)
		go worker()
	}
}
// Ingester is able to ingest traces. It is implemented by TraceAgent.
type Ingester interface {
	// Start starts the ingester.
	Start()
	// Ingest ingests the set of traces.
	Ingest(ctx context.Context, traces ptrace.Traces)
	// Stop stops the ingester.
	Stop()
}

// Compile-time check that *TraceAgent satisfies Ingester.
var _ Ingester = (*TraceAgent)(nil)