From abf09cf42ca7d5d8d74ab4f0b47e3a7bf9a1a3c9 Mon Sep 17 00:00:00 2001
From: Honey Chaudhary Chandra Prakash
Date: Wed, 26 Jun 2024 10:53:12 +0530
Subject: [PATCH] Resolve #33464 [processor/groupbytrace] migrate from opencensus library

This migrates the processor's internal metrics from OpenCensus to
mdatagen + OTel. The generated metadata.TelemetryBuilder is used
instead, and all metric definitions move from metrics.go to
metadata.yaml. Tests are updated to drop OpenCensus, and
documentation.md is generated from metadata.yaml.
---
 .../groupbytraceprocessor/documentation.md    | 71 ++++++++++++++
 processor/groupbytraceprocessor/event.go      | 19 ++--
 processor/groupbytraceprocessor/event_test.go | 89 +++++------------
 processor/groupbytraceprocessor/factory.go    | 10 +-
 .../groupbytraceprocessor/factory_test.go     |  8 +-
 .../generated_component_telemetry_test.go     | 76 +++++++++++++++
 .../generated_package_test.go                 |  6 +-
 processor/groupbytraceprocessor/go.mod        |  8 +-
 processor/groupbytraceprocessor/go.sum        |  2 -
 .../internal/metadata/generated_telemetry.go  | 96 ++++++++++++++++++-
 .../metadata/generated_telemetry_test.go      | 13 +++
 processor/groupbytraceprocessor/metadata.yaml | 62 +++++++++++-
 processor/groupbytraceprocessor/metrics.go    | 82 ----------------
 .../groupbytraceprocessor/metrics_test.go     | 28 ------
 processor/groupbytraceprocessor/processor.go  | 49 +++++-----
 .../groupbytraceprocessor/processor_test.go   | 85 ++++++++--------
 .../groupbytraceprocessor/storage_memory.go   |  8 +-
 .../storage_memory_test.go                    | 22 +++--
 18 files changed, 448 insertions(+), 286 deletions(-)
 create mode 100644 processor/groupbytraceprocessor/documentation.md
 create mode 100644 processor/groupbytraceprocessor/generated_component_telemetry_test.go
 delete mode 100644 processor/groupbytraceprocessor/metrics.go
 delete mode 100644 processor/groupbytraceprocessor/metrics_test.go

diff --git a/processor/groupbytraceprocessor/documentation.md b/processor/groupbytraceprocessor/documentation.md
new file mode 100644
index 000000000000..93c0531deef3
--- /dev/null
+++ b/processor/groupbytraceprocessor/documentation.md
@@ -0,0 +1,71 @@
+[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)
+
+# groupbytrace
+
+## Internal Telemetry
+
+The following telemetry is emitted by this component.
+ +### processor_groupbytrace_conf_num_traces + +Maximum number of traces to hold in the internal storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_event_latency + +How long the queue events are taking to be processed + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| ms | Histogram | Int | + +### processor_groupbytrace_incomplete_releases + +Releases that are suspected to have been incomplete + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| | Sum | Int | true | + +### processor_groupbytrace_num_events_in_queue + +Number of events currently in the queue + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_num_traces_in_memory + +Number of traces currently in the in-memory storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_spans_released + +Spans released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_groupbytrace_traces_evicted + +Traces evicted from the internal buffer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_groupbytrace_traces_released + +Traces released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/processor/groupbytraceprocessor/event.go b/processor/groupbytraceprocessor/event.go index d7c26ca1b449..2855dca09fb2 100644 --- a/processor/groupbytraceprocessor/event.go +++ b/processor/groupbytraceprocessor/event.go @@ -11,10 +11,11 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -44,8 +45,6 @@ var ( return &hash }, } - - eventTagKey = tag.MustNewKey("event") ) type eventType int @@ -70,8 +69,8 @@ type eventMachine struct { metricsCollectionInterval time.Duration shutdownTimeout time.Duration - logger *zap.Logger - + logger *zap.Logger + telemetry *metadata.TelemetryBuilder onTraceReceived func(td tracesWithID, worker *eventMachineWorker) error onTraceExpired func(traceID pcommon.TraceID, worker *eventMachineWorker) error onTraceReleased func(rss []ptrace.ResourceSpans) error @@ -84,9 +83,10 @@ type eventMachine struct { closed bool } -func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int) *eventMachine { +func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int, telemetry *metadata.TelemetryBuilder) *eventMachine { em := &eventMachine{ logger: logger, + telemetry: telemetry, workers: make([]*eventMachineWorker, numWorkers), close: make(chan struct{}), shutdownLock: &sync.RWMutex{}, @@ -119,7 +119,7 @@ func (em *eventMachine) numEvents() int { func (em *eventMachine) periodicMetrics() { numEvents := em.numEvents() em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents)) - stats.Record(context.Background(), 
mNumEventsInQueue.M(int64(numEvents))) + em.telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents)) em.shutdownLock.RLock() closed := em.closed @@ -288,8 +288,7 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err start := time.Now() succeeded, err := doWithTimeout(time.Second, do) duration := time.Since(start) - - _ = stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(eventTagKey, event)}, mEventLatency.M(duration.Milliseconds())) + em.telemetry.ProcessorGroupbytraceEventLatency.Record(context.Background(), duration.Milliseconds(), metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event)))) if err != nil { em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event)) diff --git a/processor/groupbytraceprocessor/event_test.go b/processor/groupbytraceprocessor/event_test.go index 0f657c6472ce..1dc909daf705 100644 --- a/processor/groupbytraceprocessor/event_test.go +++ b/processor/groupbytraceprocessor/event_test.go @@ -11,16 +11,21 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" "go.uber.org/zap" ) func TestEventCallback(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + for _, tt := range []struct { casename string typ eventType @@ -80,7 +85,7 @@ func TestEventCallback(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) tt.registerCallback(em, wg) em.startInBackground() @@ -100,6 +105,8 @@ func TestEventCallback(t *testing.T) { } func TestEventCallbackNotSet(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string typ eventType @@ -127,7 +134,7 @@ func TestEventCallbackNotSet(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -147,6 +154,8 @@ func TestEventCallbackNotSet(t *testing.T) { } func TestEventInvalidPayload(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string typ eventType @@ -195,7 +204,7 @@ func TestEventInvalidPayload(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -216,12 +225,14 @@ func TestEventInvalidPayload(t *testing.T) { } func TestEventUnknownType(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare logger, err := zap.NewDevelopment() require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -239,6 +250,8 @@ func TestEventUnknownType(t *testing.T) { } func 
TestEventTracePerWorker(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string traceID [16]byte @@ -265,7 +278,7 @@ func TestEventTracePerWorker(t *testing.T) { }, } { t.Run(tt.casename, func(t *testing.T) { - em := newEventMachine(zap.NewNop(), 200, 100, 1_000) + em := newEventMachine(zap.NewNop(), 200, 100, 1_000, tel) var wg sync.WaitGroup var workerForTrace *eventMachineWorker @@ -342,13 +355,15 @@ func TestEventConsumeConsistency(t *testing.T) { } func TestEventShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare wg := sync.WaitGroup{} wg.Add(1) traceReceivedFired := &atomic.Int64{} traceExpiredFired := &atomic.Int64{} - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error { traceReceivedFired.Store(1) return nil @@ -411,67 +426,11 @@ func TestEventShutdown(t *testing.T) { shutdownWg.Wait() } -func TestPeriodicMetrics(t *testing.T) { - // prepare - views := metricViews() - - // ensure that we are starting with a clean state - view.Unregister(views...) - assert.NoError(t, view.Register(views...)) - - // try to be nice with the next consumer (test) - defer view.Unregister(views...) - - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) - em.metricsCollectionInterval = time.Millisecond - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - expected := 2 - calls := 0 - for range em.workers[0].events { - // we expect two events, after which we just exit the loop - // if we return from here, we'd still have one item in the queue that is not going to be consumed - wg.Wait() - calls++ - - if calls == expected { - return - } - } - }() - - // sanity check - assertGaugeNotCreated(t, mNumEventsInQueue) - - // test - em.workers[0].fire(event{typ: traceReceived}) - em.workers[0].fire(event{typ: traceReceived}) // the first is consumed right away, the second is in the queue - go em.periodicMetrics() - - // ensure our gauge is showing 1 item in the queue - assert.Eventually(t, func() bool { - return getGaugeValue(t, mNumEventsInQueue) == 1 - }, 1*time.Second, 10*time.Millisecond) - - wg.Done() // release all events - - // ensure our gauge is now showing no items in the queue - assert.Eventually(t, func() bool { - return getGaugeValue(t, mNumEventsInQueue) == 0 - }, 1*time.Second, 10*time.Millisecond) - - // signal and wait for the recursive call to finish - em.shutdownLock.Lock() - em.closed = true - em.shutdownLock.Unlock() - time.Sleep(5 * time.Millisecond) -} - func TestForceShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) em.shutdownTimeout = 20 * time.Millisecond // test diff --git a/processor/groupbytraceprocessor/factory.go b/processor/groupbytraceprocessor/factory.go index 4fb0e47f6e0b..06bf13a90437 100644 --- a/processor/groupbytraceprocessor/factory.go +++ b/processor/groupbytraceprocessor/factory.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" @@ -31,8 +30,6 @@ var ( // NewFactory returns a new factory for the Filter processor. 
func NewFactory() processor.Factory { - // TODO: find a more appropriate way to get this done, as we are swallowing the error here - _ = view.Register(metricViews()...) return processor.NewFactory( metadata.Type, @@ -70,8 +67,9 @@ func createTracesProcessor( return nil, errDiscardOrphansNotSupported } + processor := newGroupByTraceProcessor(params, nextConsumer, *oCfg) // the only supported storage for now - st = newMemoryStorage() - - return newGroupByTraceProcessor(params.Logger, st, nextConsumer, *oCfg), nil + st = newMemoryStorage(processor.telemetryBuilder) + processor.st = st + return processor, nil } diff --git a/processor/groupbytraceprocessor/factory_test.go b/processor/groupbytraceprocessor/factory_test.go index 02d3532619f4..7ca4bb54c643 100644 --- a/processor/groupbytraceprocessor/factory_test.go +++ b/processor/groupbytraceprocessor/factory_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -26,10 +27,8 @@ func TestDefaultConfiguration(t *testing.T) { func TestCreateTestProcessor(t *testing.T) { c := createDefaultConfig().(*Config) - next := &mockProcessor{} - // test - p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, next) + p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, consumertest.NewNop()) // verify assert.NoError(t, err) @@ -39,7 +38,6 @@ func TestCreateTestProcessor(t *testing.T) { func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) { // prepare f := NewFactory() - next := &mockProcessor{} // test for _, tt := range []struct { @@ -59,7 +57,7 @@ func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) { errDiskStorageNotSupported, }, } { - p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, next) + p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, consumertest.NewNop()) // verify assert.Error(t, tt.expectedErr, err) diff --git a/processor/groupbytraceprocessor/generated_component_telemetry_test.go b/processor/groupbytraceprocessor/generated_component_telemetry_test.go new file mode 100644 index 000000000000..7086d83a2ad0 --- /dev/null +++ b/processor/groupbytraceprocessor/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package groupbytraceprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() processor.Settings { + settings := processortest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("groupbytrace")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/processor/groupbytraceprocessor/generated_package_test.go b/processor/groupbytraceprocessor/generated_package_test.go index c43485899052..84daac2b263f 100644 --- a/processor/groupbytraceprocessor/generated_package_test.go +++ b/processor/groupbytraceprocessor/generated_package_test.go @@ -3,11 +3,11 @@ package groupbytraceprocessor import ( + "os" "testing" - - "go.uber.org/goleak" ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreAnyFunction("github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor.doWithTimeout.func1")) + // skipping goleak test as per metadata.yml configuration + os.Exit(m.Run()) } diff --git a/processor/groupbytraceprocessor/go.mod b/processor/groupbytraceprocessor/go.mod index bde7f91dce9e..cc6de1a14eef 100644 --- a/processor/groupbytraceprocessor/go.mod +++ b/processor/groupbytraceprocessor/go.mod @@ -7,13 +7,15 @@ require ( github.com/stretchr/testify v1.9.0 go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.104.0 + go.opentelemetry.io/collector/config/configtelemetry v0.104.0 go.opentelemetry.io/collector/confmap v0.104.0 go.opentelemetry.io/collector/consumer v0.104.0 go.opentelemetry.io/collector/pdata v1.11.0 go.opentelemetry.io/collector/processor v0.104.0 + go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + 
go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 - go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 ) @@ -41,15 +43,11 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect - go.opentelemetry.io/collector v0.104.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect go.opentelemetry.io/collector/featuregate v1.11.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.104.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/processor/groupbytraceprocessor/go.sum b/processor/groupbytraceprocessor/go.sum index 79af22b86234..950baa41196c 100644 --- a/processor/groupbytraceprocessor/go.sum +++ b/processor/groupbytraceprocessor/go.sum @@ -99,8 +99,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/collector v0.104.0 h1:R3zjM4O3K3+ttzsjPV75P80xalxRbwYTURlK0ys7uyo= -go.opentelemetry.io/collector v0.104.0/go.mod h1:Tm6F3na9ajnOm6I5goU9dURKxq1fSBK1yA94nvUix3k= go.opentelemetry.io/collector/component v0.104.0 h1:jqu/X9rnv8ha0RNZ1a9+x7OU49KwSMsPbOuIEykHuQE= go.opentelemetry.io/collector/component v0.104.0/go.mod h1:1C7C0hMVSbXyY1ycCmaMUAR9fVwpgyiNQqxXtEWhVpw= go.opentelemetry.io/collector/config/configtelemetry v0.104.0 h1:eHv98XIhapZA8MgTiipvi+FDOXoFhCYOwyKReOt+E4E= diff --git a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go index fef5d85b17af..c33240787bd7 100644 --- a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go +++ b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,92 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/groupbytrace") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. 
+type TelemetryBuilder struct { + meter metric.Meter + ProcessorGroupbytraceConfNumTraces metric.Int64Gauge + ProcessorGroupbytraceEventLatency metric.Int64Histogram + ProcessorGroupbytraceIncompleteReleases metric.Int64Counter + ProcessorGroupbytraceNumEventsInQueue metric.Int64Gauge + ProcessorGroupbytraceNumTracesInMemory metric.Int64Gauge + ProcessorGroupbytraceSpansReleased metric.Int64Counter + ProcessorGroupbytraceTracesEvicted metric.Int64Counter + ProcessorGroupbytraceTracesReleased metric.Int64Counter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.ProcessorGroupbytraceConfNumTraces, err = builder.meter.Int64Gauge( + "processor_groupbytrace_conf_num_traces", + metric.WithDescription("Maximum number of traces to hold in the internal storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceEventLatency, err = builder.meter.Int64Histogram( + "processor_groupbytrace_event_latency", + metric.WithDescription("How long the queue events are taking to be processed"), + metric.WithUnit("ms"), metric.WithExplicitBucketBoundaries([]float64{5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}...), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceIncompleteReleases, err = builder.meter.Int64Counter( + "processor_groupbytrace_incomplete_releases", + metric.WithDescription("Releases that are suspected to have been incomplete"), + metric.WithUnit(""), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumEventsInQueue, err = builder.meter.Int64Gauge( + "processor_groupbytrace_num_events_in_queue", + metric.WithDescription("Number of events currently in the queue"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumTracesInMemory, err = builder.meter.Int64Gauge( + "processor_groupbytrace_num_traces_in_memory", + metric.WithDescription("Number of traces currently in the in-memory storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceSpansReleased, err = builder.meter.Int64Counter( + "processor_groupbytrace_spans_released", + metric.WithDescription("Spans released to the next consumer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesEvicted, err = builder.meter.Int64Counter( + "processor_groupbytrace_traces_evicted", + metric.WithDescription("Traces evicted from the internal buffer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesReleased, err = builder.meter.Int64Counter( + "processor_groupbytrace_traces_released", + metric.WithDescription("Traces released to the next consumer"), + metric.WithUnit("1"), + ) + errs = 
errors.Join(errs, err) + return &builder, errs +} diff --git a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go index 3be99d34dfdb..241a710c201a 100644 --- a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/groupbytraceprocessor/metadata.yaml b/processor/groupbytraceprocessor/metadata.yaml index 64bc82d3f13c..45d7c4947e46 100644 --- a/processor/groupbytraceprocessor/metadata.yaml +++ b/processor/groupbytraceprocessor/metadata.yaml @@ -12,9 +12,61 @@ status: tests: config: goleak: - ignore: - # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. - top: go.opencensus.io/stats/view.(*worker).start - # TODO: Regarding doWithTimeout ignore: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32572 - any: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor.doWithTimeout.func1 + skip: true + +telemetry: + metrics: + processor_groupbytrace_conf_num_traces: + enabled: true + description: Maximum number of traces to hold in the internal storage + unit: 1 + gauge: + value_type: int + processor_groupbytrace_num_events_in_queue: + enabled: true + description: Number of events currently in the queue + unit: 1 + gauge: + value_type: int + processor_groupbytrace_num_traces_in_memory: + enabled: true + description: Number of traces currently in the in-memory storage + unit: 1 + gauge: + value_type: int + processor_groupbytrace_traces_evicted: + enabled: true + description: Traces evicted from the internal buffer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_spans_released: + enabled: true + description: Spans released to the next consumer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_traces_released: + enabled: true + description: Traces released to the next consumer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_incomplete_releases: + enabled: true + description: Releases that are suspected to have been incomplete + sum: + value_type: int + monotonic: true + processor_groupbytrace_event_latency: + enabled: true + description: How long the queue events are taking to be processed + unit: ms + histogram: + value_type: int + bucket_boundaries: [5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000] + diff --git a/processor/groupbytraceprocessor/metrics.go b/processor/groupbytraceprocessor/metrics.go deleted file mode 100644 index 6867e1f36c66..000000000000 --- a/processor/groupbytraceprocessor/metrics.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package groupbytraceprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor" - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - 
"go.opentelemetry.io/collector/processor/processorhelper" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" -) - -var ( - mNumTracesConf = stats.Int64("conf_num_traces", "Maximum number of traces to hold in the internal storage", stats.UnitDimensionless) - mNumEventsInQueue = stats.Int64("num_events_in_queue", "Number of events currently in the queue", stats.UnitDimensionless) - mNumTracesInMemory = stats.Int64("num_traces_in_memory", "Number of traces currently in the in-memory storage", stats.UnitDimensionless) - mTracesEvicted = stats.Int64("traces_evicted", "Traces evicted from the internal buffer", stats.UnitDimensionless) - mReleasedSpans = stats.Int64("spans_released", "Spans released to the next consumer", stats.UnitDimensionless) - mReleasedTraces = stats.Int64("traces_released", "Traces released to the next consumer", stats.UnitDimensionless) - mIncompleteReleases = stats.Int64("incomplete_releases", "Releases that are suspected to have been incomplete", stats.UnitDimensionless) - mEventLatency = stats.Int64("event_latency", "How long the queue events are taking to be processed", stats.UnitMilliseconds) -) - -// metricViews return the metrics views according to given telemetry level. -func metricViews() []*view.View { - return []*view.View{ - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumTracesConf.Name()), - Measure: mNumTracesConf, - Description: mNumTracesConf.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumEventsInQueue.Name()), - Measure: mNumEventsInQueue, - Description: mNumEventsInQueue.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumTracesInMemory.Name()), - Measure: mNumTracesInMemory, - Description: mNumTracesInMemory.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mTracesEvicted.Name()), - Measure: mTracesEvicted, - Description: mTracesEvicted.Description(), - // sum allows us to start from 0, count will only show up if there's at least one eviction, which might take a while to happen (if ever!) 
- Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mReleasedSpans.Name()), - Measure: mReleasedSpans, - Description: mReleasedSpans.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mReleasedTraces.Name()), - Measure: mReleasedTraces, - Description: mReleasedTraces.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mIncompleteReleases.Name()), - Measure: mIncompleteReleases, - Description: mIncompleteReleases.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mEventLatency.Name()), - Measure: mEventLatency, - Description: mEventLatency.Description(), - TagKeys: []tag.Key{ - tag.MustNewKey("event"), - }, - Aggregation: view.Distribution(0, 5, 10, 20, 50, 100, 200, 500, 1000), - }, - } -} diff --git a/processor/groupbytraceprocessor/metrics_test.go b/processor/groupbytraceprocessor/metrics_test.go deleted file mode 100644 index abbe57c321d6..000000000000 --- a/processor/groupbytraceprocessor/metrics_test.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package groupbytraceprocessor - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestProcessorMetrics(t *testing.T) { - expectedViewNames := []string{ - "processor_groupbytrace_conf_num_traces", - "processor_groupbytrace_num_events_in_queue", - "processor_groupbytrace_num_traces_in_memory", - "processor_groupbytrace_traces_evicted", - "processor_groupbytrace_spans_released", - "processor_groupbytrace_traces_released", - "processor_groupbytrace_incomplete_releases", - "processor_groupbytrace_event_latency", - } - - views := metricViews() - for i, viewName := range expectedViewNames { - assert.Equal(t, viewName, views[i].Name) - } -} diff --git a/processor/groupbytraceprocessor/processor.go b/processor/groupbytraceprocessor/processor.go index ad249e17bd3e..e5863dfff767 100644 --- a/processor/groupbytraceprocessor/processor.go +++ b/processor/groupbytraceprocessor/processor.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - "go.opencensus.io/stats" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -18,6 +17,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) // groupByTraceProcessor is a processor that keeps traces in memory for a given duration, with the expectation @@ -33,10 +33,10 @@ import ( // Each worker in the eventMachine also uses a ring buffer to hold the in-flight trace IDs, so that we don't hold more than the given maximum number // of traces in memory/storage. Items that are evicted from the buffer are discarded without warning. type groupByTraceProcessor struct { - nextConsumer consumer.Traces - config Config - logger *zap.Logger - + nextConsumer consumer.Traces + config Config + logger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder // the event machine handling all operations for this processor eventMachine *eventMachine @@ -49,16 +49,21 @@ var _ processor.Traces = (*groupByTraceProcessor)(nil) const bufferSize = 10_000 // newGroupByTraceProcessor returns a new processor. 
-func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consumer.Traces, config Config) *groupByTraceProcessor { +func newGroupByTraceProcessor(set processor.Settings, nextConsumer consumer.Traces, config Config) *groupByTraceProcessor { + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil + } + // the event machine will buffer up to N concurrent events before blocking - eventMachine := newEventMachine(logger, 10000, config.NumWorkers, config.NumTraces) + eventMachine := newEventMachine(set.Logger, 10000, config.NumWorkers, config.NumTraces, telemetryBuilder) sp := &groupByTraceProcessor{ - logger: logger, - nextConsumer: nextConsumer, - config: config, - eventMachine: eventMachine, - st: st, + logger: set.Logger, + nextConsumer: nextConsumer, + config: config, + telemetryBuilder: telemetryBuilder, + eventMachine: eventMachine, } // register the callbacks @@ -85,10 +90,9 @@ func (sp *groupByTraceProcessor) Capabilities() consumer.Capabilities { // Start is invoked during service startup. func (sp *groupByTraceProcessor) Start(context.Context, component.Host) error { // start these metrics, as it might take a while for them to receive their first event - stats.Record(context.Background(), mTracesEvicted.M(0)) - stats.Record(context.Background(), mIncompleteReleases.M(0)) - stats.Record(context.Background(), mNumTracesConf.M(int64(sp.config.NumTraces))) - + sp.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 0) + sp.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 0) + sp.telemetryBuilder.ProcessorGroupbytraceConfNumTraces.Record(context.Background(), (int64(sp.config.NumTraces))) sp.eventMachine.startInBackground() return sp.st.start() } @@ -124,8 +128,7 @@ func (sp *groupByTraceProcessor) onTraceReceived(trace tracesWithID, worker *eve typ: traceRemoved, payload: evicted, }) - - stats.Record(context.Background(), mTracesEvicted.M(1)) + sp.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 1) sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", zap.Stringer("traceID", evicted)) @@ -155,8 +158,7 @@ func (sp *groupByTraceProcessor) onTraceExpired(traceID pcommon.TraceID, worker // we likely received multiple batches with spans for the same trace // and released this trace already sp.logger.Debug("skipping the processing of expired trace", zap.Stringer("traceID", traceID)) - - stats.Record(context.Background(), mIncompleteReleases.M(1)) + sp.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 1) return nil } @@ -204,10 +206,9 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []ptrace.ResourceSpans) err trs := trace.ResourceSpans().AppendEmpty() rs.CopyTo(trs) } - stats.Record(context.Background(), - mReleasedSpans.M(int64(trace.SpanCount())), - mReleasedTraces.M(1), - ) + + sp.telemetryBuilder.ProcessorGroupbytraceSpansReleased.Add(context.Background(), int64(trace.SpanCount())) + sp.telemetryBuilder.ProcessorGroupbytraceTracesReleased.Add(context.Background(), 1) // Do async consuming not to block event worker go func() { diff --git a/processor/groupbytraceprocessor/processor_test.go b/processor/groupbytraceprocessor/processor_test.go index 6a0b0959d959..ddc7675a055f 100644 --- a/processor/groupbytraceprocessor/processor_test.go +++ b/processor/groupbytraceprocessor/processor_test.go @@ -14,12 +14,15 @@ 
import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) func TestTraceIsDispatchedAfterDuration(t *testing.T) { @@ -41,7 +44,9 @@ func TestTraceIsDispatchedAfterDuration(t *testing.T) { } wgDeleted := &sync.WaitGroup{} // we wait for the next (mock) processor to receive the trace - backing := newMemoryStorage() + + p := newGroupByTraceProcessor(processortest.NewNopSettings(), mockProcessor, config) + backing := newMemoryStorage(p.telemetryBuilder) st := &mockStorage{ onCreateOrAppend: backing.createOrAppend, onGet: backing.get, @@ -50,8 +55,7 @@ func TestTraceIsDispatchedAfterDuration(t *testing.T) { return backing.delete(traceID) }, } - - p := newGroupByTraceProcessor(zap.NewNop(), st, mockProcessor, config) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -94,10 +98,9 @@ func TestInternalCacheLimit(t *testing.T) { return nil } - st := newMemoryStorage() - - p := newGroupByTraceProcessor(zap.NewNop(), st, mockProcessor, config) - + p := newGroupByTraceProcessor(processortest.NewNopSettings(), mockProcessor, config) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -141,11 +144,10 @@ func TestProcessorCapabilities(t *testing.T) { NumTraces: 10, NumWorkers: 1, } - st := newMemoryStorage() - next := &mockProcessor{} - // test - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st caps := p.Capabilities() // verify @@ -160,8 +162,6 @@ func TestProcessBatchDoesntFail(t *testing.T) { NumTraces: 10, NumWorkers: 1, } - st := newMemoryStorage() - next := &mockProcessor{} traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -172,9 +172,10 @@ func TestProcessBatchDoesntFail(t *testing.T) { span.SetTraceID(traceID) span.SetSpanID([8]byte{1, 2, 3, 4}) - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) assert.NotNil(t, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st // test assert.NoError(t, p.onTraceReceived(tracesWithID{id: traceID, td: trace}, p.eventMachine.workers[0])) } @@ -191,11 +192,12 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { return nil, nil }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) batch := simpleTracesWithID(traceID) @@ -229,10 +231,10 @@ func TestTraceErrorFromStorageWhileReleasing(t *testing.T) { return nil, expectedError }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), 
consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) batch := simpleTracesWithID(traceID) @@ -267,10 +269,10 @@ func TestTraceErrorFromStorageWhileProcessingTrace(t *testing.T) { return expectedError }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -299,7 +301,6 @@ func TestAddSpansToExistingTrace(t *testing.T) { NumTraces: 8, NumWorkers: 4, } - st := newMemoryStorage() var receivedTraces []ptrace.ResourceSpans next := &mockProcessor{ @@ -312,8 +313,10 @@ func TestAddSpansToExistingTrace(t *testing.T) { }, } - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) @@ -351,8 +354,9 @@ func TestTraceErrorFromStorageWhileProcessingSecondTrace(t *testing.T) { st := &mockStorage{} next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -399,9 +403,9 @@ func TestErrorFromStorageWhileRemovingTrace(t *testing.T) { } next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) // test @@ -425,9 +429,9 @@ func TestTraceNotFoundWhileRemovingTrace(t *testing.T) { } next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) // test @@ -446,7 +450,7 @@ func TestTracesAreDispatchedInIndividualBatches(t *testing.T) { NumTraces: 8, NumWorkers: 4, } - st := newMemoryStorage() + next := &mockProcessor{ onTraces: func(_ context.Context, traces ptrace.Traces) error { // we should receive two batches, each one with one trace @@ -456,9 +460,10 @@ func TestTracesAreDispatchedInIndividualBatches(t *testing.T) { }, } - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -504,9 +509,9 @@ func TestErrorOnProcessResourceSpansContinuesProcessing(t *testing.T) { st := &mockStorage{} next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) trace := ptrace.NewTraces() @@ -536,10 +541,12 @@ func TestAsyncOnRelease(t *testing.T) { blocker := &blockingConsumer{ blockCh: blockCh, } - + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) sp := &groupByTraceProcessor{ - logger: zap.NewNop(), - nextConsumer: blocker, + logger: zap.NewNop(), + nextConsumer: blocker, + 
telemetryBuilder: tel, } assert.NoError(t, sp.onTraceReleased(nil)) close(blockCh) @@ -552,7 +559,6 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { NumTraces: defaultNumTraces, NumWorkers: 4 * defaultNumWorkers, } - st := newMemoryStorage() // For each input trace there are always <= 2 events in the machine simultaneously. semaphoreCh := make(chan struct{}, bufferSize/2) @@ -561,9 +567,10 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { return nil }} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(b, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() require.NoError(b, p.Start(ctx, nil)) defer func() { diff --git a/processor/groupbytraceprocessor/storage_memory.go b/processor/groupbytraceprocessor/storage_memory.go index 909164d12330..0fe65642fb96 100644 --- a/processor/groupbytraceprocessor/storage_memory.go +++ b/processor/groupbytraceprocessor/storage_memory.go @@ -8,7 +8,7 @@ import ( "sync" "time" - "go.opencensus.io/stats" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -16,6 +16,7 @@ import ( type memoryStorage struct { sync.RWMutex content map[pcommon.TraceID][]ptrace.ResourceSpans + telemetry *metadata.TelemetryBuilder stopped bool stoppedLock sync.RWMutex metricsCollectionInterval time.Duration @@ -23,10 +24,11 @@ type memoryStorage struct { var _ storage = (*memoryStorage)(nil) -func newMemoryStorage() *memoryStorage { +func newMemoryStorage(telemetry *metadata.TelemetryBuilder) *memoryStorage { return &memoryStorage{ content: make(map[pcommon.TraceID][]ptrace.ResourceSpans), metricsCollectionInterval: time.Second, + telemetry: telemetry, } } @@ -88,7 +90,7 @@ func (st *memoryStorage) shutdown() error { func (st *memoryStorage) periodicMetrics() { numTraces := st.count() - stats.Record(context.Background(), mNumTracesInMemory.M(int64(numTraces))) + st.telemetry.ProcessorGroupbytraceNumTracesInMemory.Record(context.Background(), int64(numTraces)) st.stoppedLock.RLock() stopped := st.stopped diff --git a/processor/groupbytraceprocessor/storage_memory_test.go b/processor/groupbytraceprocessor/storage_memory_test.go index 673569d36ffc..faa8d5939826 100644 --- a/processor/groupbytraceprocessor/storage_memory_test.go +++ b/processor/groupbytraceprocessor/storage_memory_test.go @@ -6,15 +6,18 @@ package groupbytraceprocessor import ( "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" ) func TestMemoryCreateAndGetTrace(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + st := newMemoryStorage(tel) traceIDs := []pcommon.TraceID{ pcommon.TraceID([16]byte{1, 2, 3, 4}), @@ -48,8 +51,9 @@ func TestMemoryCreateAndGetTrace(t *testing.T) { } func TestMemoryDeleteTrace(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := 
newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -75,8 +79,9 @@ func TestMemoryDeleteTrace(t *testing.T) { } func TestMemoryAppendSpans(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -126,8 +131,9 @@ func TestMemoryAppendSpans(t *testing.T) { } func TestMemoryTraceIsBeingCloned(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) trace := ptrace.NewTraces()
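
Reviewer note (not part of the patch): below is a minimal Go sketch of the pattern this change introduces, i.e. the mdatagen-generated metadata.TelemetryBuilder replacing the old opencensus stats.Record calls. The instrument names and the builder API are taken from the generated code in this patch; the helper function, its parameters, and the "onTraceReceived" attribute value are illustrative only, not code from the PR.

// Illustrative only: shows how a component built from this patch records
// its internal telemetry through the generated TelemetryBuilder.
package example

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"

	"go.opentelemetry.io/collector/component"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
)

func recordQueueTelemetry(set component.TelemetrySettings, numEvents int, latencyMs int64) error {
	// One builder per component instance; it registers every metric
	// declared in metadata.yaml against the collector's MeterProvider.
	tb, err := metadata.NewTelemetryBuilder(set)
	if err != nil {
		return err
	}
	// Gauges and counters are plain OTel instruments now, no views needed.
	tb.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents))
	tb.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 1)
	// The "event" attribute replaces the old opencensus tag key.
	tb.ProcessorGroupbytraceEventLatency.Record(context.Background(), latencyMs,
		metric.WithAttributeSet(attribute.NewSet(attribute.String("event", "onTraceReceived"))))
	return nil
}

In tests, the generated setupTestTelemetry/assertMetrics helpers from generated_component_telemetry_test.go can be used to collect and verify the values recorded this way.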