Skip to content

Commit

Permalink
Add a translator for api/v0.2/traces protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
tomert-alma committed Oct 26, 2023
1 parent acae6fe commit 1cc26b4
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datadogreceiver-api-v02-traces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "The datadogreceiver supports the new datadog protocol that is sent by the datadog agent API/v0.2/traces."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27045]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 14 additions & 10 deletions receiver/datadogreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (ddr *datadogReceiver) Start(_ context.Context, host component.Host) error
ddmux.HandleFunc("/v0.4/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.5/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.7/traces", ddr.handleTraces)
ddmux.HandleFunc("/api/v0.2/traces", ddr.handleTraces)

var err error
ddr.server, err = ddr.config.HTTPServerSettings.ToServer(
Expand Down Expand Up @@ -88,22 +89,25 @@ func (ddr *datadogReceiver) handleTraces(w http.ResponseWriter, req *http.Reques
defer func(spanCount *int) {
ddr.tReceiver.EndTracesOp(obsCtx, "datadog", *spanCount, err)
}(&spanCount)
var ddTraces *pb.TracerPayload

var ddTraces []*pb.TracerPayload
ddTraces, err = handlePayload(req)
if err != nil {
http.Error(w, "Unable to unmarshal reqs", http.StatusBadRequest)
ddr.params.Logger.Error("Unable to unmarshal reqs")
return
}

otelTraces := toTraces(ddTraces, req)
spanCount = otelTraces.SpanCount()
err = ddr.nextConsumer.ConsumeTraces(obsCtx, otelTraces)
if err != nil {
http.Error(w, "Trace consumer errored out", http.StatusInternalServerError)
ddr.params.Logger.Error("Trace consumer errored out")
} else {
_, _ = w.Write([]byte("OK"))
for _, ddTrace := range ddTraces {
otelTraces := toTraces(ddTrace, req)
spanCount = otelTraces.SpanCount()
err = ddr.nextConsumer.ConsumeTraces(obsCtx, otelTraces)
if err != nil {
http.Error(w, "Trace consumer errored out", http.StatusInternalServerError)
ddr.params.Logger.Error("Trace consumer errored out")
return
}
}

_, _ = w.Write([]byte("OK"))

}
49 changes: 39 additions & 10 deletions receiver/datadogreceiver/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.16.0"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -192,7 +193,9 @@ func putBuffer(buffer *bytes.Buffer) {
bufferPool.Put(buffer)
}

func handlePayload(req *http.Request) (tp *pb.TracerPayload, err error) {
func handlePayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
var tracerPayloads []*pb.TracerPayload

defer func() {
_, errs := io.Copy(io.Discard, req.Body)
err = multierr.Combine(err, errs, req.Body.Close())
Expand All @@ -206,46 +209,72 @@ func handlePayload(req *http.Request) (tp *pb.TracerPayload, err error) {
return nil, err
}
var tracerPayload pb.TracerPayload
_, err = tracerPayload.UnmarshalMsg(buf.Bytes())
return &tracerPayload, err
if _, err = tracerPayload.UnmarshalMsg(buf.Bytes()); err != nil {
return nil, err
}

tracerPayloads = append(tracerPayloads, &tracerPayload)
case strings.HasPrefix(req.URL.Path, "/v0.5"):
buf := getBuffer()
defer putBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
var traces pb.Traces
err = traces.UnmarshalMsgDictionary(buf.Bytes())
return &pb.TracerPayload{

if err = traces.UnmarshalMsgDictionary(buf.Bytes()); err != nil {
return nil, err
}

tracerPayload := &pb.TracerPayload{
LanguageName: req.Header.Get("Datadog-Meta-Lang"),
LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
Chunks: traceChunksFromTraces(traces),
}, err
}
tracerPayloads = append(tracerPayloads, tracerPayload)

case strings.HasPrefix(req.URL.Path, "/v0.1"):
var spans []pb.Span
if err = json.NewDecoder(req.Body).Decode(&spans); err != nil {
return nil, err
}
return &pb.TracerPayload{
tracerPayload := &pb.TracerPayload{
LanguageName: req.Header.Get("Datadog-Meta-Lang"),
LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
Chunks: traceChunksFromSpans(spans),
}, nil
}
tracerPayloads = append(tracerPayloads, tracerPayload)
case strings.HasPrefix(req.URL.Path, "/api/v0.2"):
buf := getBuffer()
defer putBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}

var agentPayload pb.AgentPayload
if err = proto.Unmarshal(buf.Bytes(), &agentPayload); err != nil {
return nil, err
}

return agentPayload.TracerPayloads, err

default:
var traces pb.Traces
if err = decodeRequest(req, &traces); err != nil {
return nil, err
}
return &pb.TracerPayload{
tracerPayload := &pb.TracerPayload{
LanguageName: req.Header.Get("Datadog-Meta-Lang"),
LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
Chunks: traceChunksFromTraces(traces),
}, err
}
tracerPayloads = append(tracerPayloads, tracerPayload)
}

return tracerPayloads, nil
}

