diff --git a/CHANGELOG.md b/CHANGELOG.md index df905f7b94f..6b458312270 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Both the `Histogram` and `HistogramDataPoint` are redefined with a generic argument of `[N int64 | float64]` in `go.opentelemetry.io/otel/sdk/metric/metricdata`. (#3849) - The metric `Export` interface from `go.opentelemetry.io/otel/sdk/metric` accepts a `*ResourceMetrics` instead of `ResourceMetrics`. (#3853) +### Fixed + +- Panic in Prometheus exporter on concurrent `Collect`. (#3899) + ### Removed - The deprecated `go.opentelemetry.io/otel/metric/global` package is removed. (#3829) diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 164dcbd0956..90b49f09e4a 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -18,15 +18,18 @@ import ( "context" "os" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" @@ -647,3 +650,78 @@ func TestDuplicateMetrics(t *testing.T) { }) } } + +func TestConcurrentCollect(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := newConfig(WithRegisterer(registry)) + + reader := metric.NewManualReader(cfg.manualReaderOptions()...) + + collector := &collector{ + reader: reader, + disableTargetInfo: false, + withoutUnits: true, + disableScopeInfo: cfg.disableScopeInfo, + scopeInfos: make(map[instrumentation.Scope]prometheus.Metric), + metricFamilies: make(map[string]*dto.MetricFamily), + } + + err := cfg.registerer.Register(collector) + require.NoError(t, err) + + ctx := context.Background() + + // initialize resource + res, err := resource.New(ctx, + resource.WithAttributes(semconv.ServiceName("prometheus_test")), + resource.WithAttributes(semconv.TelemetrySDKVersion("latest")), + ) + require.NoError(t, err) + res, err = resource.Merge(resource.Default(), res) + require.NoError(t, err) + + exporter := &Exporter{Reader: reader} + + // initialize provider + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + // initialize two meter a, b + meterA := provider.Meter("ma", otelmetric.WithInstrumentationVersion("v0.1.0")) + meterB := provider.Meter("mb", otelmetric.WithInstrumentationVersion("v0.1.0")) + + fooA, err := meterA.Int64Counter("foo", + instrument.WithUnit("By"), + instrument.WithDescription("meter counter foo")) + assert.NoError(t, err) + fooA.Add(ctx, 100, attribute.String("A", "B")) + + fooB, err := meterB.Int64Counter("foo", + instrument.WithUnit("By"), + instrument.WithDescription("meter counter foo")) + assert.NoError(t, err) + fooB.Add(ctx, 100, attribute.String("A", "B")) + + concurrencyLevel := 100 + ch := make(chan prometheus.Metric, concurrencyLevel) + + for i := 0; i < concurrencyLevel; i++ { + go func() { + collector.Collect(ch) + }() + } + + for ; concurrencyLevel > 0; concurrencyLevel-- { + select { + case <-ch: + concurrencyLevel-- + if concurrencyLevel == 0 { + return + } + case <-time.After(time.Second): + t.Fatal("timeout") + } + } +}