Skip to content

Commit

Permalink
[exporter/kafkaexporter] add zipkin encoding for traces
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroliakh committed Jul 4, 2023
1 parent 81320c4 commit dba22a5
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 24 deletions.
20 changes: 20 additions & 0 deletions .chloggen/kafkaexporter_zipkin_encoding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding Zipkin encoding option for traces to kafkaexporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21102]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 3 additions & 1 deletion exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ The following settings can be optionally configured:
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- The following encodings are valid *only* for **traces**.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `auth`
Expand Down
4 changes: 3 additions & 1 deletion exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
github.com/jaegertracing/jaeger v1.41.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.80.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.80.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.80.0
github.com/openzipkin/zipkin-go v0.4.1
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
go.opentelemetry.io/collector/component v0.80.1-0.20230629144634-c3f70bd1f8ea
Expand All @@ -32,7 +34,7 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand Down
7 changes: 6 additions & 1 deletion exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions exporter/kafkaexporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
)

// TracesMarshaler marshals traces into Message array.
Expand Down Expand Up @@ -41,11 +43,15 @@ type LogsMarshaler interface {
func tracesMarshalers() map[string]TracesMarshaler {
otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding)
otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json")
zipkinProto := newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto")
zipkinJSON := newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json")
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
return map[string]TracesMarshaler{
otlpPb.Encoding(): otlpPb,
otlpJSON.Encoding(): otlpJSON,
zipkinProto.Encoding(): zipkinProto,
zipkinJSON.Encoding(): zipkinJSON,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
}
Expand Down
71 changes: 50 additions & 21 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

zipkin "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -20,6 +21,8 @@ func TestDefaultTracesMarshalers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"otlp_json",
"zipkin_proto",
"zipkin_json",
"jaeger_proto",
"jaeger_json",
}
Expand Down Expand Up @@ -84,27 +87,17 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
ils.Spans().AppendEmpty()

span := ils.Spans().At(0)
span.SetKind(ptrace.SpanKindInternal)
span.SetName(t.Name())
span.SetKind(ptrace.SpanKindServer)
span.SetName("foo")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second)))
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})

marshaler, ok := tracesMarshalers()["otlp_json"]
require.True(t, ok, "Must have otlp json marshaller")

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

// Since marshaling json is not guaranteed to be in order
// within a string, using a map to compare that the expected values are there
expectedJSON := map[string]interface{}{
otlpJSON := map[string]interface{}{
"resourceSpans": []interface{}{
map[string]interface{}{
"resource": map[string]interface{}{},
Expand All @@ -113,11 +106,11 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
"scope": map[string]interface{}{},
"spans": []interface{}{
map[string]interface{}{
"traceId": "",
"traceId": "0102030405060708090a0b0c0d0e0f10",
"spanId": "0001020304050607",
"parentSpanId": "08090a0b0c0d0e00",
"name": t.Name(),
"kind": float64(ptrace.SpanKindInternal),
"name": "foo",
"kind": float64(ptrace.SpanKindServer),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]interface{}{},
Expand All @@ -131,9 +124,45 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
},
}

var final map[string]interface{}
err = json.Unmarshal(data, &final)
require.NoError(t, err, "Must not error marshaling expected data")
zipkinJSON := []interface{}{
map[string]interface{}{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0001020304050607",
"parentId": "08090a0b0c0d0e00",
"name": "foo",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Server),
"localEndpoint": map[string]interface{}{"serviceName": "otlpresourcenoservicename"},
},
}

tests := []struct {
encoding string
expectedJSON interface{}
unmarshaled interface{}
}{
{encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]interface{}{}},
{encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]interface{}{}},
}

for _, test := range tests {

marshaler, ok := tracesMarshalers()[test.encoding]
require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding))

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

assert.Equal(t, expectedJSON, final, "Must match the expected value")
err = json.Unmarshal(data, &test.unmarshaled)
require.NoError(t, err, "Must not error marshaling expected data")

assert.Equal(t, test.expectedJSON, test.unmarshaled, "Must match the expected value")

}
}

0 comments on commit dba22a5

Please sign in to comment.