Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade storage integration test to v2 Trace Reader #6388

Merged
merged 15 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor and move to translator
Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Dec 26, 2024
commit 2a66f86e9b0b00095927cea39e9e636e609f904f
2 changes: 1 addition & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
require.NoError(t, err)
}
Expand Down
10 changes: 5 additions & 5 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
continue
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) >= len(expected.Spans)
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
}
Expand All @@ -310,7 +310,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := model.TraceID{High: 0, Low: 1}
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, traces)
})
Expand Down Expand Up @@ -355,7 +355,7 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.T
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.PTracesSeq2ToModel(iterTraces)
traces, err = v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
return false
Expand Down
54 changes: 0 additions & 54 deletions storage_v2/v1adapter/ptrace2model.go

This file was deleted.

162 changes: 0 additions & 162 deletions storage_v2/v1adapter/ptrace2model_test.go

This file was deleted.

50 changes: 50 additions & 0 deletions storage_v2/v1adapter/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@ import (

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch {
return ProtoFromTraces(traces)
}

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
//
// TODO remove this function in favor of V1BatchesFromTraces
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
Expand All @@ -32,6 +42,46 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces {
return traces
}

// V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces.
// Returns spanstore.ErrTraceNotFound for empty iterators.
func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
var (
jaegerTraces []*model.Trace
iterErr error
)
jptrace.AggregateTraces(otelSeq)(func(otelTrace ptrace.Traces, err error) bool {
if err != nil {
iterErr = err
return false
}
jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace))
return true
})
if iterErr != nil {
return nil, iterErr
}
if len(jaegerTraces) == 0 {
return nil, spanstore.ErrTraceNotFound
}
return jaegerTraces, nil
}

// modelTraceFromOtelTrace extracts spans from otel traces
func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace {
var spans []*model.Span
batches := V1BatchesFromTraces(otelTrace)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
proc := *batch.Process // shallow clone
span.Process = &proc
}
spans = append(spans, span)
}
}
return &model.Trace{Spans: spans}
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand Down
Loading