diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index e4a35657455..1f0359b264a 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -6,20 +6,15 @@ package processorhelper import ( "context" "errors" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" @@ -91,60 +86,44 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { incomingLogRecords.AppendEmpty() incomingLogRecords.AppendEmpty() - metricReader := sdkmetric.NewManualReader() - set := processortest.NewNopSettings() - set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic - set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { - if level >= configtelemetry.LevelBasic { - return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) - } - return nil - } - - lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate) + testTelemetry := setupTestTelemetry() + lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs)) assert.NoError(t, lp.Shutdown(context.Background())) - ownMetrics := new(metricdata.ResourceMetrics) - require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) - - require.Len(t, ownMetrics.ScopeMetrics, 1) - require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) - - inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] - outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] - if strings.Contains(inMetric.Name, "outgoing") { - inMetric, outMetric = outMetric, inMetric - } - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 3, + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_log_records", + Description: "Number of log records passed to the processor.", + Unit: "{records}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, inMetric.Data, metricdatatest.IgnoreTimestamp()) - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 1, + { + Name: "otelcol_processor_outgoing_log_records", + Description: "Number of log records emitted from the processor.", + Unit: "{records}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, outMetric.Data, metricdatatest.IgnoreTimestamp()) + }) } diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 7a31d70c831..77a33b31bf1 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -6,20 +6,15 @@ package processorhelper import ( "context" "errors" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" @@ -76,76 +71,60 @@ func newTestMProcessor(retError error) ProcessMetricsFunc { } func TestMetricsProcessor_RecordInOut(t *testing.T) { - // Regardless of how many data points are ingested, emit just one + // Regardless of how many data points are ingested, emit 3 mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { md := pmetric.NewMetrics() md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() return md, nil } incomingMetrics := pmetric.NewMetrics() dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints() - // Add 3 data points to the incoming - dps.AppendEmpty() + // Add 2 data points to the incoming dps.AppendEmpty() dps.AppendEmpty() - metricReader := sdkmetric.NewManualReader() - set := processortest.NewNopSettings() - set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal - set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic - set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { - if level >= configtelemetry.LevelBasic { - return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) - } - return nil - } - - mp, err := NewMetricsProcessor(context.Background(), set, &testMetricsCfg, consumertest.NewNop(), mockAggregate) + testTelemetry := setupTestTelemetry() + mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) assert.NoError(t, mp.Shutdown(context.Background())) - ownMetrics := new(metricdata.ResourceMetrics) - require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) - - require.Len(t, ownMetrics.ScopeMetrics, 1) - require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) - - inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] - outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] - if strings.Contains(inMetric.Name, "outgoing") { - inMetric, outMetric = outMetric, inMetric - } - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 3, + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_metric_points", + Description: "Number of metric points passed to the processor.", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 2, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, inMetric.Data, metricdatatest.IgnoreTimestamp()) - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 1, + { + Name: "otelcol_processor_outgoing_metric_points", + Description: "Number of metric points emitted from the processor.", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, outMetric.Data, metricdatatest.IgnoreTimestamp()) + }) } diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 1a844dabd70..d709c5a9731 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -6,20 +6,15 @@ package processorhelper import ( "context" "errors" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" @@ -86,66 +81,50 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { incomingTraces := ptrace.NewTraces() incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() - // Add 3 records to the incoming + // Add 4 records to the incoming + incomingSpans.AppendEmpty() incomingSpans.AppendEmpty() incomingSpans.AppendEmpty() incomingSpans.AppendEmpty() - metricReader := sdkmetric.NewManualReader() - set := processortest.NewNopSettings() - set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal - set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic - set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { - if level >= configtelemetry.LevelBasic { - return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) - } - return nil - } - - tp, err := NewTracesProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate) + testTelemetry := setupTestTelemetry() + tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces)) assert.NoError(t, tp.Shutdown(context.Background())) - ownMetrics := new(metricdata.ResourceMetrics) - require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) - - require.Len(t, ownMetrics.ScopeMetrics, 1) - require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) - - inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] - outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] - if strings.Contains(inMetric.Name, "outgoing") { - inMetric, outMetric = outMetric, inMetric - } - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 3, + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_spans", + Description: "Number of spans passed to the processor.", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 4, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, inMetric.Data, metricdatatest.IgnoreTimestamp()) - - metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("processor"), - Value: attribute.StringValue(set.ID.String()), - }), - Value: 1, + { + Name: "otelcol_processor_outgoing_spans", + Description: "Number of spans emitted from the processor.", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + }, + }, }, }, - }, outMetric.Data, metricdatatest.IgnoreTimestamp()) + }) }