Skip to content

Commit

Permalink
[exporter/kafkaexporter] add zipkin encoding for traces (#23947)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
Adding Zipkin v2 encoding support for traces in kafkaexporter 

**Link to tracking Issue:** <Issue number if applicable>

#21102

**Testing:** <Describe what testing was performed and which tests were
added.>
Test for `tracesMarshalers` has been extended with zipkin JSON test
case.
  • Loading branch information
yaroliakh authored Oct 4, 2023
1 parent 64ae47c commit 732d259
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 36 deletions.
20 changes: 20 additions & 0 deletions .chloggen/kafkaexporter_zipkin_encoding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding Zipkin encoding option for traces to kafkaexporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21102]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 3 additions & 1 deletion exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ The following settings can be optionally configured:
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- The following encodings are valid *only* for **traces**.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `auth`
Expand Down
10 changes: 7 additions & 3 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ require (
github.com/aws/aws-sdk-go v1.45.20
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger v1.41.0
github.com/jaegertracing/jaeger v1.48.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.86.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.86.0
github.com/openzipkin/zipkin-go v0.4.2
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
go.opentelemetry.io/collector/component v0.86.0
Expand All @@ -34,7 +36,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand Down Expand Up @@ -71,7 +73,7 @@ require (
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
Expand All @@ -95,3 +97,5 @@ retract (
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin
13 changes: 8 additions & 5 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions exporter/kafkaexporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
)

// TracesMarshaler marshals traces into Message array.
Expand Down Expand Up @@ -41,11 +43,15 @@ type LogsMarshaler interface {
func tracesMarshalers() map[string]TracesMarshaler {
otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding)
otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json")
zipkinProto := newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto")
zipkinJSON := newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json")
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
return map[string]TracesMarshaler{
otlpPb.Encoding(): otlpPb,
otlpJSON.Encoding(): otlpJSON,
zipkinProto.Encoding(): zipkinProto,
zipkinJSON.Encoding(): zipkinJSON,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
}
Expand Down
71 changes: 50 additions & 21 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

zipkin "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -20,6 +21,8 @@ func TestDefaultTracesMarshalers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"otlp_json",
"zipkin_proto",
"zipkin_json",
"jaeger_proto",
"jaeger_json",
}
Expand Down Expand Up @@ -84,27 +87,17 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
ils.Spans().AppendEmpty()

span := ils.Spans().At(0)
span.SetKind(ptrace.SpanKindInternal)
span.SetName(t.Name())
span.SetKind(ptrace.SpanKindServer)
span.SetName("foo")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second)))
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})

marshaler, ok := tracesMarshalers()["otlp_json"]
require.True(t, ok, "Must have otlp json marshaller")

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

// Since marshaling json is not guaranteed to be in order
// within a string, using a map to compare that the expected values are there
expectedJSON := map[string]interface{}{
otlpJSON := map[string]interface{}{
"resourceSpans": []interface{}{
map[string]interface{}{
"resource": map[string]interface{}{},
Expand All @@ -113,11 +106,11 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
"scope": map[string]interface{}{},
"spans": []interface{}{
map[string]interface{}{
"traceId": "",
"traceId": "0102030405060708090a0b0c0d0e0f10",
"spanId": "0001020304050607",
"parentSpanId": "08090a0b0c0d0e00",
"name": t.Name(),
"kind": float64(ptrace.SpanKindInternal),
"name": "foo",
"kind": float64(ptrace.SpanKindServer),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]interface{}{},
Expand All @@ -131,9 +124,45 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
},
}

var final map[string]interface{}
err = json.Unmarshal(data, &final)
require.NoError(t, err, "Must not error marshaling expected data")
zipkinJSON := []interface{}{
map[string]interface{}{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0001020304050607",
"parentId": "08090a0b0c0d0e00",
"name": "foo",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Server),
"localEndpoint": map[string]interface{}{"serviceName": "otlpresourcenoservicename"},
},
}

tests := []struct {
encoding string
expectedJSON interface{}
unmarshaled interface{}
}{
{encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]interface{}{}},
{encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]interface{}{}},
}

for _, test := range tests {

marshaler, ok := tracesMarshalers()[test.encoding]
require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding))

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

assert.Equal(t, expectedJSON, final, "Must match the expected value")
err = json.Unmarshal(data, &test.unmarshaled)
require.NoError(t, err, "Must not error marshaling expected data")

assert.Equal(t, test.expectedJSON, test.unmarshaled, "Must match the expected value")

}
}
8 changes: 6 additions & 2 deletions receiver/kafkametricsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jaegertracing/jaeger v1.41.0 // indirect
github.com/jaegertracing/jaeger v1.48.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand All @@ -74,10 +74,12 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.86.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.86.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -108,7 +110,7 @@ require (
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand Down Expand Up @@ -141,5 +143,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin

// see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24240
replace github.com/docker/docker v24.0.4+incompatible => github.com/docker/docker v24.0.5-0.20230719162248-f022632503d1+incompatible
10 changes: 6 additions & 4 deletions receiver/kafkametricsreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 732d259

Please sign in to comment.