func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
Expand Down
66 changes: 59 additions & 7 deletions receiver/datadogreceiver/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-colle

import (
"bytes"
"fmt"
"io"
"net/http"
"testing"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
vmsgp "github.com/vmihailenco/msgpack/v4"
"google.golang.org/protobuf/proto"
)

var data = [2]interface{}{
Expand Down Expand Up @@ -57,6 +59,16 @@ var data = [2]interface{}{
},
}

func getTraces(t *testing.T) (traces pb.Traces) {
payload, err := vmsgp.Marshal(&data)
assert.NoError(t, err)
if err2 := traces.UnmarshalMsgDictionary(payload); err2 != nil {
t.Fatal(err)
}
return traces

}

func TestTracePayloadV05Unmarshalling(t *testing.T) {
var traces pb.Traces

Expand All @@ -65,6 +77,7 @@ func TestTracePayloadV05Unmarshalling(t *testing.T) {

require.NoError(t, traces.UnmarshalMsgDictionary(payload), "Must not error when marshaling content")
req, _ := http.NewRequest(http.MethodPost, "/v0.5/traces", io.NopCloser(bytes.NewReader(payload)))

translated := toTraces(&pb.TracerPayload{
LanguageName: req.Header.Get("Datadog-Meta-Lang"),
LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
Expand All @@ -84,12 +97,7 @@ func TestTracePayloadV05Unmarshalling(t *testing.T) {
}

func TestTracePayloadV07Unmarshalling(t *testing.T) {
var traces pb.Traces
payload, err := vmsgp.Marshal(&data)
assert.NoError(t, err)
if err2 := traces.UnmarshalMsgDictionary(payload); err2 != nil {
t.Fatal(err2)
}
traces := getTraces(t)
apiPayload := pb.TracerPayload{
LanguageName: "1",
LanguageVersion: "1",
Expand All @@ -100,7 +108,9 @@ func TestTracePayloadV07Unmarshalling(t *testing.T) {
bytez, _ := apiPayload.MarshalMsg(reqBytes)
req, _ := http.NewRequest(http.MethodPost, "/v0.7/traces", io.NopCloser(bytes.NewReader(bytez)))

translated, _ := handlePayload(req)
translatedPayloads, _ := handlePayload(req)
assert.Equal(t, len(translatedPayloads), 1, "Expected one translated payload")
translated := translatedPayloads[0]
span := translated.GetChunks()[0].GetSpans()[0]
assert.NotNil(t, span)
assert.Equal(t, 4, len(span.GetMeta()), "missing attributes")
Expand All @@ -126,3 +136,45 @@ func BenchmarkTranslatorv07(b *testing.B) {
}
b.StopTimer()
}

func TestTracePayloadApiV02Unmarshalling(t *testing.T) {
traces := getTraces(t)
agentPayload := agentPayloadFromTraces(&traces)

bytez, _ := proto.Marshal(&agentPayload)
req, _ := http.NewRequest(http.MethodPost, "/api/v0.2/traces", io.NopCloser(bytes.NewReader(bytez)))

translatedPayloads, _ := handlePayload(req)
assert.Equal(t, len(translatedPayloads), 2, "Expected two translated payload")
for _, translated := range translatedPayloads {
assert.NotNil(t, translated)
assert.Equal(t, 1, len(translated.Chunks))
assert.Equal(t, 1, len(translated.Chunks[0].Spans))
span := translated.Chunks[0].Spans[0]

assert.NotNil(t, span)
assert.Equal(t, 4, len(span.Meta), "missing attributes")
assert.Equal(t, "my-service", span.Meta["service.name"])
assert.Equal(t, "my-name", span.Name)
assert.Equal(t, "my-resource", span.Resource)
}
}

func agentPayloadFromTraces(traces *pb.Traces) (agentPayload pb.AgentPayload) {
numberOfTraces := 2
var tracerPayloads []*pb.TracerPayload
for i := 0; i < numberOfTraces; i++ {
payload := &pb.TracerPayload{
LanguageName: fmt.Sprintf("%d", i),
LanguageVersion: fmt.Sprintf("%d", i),
Chunks: traceChunksFromTraces(*traces),
TracerVersion: fmt.Sprintf("%d", i),
}
tracerPayloads = append(tracerPayloads, payload)
}

return pb.AgentPayload{
TracerPayloads: tracerPayloads,
}

}

0 comments on commit 1cc26b4

Please sign in to comment.