exporter.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sapmexporter exports trace data using Splunk's SAPM protocol.
package sapmexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter"
import (
"context"
"errors"
"github.com/jaegertracing/jaeger/model"
sapmclient "github.com/signalfx/sapm-proto/client"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)
// TODO: Find a place for this to be shared.
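// baseTracesExporter composes a component lifecycle with a traces consumer so the
// exporterhelper-built exporter can be wrapped by another consumer (see below).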
type baseTracesExporter struct {
component.Component
consumer.Traces
}
// sapmExporter wraps a SAPM client and implements the core of the SAPM traces exporter.
type sapmExporter struct {
client *sapmclient.Client
logger *zap.Logger
config *Config
}
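// Shutdown stops the underlying SAPM client and releases its resources.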
func (se *sapmExporter) Shutdown(context.Context) error {
se.client.Stop()
return nil
}
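// newSAPMExporter builds a sapmExporter backed by a SAPM client created from the exporter config.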
func newSAPMExporter(cfg *Config, params exporter.Settings) (sapmExporter, error) {
client, err := sapmclient.New(cfg.clientOptions()...)
if err != nil {
return sapmExporter{}, err
}
return sapmExporter{
client: client,
logger: params.Logger,
config: cfg,
}, nil
}
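// newSAPMTracesExporter wraps the sapmExporter with exporterhelper queueing, retry, and timeout
// support, and optionally with per-token batching when access token passthrough is enabled.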
func newSAPMTracesExporter(cfg *Config, set exporter.Settings) (exporter.Traces, error) {
se, err := newSAPMExporter(cfg, set)
if err != nil {
return nil, err
}
te, err := exporterhelper.NewTraces(
context.TODO(),
set,
cfg,
se.pushTraceData,
exporterhelper.WithShutdown(se.Shutdown),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithRetry(cfg.BackOffConfig),
exporterhelper.WithTimeout(cfg.TimeoutSettings),
)
if err != nil {
return nil, err
}
// If AccessTokenPassthrough is enabled, split the incoming traces by splunk.SFxAccessTokenLabel
// so that each batch pushed to the backend contains data for a single token.
if cfg.AccessTokenPassthrough {
te = &baseTracesExporter{
Component: te,
Traces: batchperresourceattr.NewBatchPerResourceTraces(splunk.SFxAccessTokenLabel, te),
}
}
return te, nil
}
// pushTraceData converts the traces to SAPM proto batches, exports them with the associated
// SFx access token, and returns an error if translation or export fails.
func (se *sapmExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
rss := td.ResourceSpans()
if rss.Len() == 0 {
return nil
}
accessToken := se.retrieveAccessToken(ctx, rss.At(0))
batches := jaeger.ProtoFromTraces(td)
// The access token cannot be removed from the pdata because exporters must not modify
// incoming pdata, so it is stripped from the converted batches instead.
filterToken(batches)
ingestResponse, err := se.client.ExportWithAccessTokenAndGetResponse(ctx, batches, accessToken)
if se.config.LogDetailedResponse && ingestResponse != nil {
if ingestResponse.Err != nil {
se.logger.Error("Failed to get response from trace ingest", zap.Error(ingestResponse.Err))
} else {
se.logger.Debug("Detailed response from ingest", zap.ByteString("response", ingestResponse.Body))
}
}
if err != nil {
sendErr := &sapmclient.ErrSend{}
if errors.As(err, &sendErr) && sendErr.Permanent {
return consumererror.NewPermanent(sendErr)
}
return err
}
return nil
}
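// retrieveAccessToken returns the SFx access token for the request, preferring the value from
// the client metadata header and falling back to the resource attributes. It returns an empty
// string when access token passthrough is disabled.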
func (se *sapmExporter) retrieveAccessToken(ctx context.Context, md ptrace.ResourceSpans) string {
if !se.config.AccessTokenPassthrough {
// Nothing to do if access token passthrough is not configured.
return ""
}
cl := client.FromContext(ctx)
ss := cl.Metadata.Get(splunk.SFxAccessTokenHeader)
if len(ss) > 0 {
return ss[0]
}
attrs := md.Resource().Attributes()
if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok {
return accessToken.Str()
}
return ""
}
// filterToken removes the access token tag from each batch's process so credentials are not leaked to the backend.
func filterToken(batches []*model.Batch) {
for _, batch := range batches {
filterTokenFromProcess(batch.Process)
}
}
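// filterTokenFromProcess removes any access token tags from a single process in place,
// using swap-and-truncate so the scan stays linear without preserving tag order.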
func filterTokenFromProcess(proc *model.Process) {
if proc == nil {
return
}
for i := 0; i < len(proc.Tags); {
if proc.Tags[i].Key == splunk.SFxAccessTokenLabel {
proc.Tags[i] = proc.Tags[len(proc.Tags)-1]
// We do not need to put proc.Tags[i] at the end, as it will be discarded anyway
proc.Tags = proc.Tags[:len(proc.Tags)-1]
continue
}
i++
}
}
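// A minimal collector configuration sketch for this exporter. The key names below are assumed
// from the exporter's typical configuration (they are defined in config.go, not in this file),
// and the endpoint and token values are placeholders:
//
//	exporters:
//	  sapm:
//	    access_token: "${SPLUNK_ACCESS_TOKEN}"
//	    endpoint: "https://ingest.us0.signalfx.com/v2/trace"
//	    access_token_passthrough: true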