Skip to content

Commit

Permalink
[exporter/elasticsearch] add missing scope info in span/log attributes (
Browse files Browse the repository at this point in the history
open-telemetry#27288)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:**
open-telemetry#27282

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>

---------

Signed-off-by: Jared Tan <jian.tan@daocloud.io>
Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
Signed-off-by: Raphael Silva <rapphil@gmail.com>
Signed-off-by: Alex Boten <aboten@lightstep.com>
Signed-off-by: Pavol Loffay <p.loffay@gmail.com>
Co-authored-by: Antoine Toulme <antoine@lunar-ocean.com>
Co-authored-by: Raj Nishtala <113392743+rnishtala-sumo@users.noreply.github.com>
Co-authored-by: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com>
Co-authored-by: Povilas Versockas <povilas.versockas@coralogix.com>
Co-authored-by: Priyanshu Raj <55045459+rpriyanshu9@users.noreply.github.com>
Co-authored-by: sakulali <sakulali@126.com>
Co-authored-by: Raphael Philipe Mendes da Silva <rapphil@gmail.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
Co-authored-by: Yotam loewenbach <48534558+yotamloe@users.noreply.github.com>
Co-authored-by: Alex Boten <aboten@lightstep.com>
Co-authored-by: bryan-aguilar <46550959+bryan-aguilar@users.noreply.github.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
Co-authored-by: Jina Jain <jjain@splunk.com>
Co-authored-by: Yang Song <songy23@users.noreply.github.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com>
Co-authored-by: Abhishek Saharn <102726227+asaharn@users.noreply.github.com>
Co-authored-by: Ramachandran A G <ramacg@microsoft.com>
Co-authored-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
Co-authored-by: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com>
Co-authored-by: Faith Chikwekwe <faithchikwekwe01@gmail.com>
Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
Co-authored-by: Daniel Kuiper <44123852+kuiperda@users.noreply.github.com>
Co-authored-by: Carlos Castro <ccastro@newrelic.com>
Co-authored-by: Christian <calvarez@newrelic.com>
Co-authored-by: ArchangelSDY <Archangel.SDY@gmail.com>
Co-authored-by: Pavol Loffay <p.loffay@gmail.com>
Co-authored-by: Paulo Janotti <pjanotti@splunk.com>
Co-authored-by: Nathan Burke <n.burke@natbur.com>
Co-authored-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
Co-authored-by: shalper2 <99686388+shalper2@users.noreply.github.com>
Co-authored-by: OpenTelemetry Bot <107717825+opentelemetrybot@users.noreply.github.com>
Co-authored-by: Martin Majlis <122797378+martin-majlis-s1@users.noreply.github.com>
Co-authored-by: hovavza <147598197+hovavza@users.noreply.github.com>
  • Loading branch information
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add missing scope info in span attributes"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27282]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/otelcontribcol/exporters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestDefaultExporters(t *testing.T) {
cfg := expFactories["awscloudwatchlogs"].CreateDefaultConfig().(*awscloudwatchlogsexporter.Config)
cfg.Endpoint = "http://" + endpoint
cfg.Region = "local"

// disable queue/retry to validate passing the test data synchronously
cfg.QueueSettings.Enabled = false
cfg.RetrySettings.Enabled = false
Expand Down
7 changes: 4 additions & 3 deletions exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo
resource := rl.Resource()
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
scope := ills.At(j).Scope()
logs := ills.At(j).LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -99,7 +100,7 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo
return errors.Join(errs...)
}

func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord) error {
func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromBothResourceAndAttribute(indexPrefix, resource, record)
Expand All @@ -108,7 +109,7 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

document, err := e.model.encodeLog(resource, record)
document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,12 @@ func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string
// send trace with span & resource attributes
func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string) {
logs := newLogsWithAttributeAndResourceMap(attrMp, resMp)
resSpans := logs.ResourceLogs().At(0)
logRecords := resSpans.ScopeLogs().At(0).LogRecords().At(0)
resLogs := logs.ResourceLogs().At(0)
logRecords := resLogs.ScopeLogs().At(0).LogRecords().At(0)

err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords)
scopeLogs := resLogs.ScopeLogs().AppendEmpty()
scope := scopeLogs.Scope()

err := exporter.pushLogRecord(context.TODO(), resLogs.Resource(), logRecords, scope)
require.NoError(t, err)
}
20 changes: 16 additions & 4 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

type mappingModel interface {
encodeLog(pcommon.Resource, plog.LogRecord) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span) ([]byte, error)
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
}

// encodeModel tries to keep the event as close to the original open telemetry semantics as is.
Expand All @@ -38,7 +38,7 @@ const (
attributeField = "attribute"
)

func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord) ([]byte, error) {
func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTraceID("TraceId", record.TraceID())
Expand All @@ -49,6 +49,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
document.AddAttribute("Body", record.Body())
document.AddAttributes("Attributes", record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
Expand All @@ -61,7 +62,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
return buf.Bytes(), err
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([]byte, error) {
func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
Expand All @@ -76,6 +77,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([
document.AddAttributes("Resource", resource.Attributes())
document.AddEvents("Events", span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
Expand Down Expand Up @@ -107,3 +109,13 @@ func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
func durationAsMicroseconds(start, end time.Time) int64 {
return (end.UnixNano() - start.UnixNano()) / 1000
}

func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("name", scope.Name())
attrs.PutStr("version", scope.Version())
for k, v := range scope.Attributes().AsRaw() {
attrs.PutStr(k, v.(string))
}
return attrs
}
8 changes: 6 additions & 2 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
)

var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}`
var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}`

func TestEncodeSpan(t *testing.T) {
model := &encodeModel{dedup: true, dedot: false}
td := mockResourceSpans()
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0))
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope())
assert.NoError(t, err)
assert.Equal(t, expectedSpanBody, string(spanByte))
}
Expand All @@ -40,6 +40,10 @@ func mockResourceSpans() ptrace.Traces {
tEnd := time.Date(2023, 4, 19, 3, 4, 6, 6, time.UTC)

scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
scopeSpans.Scope().SetName("io.opentelemetry.rabbitmq-2.7")
scopeSpans.Scope().SetVersion("1.30.0-alpha")
scopeSpans.Scope().Attributes().PutStr("lib-foo", "lib-bar")

span := scopeSpans.Spans().AppendEmpty()
span.SetName("client span")
span.SetSpanID([8]byte{0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26})
Expand Down
7 changes: 4 additions & 3 deletions exporter/elasticsearchexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (e *elasticsearchTracesExporter) pushTraceData(
resource := il.Resource()
scopeSpans := il.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
scope := scopeSpans.At(j).Scope()
spans := scopeSpans.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
if err := e.pushTraceRecord(ctx, resource, spans.At(k)); err != nil {
if err := e.pushTraceRecord(ctx, resource, spans.At(k), scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -91,7 +92,7 @@ func (e *elasticsearchTracesExporter) pushTraceData(
return errors.Join(errs...)
}

func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span) error {
func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromBothResourceAndAttribute(indexPrefix, resource, span)
Expand All @@ -100,7 +101,7 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

document, err := e.model.encodeSpan(resource, span)
document, err := e.model.encodeSpan(resource, span, scope)
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/traces_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExp
traces := newTracesWithAttributeAndResourceMap(attrMp, resMp)
resSpans := traces.ResourceSpans().At(0)
span := resSpans.ScopeSpans().At(0).Spans().At(0)
scope := resSpans.ScopeSpans().At(0).Scope()

err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span)
err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope)
require.NoError(t, err)
}

0 comments on commit 98da843

Please sign in to comment.