From 5bf038c3b44d27cd5b6764fa5e4d606998d87ceb Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Fri, 21 Jun 2024 11:25:40 -0700 Subject: [PATCH] [chore] update solace receiver to use mdatagen (#33582) This updates the code that was using OpenCensus to use mdatagen + otel Fixes #33468 --------- Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- receiver/solacereceiver/documentation.md | 111 +++ receiver/solacereceiver/factory_test.go | 36 +- .../generated_component_telemetry_test.go | 76 ++ .../solacereceiver/generated_package_test.go | 2 +- receiver/solacereceiver/go.mod | 7 +- receiver/solacereceiver/go.sum | 79 +- .../internal/metadata/generated_telemetry.go | 131 ++- .../metadata/generated_telemetry_test.go | 13 + receiver/solacereceiver/metadata.yaml | 95 ++- receiver/solacereceiver/observability.go | 206 ----- receiver/solacereceiver/observability_test.go | 115 --- receiver/solacereceiver/receiver.go | 74 +- receiver/solacereceiver/receiver_test.go | 757 ++++++++++++++++-- receiver/solacereceiver/unmarshaller.go | 18 +- .../solacereceiver/unmarshaller_egress.go | 16 +- .../unmarshaller_egress_test.go | 109 ++- .../solacereceiver/unmarshaller_receive.go | 22 +- .../unmarshaller_receive_test.go | 125 ++- receiver/solacereceiver/unmarshaller_test.go | 6 +- 19 files changed, 1410 insertions(+), 588 deletions(-) create mode 100644 receiver/solacereceiver/documentation.md create mode 100644 receiver/solacereceiver/generated_component_telemetry_test.go delete mode 100644 receiver/solacereceiver/observability.go delete mode 100644 receiver/solacereceiver/observability_test.go diff --git a/receiver/solacereceiver/documentation.md b/receiver/solacereceiver/documentation.md new file mode 100644 index 000000000000..7c7fd3c59b9c --- /dev/null +++ b/receiver/solacereceiver/documentation.md @@ -0,0 +1,111 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# solace + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### solacereceiver_dropped_egress_spans + +Number of dropped egress spans + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_dropped_span_messages + +Number of dropped span messages + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_failed_reconnections + +Number of failed broker reconnections + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_fatal_unmarshalling_errors + +Number of fatal message unmarshalling errors + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_need_upgrade + +Indicates with value 1 that receiver requires an upgrade and is not compatible with messages received from a broker + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### solacereceiver_received_span_messages + +Number of received span messages + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_receiver_flow_control_recent_retries + +Most recent/current retry count when flow controlled + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### solacereceiver_receiver_flow_control_status + +Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### solacereceiver_receiver_flow_control_total + +Number of times the receiver instance became flow controlled + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_receiver_flow_control_with_single_successful_retry + +Number of times the receiver instance became flow controlled and resolved situations after the first retry + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_receiver_status + +Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### solacereceiver_recoverable_unmarshalling_errors + +Number of recoverable message unmarshalling errors + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### solacereceiver_reported_spans + +Number of reported spans + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/receiver/solacereceiver/factory_test.go b/receiver/solacereceiver/factory_test.go index 36331ce3c246..509d8f2250c9 100644 --- a/receiver/solacereceiver/factory_test.go +++ b/receiver/solacereceiver/factory_test.go @@ -10,12 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" ) @@ -68,17 +68,27 @@ func TestCreateTracesReceiverBadConfigIncompleteAuth(t *testing.T) { } func TestCreateTracesReceiverBadMetrics(t *testing.T) { - // register a metric first with the same name - statName := "solacereceiver/primary/failed_reconnections" - stat := stats.Int64(statName, "", stats.UnitDimensionless) - err := view.Register(&view.View{ - Name: buildReceiverCustomMetricName(statName), - Description: "some description", - Measure: stat, - Aggregation: view.Sum(), - }) + set := receivertest.NewNopSettings() + set.ID = component.MustNewIDWithName("solace", "factory") + // the code here sets up a custom meter provider + // to trigger the error condition required for this test + metricExp, err := stdoutmetric.New() require.NoError(t, err) - + provider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExp)), + sdkmetric.WithView(sdkmetric.NewView( + sdkmetric.Instrument{ + Name: "solacereceiver_failed_reconnections", + }, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationLastValue{}, + }, + )), + ) + defer func() { + require.NoError(t, provider.Shutdown(context.Background())) + }() + set.TelemetrySettings.MeterProvider = provider cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) require.NoError(t, err) factory := NewFactory() @@ -87,8 +97,6 @@ func TestCreateTracesReceiverBadMetrics(t *testing.T) { sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "primary").String()) require.NoError(t, err) require.NoError(t, sub.Unmarshal(cfg)) - set := receivertest.NewNopSettings() - set.ID = component.MustNewIDWithName("solace", "factory") receiver, err := factory.CreateTracesReceiver( context.Background(), set, diff --git a/receiver/solacereceiver/generated_component_telemetry_test.go b/receiver/solacereceiver/generated_component_telemetry_test.go new file mode 100644 index 000000000000..8075850f77f3 --- /dev/null +++ b/receiver/solacereceiver/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package solacereceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() receiver.Settings { + settings := receivertest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("solace")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/receiver/solacereceiver/generated_package_test.go b/receiver/solacereceiver/generated_package_test.go index e0d3391161e5..0bd93028554c 100644 --- a/receiver/solacereceiver/generated_package_test.go +++ b/receiver/solacereceiver/generated_package_test.go @@ -9,5 +9,5 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + goleak.VerifyTestMain(m) } diff --git a/receiver/solacereceiver/go.mod b/receiver/solacereceiver/go.mod index 9397b3ab488d..01486b9d04f2 100644 --- a/receiver/solacereceiver/go.mod +++ b/receiver/solacereceiver/go.mod @@ -8,16 +8,18 @@ go 1.21.0 require ( github.com/Azure/go-amqp v1.0.5 github.com/stretchr/testify v1.9.0 - go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.103.0 go.opentelemetry.io/collector/config/configopaque v1.10.0 + go.opentelemetry.io/collector/config/configtelemetry v0.103.0 go.opentelemetry.io/collector/config/configtls v0.103.0 go.opentelemetry.io/collector/confmap v0.103.0 go.opentelemetry.io/collector/consumer v0.103.0 go.opentelemetry.io/collector/pdata v1.10.0 go.opentelemetry.io/collector/receiver v0.103.0 go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -33,7 +35,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -49,11 +50,9 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/featuregate v1.10.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect diff --git a/receiver/solacereceiver/go.sum b/receiver/solacereceiver/go.sum index 33edecde2ab6..4359e20466c0 100644 --- a/receiver/solacereceiver/go.sum +++ b/receiver/solacereceiver/go.sum @@ -1,21 +1,12 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -29,30 +20,9 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= @@ -84,7 +54,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= @@ -94,18 +63,11 @@ github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4V github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= -go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/collector v0.103.0 h1:mssWo1y31p1F/SRsSBnVUX6YocgawCqM1blpE+hkWog= go.opentelemetry.io/collector/component v0.103.0 h1:j52YAsp8EmqYUotVUwhovkqFZGuxArEkk65V4TI46NE= go.opentelemetry.io/collector/component v0.103.0/go.mod h1:jKs19tGtCO8Hr5/YM0F+PoFcl8SVe/p4Ge30R6srkbc= @@ -131,6 +93,8 @@ go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= go.opentelemetry.io/otel/exporters/prometheus v0.49.0 h1:Er5I1g/YhfYv9Affk9nJLfH/+qCCVVg1f2R9AbJfqDQ= go.opentelemetry.io/otel/exporters/prometheus v0.49.0/go.mod h1:KfQ1wpjf3zsHjzP149P4LyAwWRupc6c7t1ZJ9eXpKQM= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0 h1:/jlt1Y8gXWiHG9FBx6cJaIC5hYx5Fe64nC8w5Cylt/0= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0/go.mod h1:bmToOGOBZ4hA9ghphIc1PAf66VA8KOtsuy3+ScStG20= go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= @@ -148,30 +112,17 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -182,10 +133,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -193,36 +140,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 h1:Q2RxlXqh1cgzzUgV261vBO2jI5R/3DD1J2pM0nI4NhU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/receiver/solacereceiver/internal/metadata/generated_telemetry.go b/receiver/solacereceiver/internal/metadata/generated_telemetry.go index 770b7192fb73..cd738205ddd1 100644 --- a/receiver/solacereceiver/internal/metadata/generated_telemetry.go +++ b/receiver/solacereceiver/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,127 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/solacereceiver") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + SolacereceiverDroppedEgressSpans metric.Int64Counter + SolacereceiverDroppedSpanMessages metric.Int64Counter + SolacereceiverFailedReconnections metric.Int64Counter + SolacereceiverFatalUnmarshallingErrors metric.Int64Counter + SolacereceiverNeedUpgrade metric.Int64Gauge + SolacereceiverReceivedSpanMessages metric.Int64Counter + SolacereceiverReceiverFlowControlRecentRetries metric.Int64Gauge + SolacereceiverReceiverFlowControlStatus metric.Int64Gauge + SolacereceiverReceiverFlowControlTotal metric.Int64Counter + SolacereceiverReceiverFlowControlWithSingleSuccessfulRetry metric.Int64Counter + SolacereceiverReceiverStatus metric.Int64Gauge + SolacereceiverRecoverableUnmarshallingErrors metric.Int64Counter + SolacereceiverReportedSpans metric.Int64Counter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.SolacereceiverDroppedEgressSpans, err = builder.meter.Int64Counter( + "solacereceiver_dropped_egress_spans", + metric.WithDescription("Number of dropped egress spans"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverDroppedSpanMessages, err = builder.meter.Int64Counter( + "solacereceiver_dropped_span_messages", + metric.WithDescription("Number of dropped span messages"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverFailedReconnections, err = builder.meter.Int64Counter( + "solacereceiver_failed_reconnections", + metric.WithDescription("Number of failed broker reconnections"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverFatalUnmarshallingErrors, err = builder.meter.Int64Counter( + "solacereceiver_fatal_unmarshalling_errors", + metric.WithDescription("Number of fatal message unmarshalling errors"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverNeedUpgrade, err = builder.meter.Int64Gauge( + "solacereceiver_need_upgrade", + metric.WithDescription("Indicates with value 1 that receiver requires an upgrade and is not compatible with messages received from a broker"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceivedSpanMessages, err = builder.meter.Int64Counter( + "solacereceiver_received_span_messages", + metric.WithDescription("Number of received span messages"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceiverFlowControlRecentRetries, err = builder.meter.Int64Gauge( + "solacereceiver_receiver_flow_control_recent_retries", + metric.WithDescription("Most recent/current retry count when flow controlled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceiverFlowControlStatus, err = builder.meter.Int64Gauge( + "solacereceiver_receiver_flow_control_status", + metric.WithDescription("Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceiverFlowControlTotal, err = builder.meter.Int64Counter( + "solacereceiver_receiver_flow_control_total", + metric.WithDescription("Number of times the receiver instance became flow controlled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceiverFlowControlWithSingleSuccessfulRetry, err = builder.meter.Int64Counter( + "solacereceiver_receiver_flow_control_with_single_successful_retry", + metric.WithDescription("Number of times the receiver instance became flow controlled and resolved situations after the first retry"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReceiverStatus, err = builder.meter.Int64Gauge( + "solacereceiver_receiver_status", + metric.WithDescription("Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverRecoverableUnmarshallingErrors, err = builder.meter.Int64Counter( + "solacereceiver_recoverable_unmarshalling_errors", + metric.WithDescription("Number of recoverable message unmarshalling errors"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.SolacereceiverReportedSpans, err = builder.meter.Int64Counter( + "solacereceiver_reported_spans", + metric.WithDescription("Number of reported spans"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/solacereceiver/internal/metadata/generated_telemetry_test.go b/receiver/solacereceiver/internal/metadata/generated_telemetry_test.go index 1f570e76419f..bd6841d6ccfc 100644 --- a/receiver/solacereceiver/internal/metadata/generated_telemetry_test.go +++ b/receiver/solacereceiver/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/solacereceiver/metadata.yaml b/receiver/solacereceiver/metadata.yaml index 9b04bd40787b..c4df3ce89d8b 100644 --- a/receiver/solacereceiver/metadata.yaml +++ b/receiver/solacereceiver/metadata.yaml @@ -17,8 +17,93 @@ tests: username: username password: passwd skip_lifecycle: true - goleak: - ignore: - top: - # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. - - "go.opencensus.io/stats/view.(*worker).start" \ No newline at end of file + +telemetry: + metrics: + solacereceiver_failed_reconnections: + enabled: true + unit: "1" + description: Number of failed broker reconnections + sum: + value_type: int + monotonic: true + solacereceiver_recoverable_unmarshalling_errors: + enabled: true + unit: "1" + description: Number of recoverable message unmarshalling errors + sum: + value_type: int + monotonic: true + solacereceiver_fatal_unmarshalling_errors: + enabled: true + unit: "1" + description: Number of fatal message unmarshalling errors + sum: + value_type: int + monotonic: true + solacereceiver_dropped_span_messages: + enabled: true + unit: "1" + description: Number of dropped span messages + sum: + value_type: int + monotonic: true + solacereceiver_received_span_messages: + enabled: true + unit: "1" + description: Number of received span messages + sum: + value_type: int + monotonic: true + solacereceiver_reported_spans: + enabled: true + unit: "1" + description: Number of reported spans + sum: + value_type: int + monotonic: true + solacereceiver_receiver_status: + enabled: true + unit: "1" + description: Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated + gauge: + value_type: int + solacereceiver_need_upgrade: + enabled: true + unit: "1" + description: Indicates with value 1 that receiver requires an upgrade and is not compatible with messages received from a broker + gauge: + value_type: int + solacereceiver_receiver_flow_control_status: + enabled: true + unit: "1" + description: Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled + gauge: + value_type: int + solacereceiver_receiver_flow_control_recent_retries: + enabled: true + unit: "1" + description: Most recent/current retry count when flow controlled + gauge: + value_type: int + solacereceiver_receiver_flow_control_total: + enabled: true + unit: "1" + description: Number of times the receiver instance became flow controlled + sum: + value_type: int + monotonic: true + solacereceiver_receiver_flow_control_with_single_successful_retry: + enabled: true + unit: "1" + description: Number of times the receiver instance became flow controlled and resolved situations after the first retry + sum: + value_type: int + monotonic: true + solacereceiver_dropped_egress_spans: + enabled: true + unit: "1" + description: Number of dropped egress spans + sum: + value_type: int + monotonic: true diff --git a/receiver/solacereceiver/observability.go b/receiver/solacereceiver/observability.go deleted file mode 100644 index 4d100d65d472..000000000000 --- a/receiver/solacereceiver/observability.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" - -import ( - "context" - - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" -) - -const ( - // receiverKey used to identify receivers in metrics and traces. - receiverKey = "receiver" - // metricPrefix used to prefix solace specific metrics - metricPrefix = "solacereceiver" - nameSep = "/" -) - -type receiverState uint8 - -const ( - receiverStateStarting receiverState = iota - receiverStateConnecting - receiverStateConnected - receiverStateIdle - receiverStateTerminating - receiverStateTerminated -) - -type flowControlState uint8 - -const ( - flowControlStateClear flowControlState = iota - flowControlStateControlled -) - -type opencensusMetrics struct { - stats struct { - failedReconnections *stats.Int64Measure - recoverableUnmarshallingErrors *stats.Int64Measure - fatalUnmarshallingErrors *stats.Int64Measure - droppedSpanMessages *stats.Int64Measure - receivedSpanMessages *stats.Int64Measure - reportedSpans *stats.Int64Measure - receiverStatus *stats.Int64Measure - needUpgrade *stats.Int64Measure - flowControlStatus *stats.Int64Measure - flowControlRecentRetries *stats.Int64Measure - flowControlTotal *stats.Int64Measure - flowControlSingleSuccess *stats.Int64Measure - droppedEgressSpans *stats.Int64Measure - } - views struct { - failedReconnections *view.View - recoverableUnmarshallingErrors *view.View - fatalUnmarshallingErrors *view.View - droppedSpanMessages *view.View - receivedSpanMessages *view.View - reportedSpans *view.View - receiverStatus *view.View - needUpgrade *view.View - flowControlStatus *view.View - flowControlRecentRetries *view.View - flowControlTotal *view.View - flowControlSingleSuccess *view.View - droppedEgressSpans *view.View - } -} - -// receiver will register internal telemetry views -func newOpenCensusMetrics(instanceName string) (*opencensusMetrics, error) { - m := &opencensusMetrics{} - prefix := metricPrefix + nameSep - if instanceName != "" { - prefix += instanceName + nameSep - } - - m.stats.failedReconnections = stats.Int64(prefix+"failed_reconnections", "Number of failed broker reconnections", stats.UnitDimensionless) - m.stats.recoverableUnmarshallingErrors = stats.Int64(prefix+"recoverable_unmarshalling_errors", "Number of recoverable message unmarshalling errors", stats.UnitDimensionless) - m.stats.fatalUnmarshallingErrors = stats.Int64(prefix+"fatal_unmarshalling_errors", "Number of fatal message unmarshalling errors", stats.UnitDimensionless) - m.stats.droppedSpanMessages = stats.Int64(prefix+"dropped_span_messages", "Number of dropped span messages", stats.UnitDimensionless) - m.stats.receivedSpanMessages = stats.Int64(prefix+"received_span_messages", "Number of received span messages", stats.UnitDimensionless) - m.stats.reportedSpans = stats.Int64(prefix+"reported_spans", "Number of reported spans", stats.UnitDimensionless) - m.stats.receiverStatus = stats.Int64(prefix+"receiver_status", "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", stats.UnitDimensionless) - m.stats.needUpgrade = stats.Int64(prefix+"need_upgrade", "Indicates with value 1 that receiver requires an upgrade and is not compatible with messages received from a broker", stats.UnitDimensionless) - - m.stats.flowControlStatus = stats.Int64(prefix+"receiver_flow_control_status", "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", stats.UnitDimensionless) - m.stats.flowControlRecentRetries = stats.Int64(prefix+"receiver_flow_control_recent_retries", "Most recent/current retry count when flow controlled", stats.UnitDimensionless) - m.stats.flowControlTotal = stats.Int64(prefix+"receiver_flow_control_total", "Number of times the receiver instance became flow controlled", stats.UnitDimensionless) - m.stats.flowControlSingleSuccess = stats.Int64(prefix+"receiver_flow_control_with_single_successful_retry", "Number of times the receiver instance became flow controlled and resolved situations after the first retry", stats.UnitDimensionless) - - m.stats.droppedEgressSpans = stats.Int64(prefix+"dropped_egress_spans", "Number of dropped egress spans", stats.UnitDimensionless) - - m.views.failedReconnections = fromMeasure(m.stats.failedReconnections, view.Count()) - m.views.recoverableUnmarshallingErrors = fromMeasure(m.stats.recoverableUnmarshallingErrors, view.Count()) - m.views.fatalUnmarshallingErrors = fromMeasure(m.stats.fatalUnmarshallingErrors, view.Count()) - m.views.droppedSpanMessages = fromMeasure(m.stats.droppedSpanMessages, view.Count()) - m.views.receivedSpanMessages = fromMeasure(m.stats.receivedSpanMessages, view.Count()) - m.views.reportedSpans = fromMeasure(m.stats.reportedSpans, view.Sum()) - m.views.receiverStatus = fromMeasure(m.stats.receiverStatus, view.LastValue()) - m.views.needUpgrade = fromMeasure(m.stats.needUpgrade, view.LastValue()) - - m.views.flowControlStatus = fromMeasure(m.stats.flowControlStatus, view.LastValue()) - m.views.flowControlRecentRetries = fromMeasure(m.stats.flowControlRecentRetries, view.LastValue()) - m.views.flowControlTotal = fromMeasure(m.stats.flowControlTotal, view.Count()) - m.views.flowControlSingleSuccess = fromMeasure(m.stats.flowControlSingleSuccess, view.Count()) - - m.views.droppedEgressSpans = fromMeasure(m.stats.droppedEgressSpans, view.Count()) - - err := view.Register( - m.views.failedReconnections, - m.views.recoverableUnmarshallingErrors, - m.views.fatalUnmarshallingErrors, - m.views.droppedSpanMessages, - m.views.receivedSpanMessages, - m.views.reportedSpans, - m.views.receiverStatus, - m.views.needUpgrade, - m.views.flowControlStatus, - m.views.flowControlRecentRetries, - m.views.flowControlTotal, - m.views.flowControlSingleSuccess, - m.views.droppedEgressSpans, - ) - if err != nil { - return nil, err - } - return m, nil -} - -func fromMeasure(measure stats.Measure, agg *view.Aggregation) *view.View { - return &view.View{ - Name: buildReceiverCustomMetricName(measure.Name()), - Description: measure.Description(), - Measure: measure, - Aggregation: agg, - } -} - -func buildReceiverCustomMetricName(metric string) string { - return receiverKey + nameSep + metadata.Type.String() + nameSep + metric -} - -// recordFailedReconnection increments the metric that records failed reconnection event. -func (m *opencensusMetrics) recordFailedReconnection() { - stats.Record(context.Background(), m.stats.failedReconnections.M(1)) -} - -// recordRecoverableUnmarshallingError increments the metric that records a recoverable error by trace message unmarshalling. -func (m *opencensusMetrics) recordRecoverableUnmarshallingError() { - stats.Record(context.Background(), m.stats.recoverableUnmarshallingErrors.M(1)) -} - -// recordFatalUnmarshallingError increments the metric that records a fatal arrow by trace message unmarshalling. -func (m *opencensusMetrics) recordFatalUnmarshallingError() { - stats.Record(context.Background(), m.stats.fatalUnmarshallingErrors.M(1)) -} - -// recordDroppedSpanMessages increments the metric that records a dropped span message -func (m *opencensusMetrics) recordDroppedSpanMessages() { - stats.Record(context.Background(), m.stats.droppedSpanMessages.M(1)) -} - -// recordReceivedSpanMessages increments the metric that records a received span message -func (m *opencensusMetrics) recordReceivedSpanMessages() { - stats.Record(context.Background(), m.stats.receivedSpanMessages.M(1)) -} - -// recordReportedSpans increments the metric that records the number of spans reported to the next consumer -func (m *opencensusMetrics) recordReportedSpans(amount int64) { - stats.Record(context.Background(), m.stats.reportedSpans.M(amount)) -} - -// recordReceiverStatus sets the metric that records the current state of the receiver to the given state -func (m *opencensusMetrics) recordReceiverStatus(status receiverState) { - stats.Record(context.Background(), m.stats.receiverStatus.M(int64(status))) -} - -// RecordNeedRestart turns a need restart flag on -func (m *opencensusMetrics) recordNeedUpgrade() { - stats.Record(context.Background(), m.stats.needUpgrade.M(1)) -} - -func (m *opencensusMetrics) recordFlowControlStatus(status flowControlState) { - stats.Record(context.Background(), m.stats.flowControlStatus.M(int64(status))) -} - -func (m *opencensusMetrics) recordFlowControlRecentRetries(retries int64) { - stats.Record(context.Background(), m.stats.flowControlRecentRetries.M(retries)) -} - -func (m *opencensusMetrics) recordFlowControlTotal() { - stats.Record(context.Background(), m.stats.flowControlTotal.M(1)) -} - -func (m *opencensusMetrics) recordFlowControlSingleSuccess() { - stats.Record(context.Background(), m.stats.flowControlSingleSuccess.M(1)) -} - -func (m *opencensusMetrics) recordDroppedEgressSpan() { - stats.Record(context.Background(), m.stats.droppedEgressSpans.M(1)) -} diff --git a/receiver/solacereceiver/observability_test.go b/receiver/solacereceiver/observability_test.go deleted file mode 100644 index 95de0c6e3720..000000000000 --- a/receiver/solacereceiver/observability_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package solacereceiver - -import ( - "reflect" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" -) - -type metricsTestCase struct { - fn func() // function to test updating metrics - v *view.View // view to reference - m stats.Measure // expected measure of the view - calls int // number of times to call fn - expected int // expected value of reported metric at end of calls -} - -func TestRecordMetrics(t *testing.T) { - metrics := newTestMetrics(t) - testCases := []metricsTestCase{ - {metrics.recordFailedReconnection, metrics.views.failedReconnections, metrics.stats.failedReconnections, 3, 3}, - {metrics.recordRecoverableUnmarshallingError, metrics.views.recoverableUnmarshallingErrors, metrics.stats.recoverableUnmarshallingErrors, 3, 3}, - {metrics.recordFatalUnmarshallingError, metrics.views.fatalUnmarshallingErrors, metrics.stats.fatalUnmarshallingErrors, 3, 3}, - {metrics.recordDroppedSpanMessages, metrics.views.droppedSpanMessages, metrics.stats.droppedSpanMessages, 3, 3}, - {metrics.recordReceivedSpanMessages, metrics.views.receivedSpanMessages, metrics.stats.receivedSpanMessages, 3, 3}, - {func() { - metrics.recordReportedSpans(2) - }, metrics.views.reportedSpans, metrics.stats.reportedSpans, 3, 6}, - {func() { - metrics.recordReceiverStatus(receiverStateTerminated) - }, metrics.views.receiverStatus, metrics.stats.receiverStatus, 3, int(receiverStateTerminated)}, - {metrics.recordNeedUpgrade, metrics.views.needUpgrade, metrics.stats.needUpgrade, 3, 1}, - {func() { - metrics.recordFlowControlStatus(flowControlStateControlled) - }, metrics.views.flowControlStatus, metrics.stats.flowControlStatus, 3, 1}, - {func() { - metrics.recordFlowControlRecentRetries(5) - }, metrics.views.flowControlRecentRetries, metrics.stats.flowControlRecentRetries, 3, 5}, - {metrics.recordFlowControlTotal, metrics.views.flowControlTotal, metrics.stats.flowControlTotal, 3, 3}, - {metrics.recordFlowControlSingleSuccess, metrics.views.flowControlSingleSuccess, metrics.stats.flowControlSingleSuccess, 3, 3}, - {metrics.recordDroppedEgressSpan, metrics.views.droppedEgressSpans, metrics.stats.droppedEgressSpans, 3, 3}, - } - for _, tc := range testCases { - t.Run(tc.m.Name(), func(t *testing.T) { - for i := 0; i < tc.calls; i++ { - tc.fn() - } - validateMetric(t, tc.v, tc.expected) - }) - } -} - -func validateMetric(t *testing.T, v *view.View, expected any) { - // hack to reset stats to 0 - defer func() { - view.Unregister(v) - err := view.Register(v) - assert.NoError(t, err) - }() - rows, err := view.RetrieveData(v.Name) - assert.NoError(t, err) - if expected != nil { - require.Len(t, rows, 1) - value := reflect.Indirect(reflect.ValueOf(rows[0].Data)).FieldByName("Value").Interface() - assert.EqualValues(t, expected, value) - } else { - assert.Len(t, rows, 0) - } -} - -// TestRegisterViewsExpectingFailure validates that if an error is returned from view.Register, we panic and don't continue with initialization -func TestRegisterViewsExpectingFailure(t *testing.T) { - statName := "solacereceiver/" + t.Name() + "/failed_reconnections" - stat := stats.Int64(statName, "", stats.UnitDimensionless) - err := view.Register(&view.View{ - Name: buildReceiverCustomMetricName(statName), - Description: "some description", - Measure: stat, - Aggregation: view.Sum(), - }) - require.NoError(t, err) - metrics, err := newOpenCensusMetrics(t.Name()) - assert.Error(t, err) - assert.Nil(t, metrics) -} - -// newTestMetrics builds a new metrics that will cleanup when testing.T completes -func newTestMetrics(t *testing.T) *opencensusMetrics { - m, err := newOpenCensusMetrics(t.Name()) - require.NoError(t, err) - t.Cleanup(func() { - unregisterMetrics(m) - }) - return m -} - -// unregisterMetrics is used to unregister the metrics for testing purposes -func unregisterMetrics(metrics *opencensusMetrics) { - view.Unregister( - metrics.views.failedReconnections, - metrics.views.recoverableUnmarshallingErrors, - metrics.views.fatalUnmarshallingErrors, - metrics.views.droppedSpanMessages, - metrics.views.receivedSpanMessages, - metrics.views.reportedSpans, - metrics.views.receiverStatus, - metrics.views.needUpgrade, - ) -} diff --git a/receiver/solacereceiver/receiver.go b/receiver/solacereceiver/receiver.go index 73031022e3f8..d6ec13f46ddc 100644 --- a/receiver/solacereceiver/receiver.go +++ b/receiver/solacereceiver/receiver.go @@ -16,6 +16,26 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" +) + +type receiverState uint8 + +const ( + receiverStateStarting receiverState = iota + receiverStateConnecting + receiverStateConnected + receiverStateIdle + receiverStateTerminating + receiverStateTerminated +) + +type flowControlState uint8 + +const ( + flowControlStateClear flowControlState = iota + flowControlStateControlled ) // solaceTracesReceiver uses azure AMQP to consume and handle telemetry data from SOlace. Implements receiver.Traces @@ -23,10 +43,10 @@ type solaceTracesReceiver struct { // config is the receiver.Config instance used to build the receiver config *Config - nextConsumer consumer.Traces - settings receiver.Settings - metrics *opencensusMetrics - unmarshaller tracesUnmarshaller + nextConsumer consumer.Traces + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder + unmarshaller tracesUnmarshaller // cancel is the function that will cancel the context associated with the main worker loop cancel context.CancelFunc shutdownWaitGroup *sync.WaitGroup @@ -47,19 +67,19 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu return nil, err } - metrics, err := newOpenCensusMetrics(set.ID.Name()) + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) if err != nil { set.Logger.Warn("Error registering metrics", zap.Any("error", err)) return nil, err } - unmarshaller := newTracesUnmarshaller(set.Logger, metrics) + unmarshaller := newTracesUnmarshaller(set.Logger, telemetryBuilder) return &solaceTracesReceiver{ config: config, nextConsumer: nextConsumer, settings: set, - metrics: metrics, + telemetryBuilder: telemetryBuilder, unmarshaller: unmarshaller, shutdownWaitGroup: &sync.WaitGroup{}, factory: factory, @@ -69,9 +89,9 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu } // Start implements component.Receiver::Start -func (s *solaceTracesReceiver) Start(_ context.Context, _ component.Host) error { - s.metrics.recordReceiverStatus(receiverStateStarting) - s.metrics.recordFlowControlStatus(flowControlStateClear) +func (s *solaceTracesReceiver) Start(ctx context.Context, _ component.Host) error { + s.telemetryBuilder.SolacereceiverReceiverStatus.Record(ctx, int64(receiverStateStarting)) + s.telemetryBuilder.SolacereceiverReceiverFlowControlStatus.Record(ctx, int64(flowControlStateClear)) var cancelableContext context.Context cancelableContext, s.cancel = context.WithCancel(context.Background()) @@ -89,12 +109,12 @@ func (s *solaceTracesReceiver) Shutdown(_ context.Context) error { return nil } s.terminating.Store(true) - s.metrics.recordReceiverStatus(receiverStateTerminating) + s.telemetryBuilder.SolacereceiverReceiverStatus.Record(context.Background(), int64(receiverStateTerminating)) s.settings.Logger.Info("Shutdown waiting for all components to complete") s.cancel() // cancels the context passed to the reconnection loop s.shutdownWaitGroup.Wait() s.settings.Logger.Info("Receiver shutdown successfully") - s.metrics.recordReceiverStatus(receiverStateTerminated) + s.telemetryBuilder.SolacereceiverReceiverStatus.Record(context.Background(), int64(receiverStateTerminated)) return nil } @@ -110,7 +130,7 @@ func (s *solaceTracesReceiver) connectAndReceive(ctx context.Context) { disable := false // indicate we are in connecting state at the start - s.metrics.recordReceiverStatus(receiverStateConnecting) + s.telemetryBuilder.SolacereceiverReceiverStatus.Record(context.Background(), int64(receiverStateConnecting)) reconnectionLoop: for !disable { @@ -136,7 +156,7 @@ reconnectionLoop: if err := service.dial(ctx); err != nil { s.settings.Logger.Debug("Encountered error while connecting messaging service", zap.Error(err)) - s.metrics.recordFailedReconnection() + s.telemetryBuilder.SolacereceiverFailedReconnections.Add(ctx, 1) return } // dial was successful, record the connected state @@ -145,7 +165,7 @@ reconnectionLoop: if err := s.receiveMessages(ctx, service); err != nil { s.settings.Logger.Debug("Encountered error while receiving messages", zap.Error(err)) if errors.Is(err, errUpgradeRequired) { - s.metrics.recordNeedUpgrade() + s.telemetryBuilder.SolacereceiverNeedUpgrade.Record(ctx, 1) disable = true return } @@ -162,7 +182,7 @@ reconnectionLoop: // this state transition were to happen, it would be short lived. func (s *solaceTracesReceiver) recordConnectionState(state receiverState) { if !s.terminating.Load() { - s.metrics.recordReceiverStatus(state) + s.telemetryBuilder.SolacereceiverReceiverStatus.Record(context.Background(), int64(state)) } } @@ -200,18 +220,18 @@ func (s *solaceTracesReceiver) receiveMessage(ctx context.Context, service messa } }() // message received successfully - s.metrics.recordReceivedSpanMessages() + s.telemetryBuilder.SolacereceiverReceivedSpanMessages.Add(ctx, 1) // unmarshal the message. unmarshalling errors are not fatal unless the version is unknown traces, unmarshalErr := s.unmarshaller.unmarshal(msg) if unmarshalErr != nil { s.settings.Logger.Error("Encountered error while unmarshalling message", zap.Error(unmarshalErr)) - s.metrics.recordFatalUnmarshallingError() + s.telemetryBuilder.SolacereceiverFatalUnmarshallingErrors.Add(ctx, 1) if errors.Is(unmarshalErr, errUpgradeRequired) { disposition = service.failed // if we don't know the version, reject the trace message since we will disable the receiver return unmarshalErr } - s.metrics.recordDroppedSpanMessages() // if the error is some other unmarshalling error, we will ack the message and drop the content - return nil // don't propagate error, but don't continue forwarding traces + s.telemetryBuilder.SolacereceiverDroppedSpanMessages.Add(ctx, 1) // if the error is some other unmarshalling error, we will ack the message and drop the content + return nil // don't propagate error, but don't continue forwarding traces } var flowControlCount int64 @@ -225,10 +245,10 @@ flowControlLoop: s.settings.Logger.Info("Encountered temporary error while forwarding traces to next receiver, will allow redelivery", zap.Error(forwardErr)) // handle flow control metrics if flowControlCount == 0 { - s.metrics.recordFlowControlStatus(flowControlStateControlled) + s.telemetryBuilder.SolacereceiverReceiverFlowControlStatus.Record(ctx, int64(flowControlStateControlled)) } flowControlCount++ - s.metrics.recordFlowControlRecentRetries(flowControlCount) + s.telemetryBuilder.SolacereceiverReceiverFlowControlRecentRetries.Record(ctx, flowControlCount) // Backpressure scenario. For now, we are only delayed retry, eventually we may need to handle this delayTimer := time.NewTimer(s.config.Flow.DelayedRetry.Delay) select { @@ -241,21 +261,21 @@ flowControlLoop: } } else { // error is permanent, we want to accept the message and increment the number of dropped messages s.settings.Logger.Warn("Encountered permanent error while forwarding traces to next receiver, will swallow trace", zap.Error(forwardErr)) - s.metrics.recordDroppedSpanMessages() + s.telemetryBuilder.SolacereceiverDroppedSpanMessages.Add(ctx, 1) break flowControlLoop } } else { // no forward error - s.metrics.recordReportedSpans(int64(traces.SpanCount())) + s.telemetryBuilder.SolacereceiverReportedSpans.Add(ctx, int64(traces.SpanCount())) break flowControlLoop } } // Make sure to clear the stats no matter what, unless we were interrupted in which case we should preserve the last state if flowControlCount != 0 { - s.metrics.recordFlowControlStatus(flowControlStateClear) - s.metrics.recordFlowControlTotal() + s.telemetryBuilder.SolacereceiverReceiverFlowControlStatus.Record(ctx, int64(flowControlStateClear)) + s.telemetryBuilder.SolacereceiverReceiverFlowControlTotal.Add(ctx, 1) if flowControlCount == 1 { - s.metrics.recordFlowControlSingleSuccess() + s.telemetryBuilder.SolacereceiverReceiverFlowControlWithSingleSuccessfulRetry.Add(ctx, 1) } } return nil diff --git a/receiver/solacereceiver/receiver_test.go b/receiver/solacereceiver/receiver_test.go index cc6f8772cc2c..baaea6c85eed 100644 --- a/receiver/solacereceiver/receiver_test.go +++ b/receiver/solacereceiver/receiver_test.go @@ -20,6 +20,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" ) // connectAndReceive with connect failure @@ -28,10 +31,75 @@ import ( func TestReceiveMessage(t *testing.T) { someError := errors.New("some error") - - validateMetrics := func(receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) func(t *testing.T, receiver *solaceTracesReceiver) { - return func(t *testing.T, receiver *solaceTracesReceiver) { - validateReceiverMetrics(t, receiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan) + validateMetrics := func(receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan int64) func(t *testing.T, tt componentTestTelemetry) { + return func(t *testing.T, tt componentTestTelemetry) { + var expected []metricdata.Metrics + if reportedSpan > 0 { + expected = append(expected, + metricdata.Metrics{ + Name: "solacereceiver_reported_spans", + Description: "Number of reported spans", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: reportedSpan, + }, + }, + }, + }) + } + if receivedMsgVal > 0 { + expected = append(expected, metricdata.Metrics{ + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: receivedMsgVal, + }, + }, + }, + }) + } + if droppedMsgVal > 0 { + expected = append(expected, metricdata.Metrics{ + Name: "solacereceiver_dropped_span_messages", + Description: "Number of dropped span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: droppedMsgVal, + }, + }, + }, + }) + } + if fatalUnmarshalling > 0 { + expected = append(expected, metricdata.Metrics{ + Name: "solacereceiver_fatal_unmarshalling_errors", + Description: "Number of fatal message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: fatalUnmarshalling, + }, + }, + }, + }) + } + tt.assertMetrics(t, expected) } } @@ -45,55 +113,55 @@ func TestReceiveMessage(t *testing.T) { // expected error from receiveMessage expectedErr error // validate constraints after the fact - validation func(t *testing.T, receiver *solaceTracesReceiver) + validation func(t *testing.T, tt componentTestTelemetry) // traces provided by the trace function traces ptrace.Traces }{ { // no errors, expect no error, validate metrics name: "Receive Message Success", - validation: validateMetrics(1, nil, nil, 1), + validation: validateMetrics(1, 0, 0, 1), traces: newTestTracesWithSpans(1), }, { // no errors, expect no error, validate metrics name: "Receive Message Multiple Traces Success", - validation: validateMetrics(1, nil, nil, 3), + validation: validateMetrics(1, 0, 0, 3), traces: newTestTracesWithSpans(3), }, { // fail at receiveMessage and expect the error name: "Receive Messages Error", receiveMessageErr: someError, expectedErr: someError, - validation: validateMetrics(nil, nil, nil, nil), + validation: validateMetrics(0, 0, 0, 0), }, { // unmarshal error expecting the error to be swallowed, the message to be acknowledged, stats incremented name: "Unmarshal Error", unmarshalErr: errUnknownTopic, - validation: validateMetrics(1, 1, 1, nil), + validation: validateMetrics(1, 1, 1, 0), }, { // unmarshal error with wrong version expecting error to be propagated, message to be rejected name: "Unmarshal Version Error", unmarshalErr: errUpgradeRequired, expectedErr: errUpgradeRequired, expectNack: true, - validation: validateMetrics(1, nil, 1, nil), + validation: validateMetrics(1, 0, 1, 0), }, { // expect forward to error and message to be swallowed with ack, no error returned name: "Forward Permanent Error", nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))), - validation: validateMetrics(1, 1, nil, nil), + validation: validateMetrics(1, 1, 0, 0), }, { // expect forward to error and message to be swallowed with ack which fails returning an error name: "Forward Permanent Error with Ack Error", nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))), ackErr: someError, expectedErr: someError, - validation: validateMetrics(1, 1, nil, nil), + validation: validateMetrics(1, 1, 0, 0), }, } for _, testCase := range cases { t.Run(testCase.name, func(t *testing.T) { - receiver, messagingService, unmarshaller := newReceiver(t) + receiver, messagingService, unmarshaller, tt := newReceiver(t) if testCase.nextConsumer != nil { receiver.nextConsumer = testCase.nextConsumer } @@ -148,7 +216,7 @@ func TestReceiveMessage(t *testing.T) { assert.Equal(t, !testCase.expectNack, ackCalled) } if testCase.validation != nil { - testCase.validation(t, receiver) + testCase.validation(t, tt) } }) } @@ -156,7 +224,7 @@ func TestReceiveMessage(t *testing.T) { // receiveMessages ctx done return func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) { - receiver, messagingService, unmarshaller := newReceiver(t) + receiver, messagingService, unmarshaller, tt := newReceiver(t) receiveMessagesCalled := false ctx, cancel := context.WithCancel(context.Background()) msg := &inboundMessage{} @@ -184,26 +252,129 @@ func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) { assert.True(t, receiveMessagesCalled) assert.True(t, unmarshalCalled) assert.True(t, ackCalled) - validateReceiverMetrics(t, receiver, 1, nil, nil, 1) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_reported_spans", + Description: "Number of reported spans", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) } func TestReceiverLifecycle(t *testing.T) { - receiver, messagingService, _ := newReceiver(t) + receiver, messagingService, _, tt := newReceiver(t) dialCalled := make(chan struct{}) messagingService.dialFunc = func(context.Context) error { - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting) - validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateConnecting), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + }) close(dialCalled) return nil } closeCalled := make(chan struct{}) messagingService.closeFunc = func(context.Context) { - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminating) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateTerminating), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + }) close(closeCalled) } receiveMessagesCalled := make(chan struct{}) messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) { - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnected) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateConnected), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + }) close(receiveMessagesCalled) <-ctx.Done() return nil, errors.New("some error") @@ -216,13 +387,37 @@ func TestReceiverLifecycle(t *testing.T) { err = receiver.Shutdown(context.Background()) assert.NoError(t, err) assertChannelClosed(t, closeCalled) - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated) - // we error on receive message, so we should not report any metrics - validateReceiverMetrics(t, receiver, nil, nil, nil, nil) + // we error on receive message, so we should not report any additional metrics + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateTerminated), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + }) } func TestReceiverDialFailureContinue(t *testing.T) { - receiver, msgService, _ := newReceiver(t) + receiver, msgService, _, tt := newReceiver(t) dialErr := errors.New("Some dial error") const expectedAttempts = 3 // the number of attempts to perform prior to resolving dialCalled := 0 @@ -247,8 +442,47 @@ func TestReceiverDialFailureContinue(t *testing.T) { } msgService.closeFunc = func(ctx context.Context) { closeCalled++ - // asset we never left connecting state prior to closing closeDone - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting) + // assert we never left connecting state prior to closing closeDone + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateConnecting), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_failed_reconnections", + Description: "Number of failed broker reconnections", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(closeCalled), + }, + }, + }, + }, + }) if closeCalled == expectedAttempts { close(closeDone) <-ctx.Done() // wait for ctx.Done @@ -265,17 +499,54 @@ func TestReceiverDialFailureContinue(t *testing.T) { // expect close to be called twice assertChannelClosed(t, closeDone) // assert failed reconnections - validateMetric(t, receiver.metrics.views.failedReconnections, expectedAttempts) err = receiver.Shutdown(context.Background()) assert.NoError(t, err) - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated) // we error on dial, should never get to receive messages - validateReceiverMetrics(t, receiver, nil, nil, nil, nil) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateTerminated), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_failed_reconnections", + Description: "Number of failed broker reconnections", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + }, + }, + }, + }, + }) } func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) { - receiver, msgService, unmarshaller := newReceiver(t) + receiver, msgService, unmarshaller, tt := newReceiver(t) dialDone := make(chan struct{}) nackCalled := make(chan struct{}) closeDone := make(chan struct{}) @@ -317,13 +588,75 @@ func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) { // expect close to be called twice assertChannelClosed(t, closeDone) // we receive 1 message, encounter a fatal unmarshalling error and we nack the message so it is not actually dropped - validateReceiverMetrics(t, receiver, 1, nil, 1, nil) // assert idle state - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateIdle) - + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_fatal_unmarshalling_errors", + Description: "Number of fatal message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_status", + Description: "Indicates the status of the receiver as an enum. 0 = starting, 1 = connecting, 2 = connected, 3 = disabled (often paired with needs_upgrade), 4 = terminating, 5 = terminated", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(receiverStateIdle), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_need_upgrade", + Description: "Indicates with value 1 that receiver requires an upgrade and is not compatible with messages received from a broker", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) err = receiver.Shutdown(context.Background()) assert.NoError(t, err) - validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated) } func TestReceiverFlowControlDelayedRetry(t *testing.T) { @@ -331,7 +664,7 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { testCases := []struct { name string nextConsumer consumer.Traces - validation func(*testing.T, *opencensusMetrics) + validation func(*testing.T, componentTestTelemetry) }{ { name: "Without error", @@ -340,14 +673,95 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { { name: "With error", nextConsumer: consumertest.NewErr(someError), - validation: func(t *testing.T, metrics *opencensusMetrics) { - validateMetric(t, metrics.views.droppedSpanMessages, 1) + validation: func(t *testing.T, tt componentTestTelemetry) { + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_flow_control_recent_retries", + Description: "Most recent/current retry count when flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_total", + Description: "Number of times the receiver instance became flow controlled", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_dropped_span_messages", + Description: "Number of dropped span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_with_single_successful_retry", + Description: "Number of times the receiver instance became flow controlled and resolved situations after the first retry", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - receiver, messagingService, unmarshaller := newReceiver(t) + receiver, messagingService, unmarshaller, tt := newReceiver(t) delay := 50 * time.Millisecond // Increase delay on windows due to tick granularity // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17197 @@ -388,7 +802,46 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { require.Fail(t, "Did not expect receiveMessage to return before delay interval") } // Check that we are currently flow controlled - validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_flow_control_recent_retries", + Description: "Most recent/current retry count when flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateControlled), + }, + }, + }, + }, + }) // since we set the next consumer to a noop, this should succeed select { case <-time.After(delay): @@ -398,18 +851,97 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { } assert.True(t, ackCalled) if tc.validation != nil { - tc.validation(t, receiver.metrics) + tc.validation(t, tt) + } else { + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_flow_control_recent_retries", + Description: "Most recent/current retry count when flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_total", + Description: "Number of times the receiver instance became flow controlled", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_reported_spans", + Description: "Number of reported spans", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_with_single_successful_retry", + Description: "Number of times the receiver instance became flow controlled and resolved situations after the first retry", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) } - validateMetric(t, receiver.metrics.views.flowControlRecentRetries, 1) - validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear) - validateMetric(t, receiver.metrics.views.flowControlTotal, 1) - validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, 1) }) } } func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) { - receiver, messagingService, unmarshaller := newReceiver(t) + receiver, messagingService, unmarshaller, _ := newReceiver(t) // we won't wait 10 seconds since we will interrupt well before receiver.config.Flow.DelayedRetry.Delay = 10 * time.Second var err error @@ -455,7 +987,7 @@ func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) { } func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { - receiver, messagingService, unmarshaller := newReceiver(t) + receiver, messagingService, unmarshaller, tt := newReceiver(t) // we won't wait 10 seconds since we will interrupt well before retryInterval := 50 * time.Millisecond // Increase delay on windows due to tick granularity @@ -470,7 +1002,46 @@ func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { // we want to return an error at first, then set the next consumer to a noop consumer receiver.nextConsumer, err = consumer.NewTraces(func(context.Context, ptrace.Traces) error { if currentRetries > 0 { - validateMetric(t, receiver.metrics.views.flowControlRecentRetries, currentRetries) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_flow_control_recent_retries", + Description: "Most recent/current retry count when flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: currentRetries, + }, + }, + }, + }, + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateControlled), + }, + }, + }, + }, + }) } currentRetries++ if currentRetries == retryCount { @@ -507,7 +1078,6 @@ func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { case <-receiveMessageComplete: require.Fail(t, "Did not expect receiveMessage to return before delay interval") } - validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled) // since we set the next consumer to a noop, this should succeed select { case <-time.After(2 * retryInterval * time.Duration(retryCount)): @@ -516,19 +1086,85 @@ func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { assert.NoError(t, err) } assert.True(t, ackCalled) - validateMetric(t, receiver.metrics.views.flowControlRecentRetries, retryCount) - validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear) - validateMetric(t, receiver.metrics.views.flowControlTotal, 1) - validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, nil) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_receiver_flow_control_recent_retries", + Description: "Most recent/current retry count when flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: retryCount, + }, + }, + }, + }, + { + Name: "solacereceiver_received_span_messages", + Description: "Number of received span messages", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_status", + Description: "Indicates the flow control status of the receiver. 0 = not flow controlled, 1 = currently flow controlled", + Unit: "1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(flowControlStateClear), + }, + }, + }, + }, + { + Name: "solacereceiver_receiver_flow_control_total", + Description: "Number of times the receiver instance became flow controlled", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "solacereceiver_reported_spans", + Description: "Number of reported spans", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + }, + }, + }, + }, + }) } -func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller) { +func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller, componentTestTelemetry) { unmarshaller := &mockUnmarshaller{} service := &mockMessagingService{} messagingServiceFactory := func() messagingService { return service } - metrics := newTestMetrics(t) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) receiver := &solaceTracesReceiver{ settings: receivertest.NewNopSettings(), config: &Config{ @@ -539,21 +1175,14 @@ func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *m }, }, nextConsumer: consumertest.NewNop(), - metrics: metrics, + telemetryBuilder: telemetryBuilder, unmarshaller: unmarshaller, factory: messagingServiceFactory, shutdownWaitGroup: &sync.WaitGroup{}, retryTimeout: 1 * time.Millisecond, terminating: &atomic.Bool{}, } - return receiver, service, unmarshaller -} - -func validateReceiverMetrics(t *testing.T, receiver *solaceTracesReceiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) { - validateMetric(t, receiver.metrics.views.receivedSpanMessages, receivedMsgVal) - validateMetric(t, receiver.metrics.views.droppedSpanMessages, droppedMsgVal) - validateMetric(t, receiver.metrics.views.fatalUnmarshallingErrors, fatalUnmarshalling) - validateMetric(t, receiver.metrics.views.reportedSpans, reportedSpan) + return receiver, service, unmarshaller, tel } type mockMessagingService struct { diff --git a/receiver/solacereceiver/unmarshaller.go b/receiver/solacereceiver/unmarshaller.go index 1a9ec1558c3c..cd38c100a3a1 100644 --- a/receiver/solacereceiver/unmarshaller.go +++ b/receiver/solacereceiver/unmarshaller.go @@ -10,6 +10,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" ) // tracesUnmarshaller deserializes the message body. @@ -20,18 +22,18 @@ type tracesUnmarshaller interface { } // newUnmarshalleer returns a new unmarshaller ready for message unmarshalling -func newTracesUnmarshaller(logger *zap.Logger, metrics *opencensusMetrics) tracesUnmarshaller { +func newTracesUnmarshaller(logger *zap.Logger, telemetryBuilder *metadata.TelemetryBuilder) tracesUnmarshaller { return &solaceTracesUnmarshaller{ - logger: logger, - metrics: metrics, + logger: logger, + telemetryBuilder: telemetryBuilder, // v1 unmarshaller is implemented by solaceMessageUnmarshallerV1 receiveUnmarshallerV1: &brokerTraceReceiveUnmarshallerV1{ - logger: logger, - metrics: metrics, + logger: logger, + telemetryBuilder: telemetryBuilder, }, egressUnmarshallerV1: &brokerTraceEgressUnmarshallerV1{ - logger: logger, - metrics: metrics, + logger: logger, + telemetryBuilder: telemetryBuilder, }, } } @@ -39,7 +41,7 @@ func newTracesUnmarshaller(logger *zap.Logger, metrics *opencensusMetrics) trace // solaceTracesUnmarshaller implements tracesUnmarshaller. type solaceTracesUnmarshaller struct { logger *zap.Logger - metrics *opencensusMetrics + telemetryBuilder *metadata.TelemetryBuilder receiveUnmarshallerV1 tracesUnmarshaller egressUnmarshallerV1 tracesUnmarshaller } diff --git a/receiver/solacereceiver/unmarshaller_egress.go b/receiver/solacereceiver/unmarshaller_egress.go index fa3f9215929f..a5c7c1b7bfdc 100644 --- a/receiver/solacereceiver/unmarshaller_egress.go +++ b/receiver/solacereceiver/unmarshaller_egress.go @@ -4,6 +4,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" import ( + "context" "encoding/hex" "fmt" "strings" @@ -13,12 +14,13 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1" ) type brokerTraceEgressUnmarshallerV1 struct { - logger *zap.Logger - metrics *opencensusMetrics + logger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder } func (u *brokerTraceEgressUnmarshallerV1) unmarshal(message *inboundMessage) (ptrace.Traces, error) { @@ -68,7 +70,7 @@ func (u *brokerTraceEgressUnmarshallerV1) mapEgressSpan(spanData *egress_v1.Span } else { // unknown span type, drop the span u.logger.Warn("Received egress span with unknown span type, is the collector out of date?") - u.metrics.recordDroppedEgressSpan() + u.telemetryBuilder.SolacereceiverDroppedEgressSpans.Add(context.Background(), 1) } } @@ -141,7 +143,7 @@ func (u *brokerTraceEgressUnmarshallerV1) mapSendSpan(sendSpan *egress_v1.SpanDa attributes.PutStr(sourceKindKey, queueKind) default: u.logger.Warn(fmt.Sprintf("Unknown source type %T", casted)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) name = unknownSendName } span.SetName(name + sendNameSuffix) @@ -194,7 +196,7 @@ func (u *brokerTraceEgressUnmarshallerV1) mapTransactionEvent(transactionEvent * // Set the name to the unknown transaction event type to ensure forward compat. name = fmt.Sprintf("Unknown Transaction Event (%s)", transactionEvent.GetType().String()) u.logger.Warn(fmt.Sprintf("Received span with unknown transaction event %s", transactionEvent.GetType())) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } clientEvent.SetName(name) clientEvent.SetTimestamp(pcommon.Timestamp(transactionEvent.TimeUnixNano)) @@ -210,7 +212,7 @@ func (u *brokerTraceEgressUnmarshallerV1) mapTransactionEvent(transactionEvent * default: initiator = fmt.Sprintf("Unknown Transaction Initiator (%s)", transactionEvent.GetInitiator().String()) u.logger.Warn(fmt.Sprintf("Received span with unknown transaction initiator %s", transactionEvent.GetInitiator())) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } clientEvent.Attributes().PutStr(transactionInitiatorEventKey, initiator) // conditionally set the error description if one occurred, otherwise omit @@ -231,7 +233,7 @@ func (u *brokerTraceEgressUnmarshallerV1) mapTransactionEvent(transactionEvent * clientEvent.Attributes().PutStr(transactionXIDEventKey, xidString) default: u.logger.Warn(fmt.Sprintf("Unknown transaction ID type %T", transactionID)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } } diff --git a/receiver/solacereceiver/unmarshaller_egress_test.go b/receiver/solacereceiver/unmarshaller_egress_test.go index b9cd6b94a4df..5ac2232bbd46 100644 --- a/receiver/solacereceiver/unmarshaller_egress_test.go +++ b/receiver/solacereceiver/unmarshaller_egress_test.go @@ -8,10 +8,13 @@ import ( "github.com/Azure/go-amqp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1" ) @@ -21,7 +24,7 @@ import ( var msgWithAdditionalSpan = []byte{10, 48, 10, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7, 6, 5, 4, 3, 2, 18, 8, 1, 2, 3, 4, 5, 6, 7, 8, 26, 8, 1, 2, 3, 4, 5, 6, 7, 8, 74, 8, 32, 3, 10, 4, 116, 101, 115, 116, 18, 10, 118, 109, 114, 45, 49, 51, 51, 45, 53, 51, 26, 7, 100, 101, 102, 97, 117, 108, 116} func TestMsgWithUnknownOneof(t *testing.T) { - unmarshallerV1 := newTestEgressV1Unmarshaller(t) + unmarshallerV1, _ := newTestEgressV1Unmarshaller(t) spanData, err := unmarshallerV1.unmarshalToSpanData(amqp.NewMessage(msgWithAdditionalSpan)) assert.NoError(t, err) // expect one egress span @@ -36,10 +39,9 @@ func TestEgressUnmarshallerMapResourceSpan(t *testing.T) { version = "10.0.0" ) tests := []struct { - name string - spanData *egress_v1.SpanData - want map[string]any - expectedUnmarshallingErrors any + name string + spanData *egress_v1.SpanData + want map[string]any }{ { name: "Maps All Fields When Present", @@ -65,11 +67,10 @@ func TestEgressUnmarshallerMapResourceSpan(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestEgressV1Unmarshaller(t) + u, _ := newTestEgressV1Unmarshaller(t) actual := pcommon.NewMap() u.mapResourceSpanAttributes(tt.spanData, actual) assert.Equal(t, tt.want, actual.AsRaw()) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors) }) } } @@ -264,10 +265,9 @@ var validEgressSpans = []struct { func TestEgressUnmarshallerEgressSpan(t *testing.T) { type testCase struct { - name string - spanData *egress_v1.SpanData_EgressSpan - want *ptrace.Span - expectedUnmarshallingErrors any + name string + spanData *egress_v1.SpanData_EgressSpan + want *ptrace.Span } tests := []testCase{ { @@ -296,17 +296,32 @@ func TestEgressUnmarshallerEgressSpan(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestEgressV1Unmarshaller(t) + u, tel := newTestEgressV1Unmarshaller(t) actual := ptrace.NewSpanSlice() u.mapEgressSpan(tt.spanData, actual) if tt.want != nil { assert.Equal(t, 1, actual.Len()) compareSpans(t, *tt.want, actual.At(0)) + tel.assertMetrics(t, []metricdata.Metrics{}) } else { assert.Equal(t, 0, actual.Len()) - validateMetric(t, u.metrics.views.droppedEgressSpans, 1) + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_dropped_egress_spans", + Description: "Number of dropped egress spans", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) } - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors) }) } } @@ -349,7 +364,7 @@ func TestEgressUnmarshallerSendSpanAttributes(t *testing.T) { name string spanData *egress_v1.SpanData_SendSpan want ptrace.Span - expectedUnmarshallingErrors any + expectedUnmarshallingErrors int64 }{ { name: "With Queue source", @@ -408,11 +423,28 @@ func TestEgressUnmarshallerSendSpanAttributes(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestEgressV1Unmarshaller(t) + u, tel := newTestEgressV1Unmarshaller(t) actual := ptrace.NewSpan() u.mapSendSpan(tt.spanData, actual) compareSpans(t, tt.want, actual) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors) + var expectedMetrics []metricdata.Metrics + if tt.expectedUnmarshallingErrors > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.expectedUnmarshallingErrors, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } // test the various outcomes @@ -428,7 +460,7 @@ func TestEgressUnmarshallerSendSpanAttributes(t *testing.T) { } for outcomeKey, outcomeName := range outcomes { t.Run("With outcome "+outcomeName, func(t *testing.T) { - u := newTestEgressV1Unmarshaller(t) + u, _ := newTestEgressV1Unmarshaller(t) expected := getSpan(map[string]any{ "messaging.source.name": "someQueue", "messaging.source.kind": "queue", @@ -450,10 +482,10 @@ func TestEgressUnmarshallerSendSpanAttributes(t *testing.T) { func TestEgressUnmarshallerTransactionEvent(t *testing.T) { someErrorString := "some error" tests := []struct { - name string - spanData *egress_v1.SpanData_TransactionEvent - populateExpectedSpan func(span ptrace.Span) - unmarshallingErrors any + name string + spanData *egress_v1.SpanData_TransactionEvent + populateExpectedSpan func(span ptrace.Span) + expectedUnmarshallingErrors int64 }{ { // Local Transaction name: "Local Transaction Event", @@ -579,7 +611,7 @@ func TestEgressUnmarshallerTransactionEvent(t *testing.T) { "messaging.solace.transaction_initiator": "client", }) }, - unmarshallingErrors: 2, + expectedUnmarshallingErrors: 2, }, { // Type of ID not handled, type of initiator not handled name: "Unknown Transaction Initiator and no ID", @@ -594,24 +626,43 @@ func TestEgressUnmarshallerTransactionEvent(t *testing.T) { "messaging.solace.transaction_initiator": "Unknown Transaction Initiator (12345)", }) }, - unmarshallingErrors: 2, + expectedUnmarshallingErrors: 2, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestEgressV1Unmarshaller(t) + u, tel := newTestEgressV1Unmarshaller(t) expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() tt.populateExpectedSpan(expected) actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() u.mapTransactionEvent(tt.spanData, actual.Events().AppendEmpty()) // order is nondeterministic for attributes, so we must sort to get a valid comparison compareSpans(t, expected, actual) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.unmarshallingErrors) + var expectedMetrics []metricdata.Metrics + if tt.expectedUnmarshallingErrors > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.expectedUnmarshallingErrors, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } } -func newTestEgressV1Unmarshaller(t *testing.T) *brokerTraceEgressUnmarshallerV1 { - m := newTestMetrics(t) - return &brokerTraceEgressUnmarshallerV1{zap.NewNop(), m} +func newTestEgressV1Unmarshaller(t *testing.T) (*brokerTraceEgressUnmarshallerV1, componentTestTelemetry) { + tt := setupTestTelemetry() + builder, err := metadata.NewTelemetryBuilder(tt.NewSettings().TelemetrySettings) + require.NoError(t, err) + return &brokerTraceEgressUnmarshallerV1{zap.NewNop(), builder}, tt } diff --git a/receiver/solacereceiver/unmarshaller_receive.go b/receiver/solacereceiver/unmarshaller_receive.go index 7dbcafca1431..0e4589d520f2 100644 --- a/receiver/solacereceiver/unmarshaller_receive.go +++ b/receiver/solacereceiver/unmarshaller_receive.go @@ -4,6 +4,7 @@ package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" import ( + "context" "encoding/hex" "fmt" "net" @@ -15,12 +16,13 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1" ) type brokerTraceReceiveUnmarshallerV1 struct { - logger *zap.Logger - metrics *opencensusMetrics + logger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder } // unmarshal implements tracesUnmarshaller.unmarshal @@ -144,7 +146,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapClientSpanAttributes(spanData *rec default: deliveryMode = fmt.Sprintf("Unknown Delivery Mode (%s)", spanData.DeliveryMode.String()) u.logger.Warn(fmt.Sprintf("Received span with unknown delivery mode %s", spanData.DeliveryMode)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } attrMap.PutStr(deliveryModeAttrKey, deliveryMode) @@ -187,7 +189,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapClientSpanAttributes(spanData *rec err := u.unmarshalBaggage(attrMap, *spanData.Baggage) if err != nil { u.logger.Warn("Received malformed baggage string in span data") - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } } @@ -232,7 +234,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapEnqueueEvent(enqueueEvent *receive destinationType = queueKind default: u.logger.Warn(fmt.Sprintf("Unknown destination type %T", casted)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) return } clientEvent := clientSpanEvents.AppendEmpty() @@ -270,7 +272,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapTransactionEvent(transactionEvent // Set the name to the unknown transaction event type to ensure forward compat. name = fmt.Sprintf("Unknown Transaction Event (%s)", transactionEvent.GetType().String()) u.logger.Warn(fmt.Sprintf("Received span with unknown transaction event %s", transactionEvent.GetType())) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } clientEvent.SetName(name) clientEvent.SetTimestamp(pcommon.Timestamp(transactionEvent.TimeUnixNano)) @@ -286,7 +288,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapTransactionEvent(transactionEvent default: initiator = fmt.Sprintf("Unknown Transaction Initiator (%s)", transactionEvent.GetInitiator().String()) u.logger.Warn(fmt.Sprintf("Received span with unknown transaction initiator %s", transactionEvent.GetInitiator())) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } clientEvent.Attributes().PutStr(transactionInitiatorEventKey, initiator) // conditionally set the error description if one occurred, otherwise omit @@ -307,7 +309,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) mapTransactionEvent(transactionEvent clientEvent.Attributes().PutStr(transactionXIDEventKey, xidString) default: u.logger.Warn(fmt.Sprintf("Unknown transaction ID type %T", transactionID)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } } @@ -317,7 +319,7 @@ func (u *brokerTraceReceiveUnmarshallerV1) rgmidToString(rgmid []byte) string { // may be cases where the rgmid is empty or nil, len(rgmid) will return 0 if nil if len(rgmid) > 0 { u.logger.Warn("Received invalid length or version for rgmid", zap.Int8("version", int8(rgmid[0])), zap.Int("length", len(rgmid))) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } return hex.EncodeToString(rgmid) } @@ -401,6 +403,6 @@ func (u *brokerTraceReceiveUnmarshallerV1) insertUserProperty(toMap pcommon.Map, toMap.PutStr(k, string(rune(v.CharacterValue))) default: u.logger.Warn(fmt.Sprintf("Unknown user property type: %T", v)) - u.metrics.recordRecoverableUnmarshallingError() + u.telemetryBuilder.SolacereceiverRecoverableUnmarshallingErrors.Add(context.Background(), 1) } } diff --git a/receiver/solacereceiver/unmarshaller_receive_test.go b/receiver/solacereceiver/unmarshaller_receive_test.go index 964afa96d09e..9ce318fddcbf 100644 --- a/receiver/solacereceiver/unmarshaller_receive_test.go +++ b/receiver/solacereceiver/unmarshaller_receive_test.go @@ -11,8 +11,10 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1" ) @@ -26,7 +28,7 @@ func TestReceiveUnmarshallerMapResourceSpan(t *testing.T) { name string spanData *receive_v1.SpanData want map[string]any - expectedUnmarshallingErrors any + expectedUnmarshallingErrors int64 }{ { name: "Maps All Fields When Present", @@ -52,11 +54,28 @@ func TestReceiveUnmarshallerMapResourceSpan(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, tel := newTestReceiveV1Unmarshaller(t) actual := pcommon.NewMap() u.mapResourceSpanAttributes(tt.spanData, actual) assert.Equal(t, tt.want, actual.AsRaw()) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors) + var expectedMetrics []metricdata.Metrics + if tt.expectedUnmarshallingErrors > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.expectedUnmarshallingErrors, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } } @@ -119,7 +138,7 @@ func TestReceiveUnmarshallerMapClientSpanData(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, _ := newTestReceiveV1Unmarshaller(t) actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() u.mapClientSpanData(tt.data, actual) expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() @@ -145,7 +164,7 @@ func TestReceiveUnmarshallerMapClientSpanAttributes(t *testing.T) { name string spanData *receive_v1.SpanData want map[string]any - expectedUnmarshallingErrors any + expectedUnmarshallingErrors int64 }{ { name: "With All Valid Attributes", @@ -310,11 +329,28 @@ func TestReceiveUnmarshallerMapClientSpanAttributes(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, tel := newTestReceiveV1Unmarshaller(t) actual := pcommon.NewMap() u.mapClientSpanAttributes(tt.spanData, actual) assert.Equal(t, tt.want, actual.AsRaw()) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors) + var expectedMetrics []metricdata.Metrics + if tt.expectedUnmarshallingErrors > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.expectedUnmarshallingErrors, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } } @@ -327,7 +363,7 @@ func TestReceiveUnmarshallerEvents(t *testing.T) { name string spanData *receive_v1.SpanData populateExpectedSpan func(span ptrace.Span) - unmarshallingErrors any + unmarshallingErrors int64 }{ { // don't expect any events when none are present in the span data name: "No Events", @@ -563,14 +599,31 @@ func TestReceiveUnmarshallerEvents(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, tel := newTestReceiveV1Unmarshaller(t) expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() tt.populateExpectedSpan(expected) actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() u.mapEvents(tt.spanData, actual) // order is nondeterministic for attributes, so we must sort to get a valid comparison compareSpans(t, expected, actual) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.unmarshallingErrors) + var expectedMetrics []metricdata.Metrics + if tt.unmarshallingErrors > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.unmarshallingErrors, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } } @@ -580,7 +633,7 @@ func TestReceiveUnmarshallerRGMID(t *testing.T) { name string in []byte expected string - numErr any + numErr int64 }{ { name: "Valid RGMID", @@ -607,10 +660,27 @@ func TestReceiveUnmarshallerRGMID(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, tel := newTestReceiveV1Unmarshaller(t) actual := u.rgmidToString(tt.in) assert.Equal(t, tt.expected, actual) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.numErr) + var expectedMetrics []metricdata.Metrics + if tt.numErr > 0 { + expectedMetrics = append(expectedMetrics, metricdata.Metrics{ + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: tt.numErr, + }, + }, + }, + }) + } + tel.assertMetrics(t, expectedMetrics) }) } } @@ -652,7 +722,7 @@ func TestReceiveUnmarshallerReceiveBaggageString(t *testing.T) { for _, testCase := range testCases { t.Run(fmt.Sprintf("%T", testCase.name), func(t *testing.T) { actual := pcommon.NewMap() - u := newTestReceiveV1Unmarshaller(t) + u, _ := newTestReceiveV1Unmarshaller(t) err := u.unmarshalBaggage(actual, testCase.baggage) if testCase.errStr == "" { assert.NoError(t, err) @@ -823,16 +893,33 @@ func TestReceiveUnmarshallerInsertUserProperty(t *testing.T) { } func TestSolaceMessageReceiveUnmarshallerV1InsertUserPropertyUnsupportedType(t *testing.T) { - u := newTestReceiveV1Unmarshaller(t) + u, tt := newTestReceiveV1Unmarshaller(t) const key = "some-property" attributeMap := pcommon.NewMap() u.insertUserProperty(attributeMap, key, "invalid data type") _, ok := attributeMap.Get("messaging.solace.user_properties." + key) assert.False(t, ok) - validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, 1) + tt.assertMetrics(t, []metricdata.Metrics{ + { + Name: "solacereceiver_recoverable_unmarshalling_errors", + Description: "Number of recoverable message unmarshalling errors", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }) } -func newTestReceiveV1Unmarshaller(t *testing.T) *brokerTraceReceiveUnmarshallerV1 { - m := newTestMetrics(t) - return &brokerTraceReceiveUnmarshallerV1{zap.NewNop(), m} +func newTestReceiveV1Unmarshaller(t *testing.T) (*brokerTraceReceiveUnmarshallerV1, componentTestTelemetry) { + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) + return &brokerTraceReceiveUnmarshallerV1{zap.NewNop(), telemetryBuilder}, tel } diff --git a/receiver/solacereceiver/unmarshaller_test.go b/receiver/solacereceiver/unmarshaller_test.go index 82a43d4431bd..515ed4a822c5 100644 --- a/receiver/solacereceiver/unmarshaller_test.go +++ b/receiver/solacereceiver/unmarshaller_test.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata" egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1" receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1" ) @@ -318,7 +319,10 @@ func TestSolaceMessageUnmarshallerUnmarshal(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - u := newTracesUnmarshaller(zap.NewNop(), newTestMetrics(t)) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) + u := newTracesUnmarshaller(zap.NewNop(), telemetryBuilder) traces, err := u.unmarshal(tt.message) if tt.err != nil { require.Error(t, err)