Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade storage integration test to v2 Trace Reader #6388

Merged
merged 15 commits into from
Dec 26, 2024
Merged
Prev Previous commit
Next Next commit
Consume otel trace sequence to produce model traces
Signed-off-by: Emmanuel Emonueje Ebenezer <eebenezer949@gmail.com>
  • Loading branch information
ekefan committed Dec 25, 2024
commit 51648ab37d5fe36509c20263bfb2820ef5347598
64 changes: 64 additions & 0 deletions storage_v2/v1adapter/ptrace_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package v1adapter

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)

// PTracesSeq2ToModel consumes an otel trace iterator and returns a jaeger model trace.
//
// When necessary, it groups chunks of traces into one single trace
func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
var (
jaegerTraces []*model.Trace
err error
tracesByID map[pcommon.TraceID]*model.Trace
)

seqTrace(func(otelTraces []ptrace.Traces, e error) bool {
if e != nil {
err = e
return false
}

for _, otelTrace := range otelTraces {
spans := modelSpansFromOtelTrace(otelTrace)
for _, span := range spans {
traceId := span.TraceID.ToOTELTraceID()
if _, exists := tracesByID[traceId]; !exists {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
tracesByID[traceId] = &model.Trace{}
}
trace := tracesByID[traceId]
trace.Spans = append(trace.Spans, span)
tracesByID[traceId] = trace
}
}
return true
})

if err != nil {
return nil, err
}

for _, trace := range tracesByID {
jaegerTraces = append(jaegerTraces, trace)
}
return jaegerTraces, nil
}

// modelSpansFromOtelTrace extracts spans from otel traces
func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span {
spans := []*model.Span{}
batches := otel2model.ProtoFromTraces(otelTrace)
for _, batch := range batches {
spans = append(spans, batch.Spans...)
}
return spans
}