Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus exporter: concurrent collect bugfix #3899

Merged
merged 37 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1a30f23
Concurrent collect bugfix
hexdigest Mar 17, 2023
4e06c3e
Used sync.Mutex and code cleanup
hexdigest Mar 18, 2023
24e2472
Revert "Concurrent collect bugfix"
hexdigest Mar 18, 2023
e382059
Merge branch 'lock' into concurrent_collect_bugfix
hexdigest Mar 18, 2023
befb8c2
Used sync.Mutex and re-grouped protected members
hexdigest Mar 21, 2023
675c0fb
Added test and updated changelog
hexdigest Mar 21, 2023
162e9fb
Updated changelog
hexdigest Mar 22, 2023
ce2045f
Take care of potential panic in otel.Handle
hexdigest Mar 22, 2023
3483cb8
Extracted critical section in a separate method and fixed nil scope info
hexdigest Mar 22, 2023
77959ad
Lock the whole scope of the func
hexdigest Mar 23, 2023
c00462c
Moved otel.Handle out of the critical section
hexdigest Mar 24, 2023
96f7fd2
Fixed calling createScopeInfoMetric twice and updated changelog
hexdigest Mar 25, 2023
1f7c3ad
Fixed markdown linter errors
hexdigest Apr 9, 2023
ecc1336
Added test for nil scopeinfo
hexdigest Apr 9, 2023
ea67c1a
Merge branch 'main' into concurrent_collect_bugfix
hexdigest Apr 30, 2023
b4b3c2a
Fix merge artifacts
hexdigest Apr 30, 2023
bb27c28
Merge branch 'main' into concurrent_collect_bugfix
pellared May 8, 2023
c274d95
Fixed linter errors
hexdigest May 13, 2023
782f393
Protect the whole validateMetrics method with a mutex
hexdigest May 13, 2023
4c03f86
Merge branch 'main' into concurrent_collect_bugfix
pellared May 15, 2023
529e868
Update CHANGELOG.md
pellared May 15, 2023
489b528
Update exporters/prometheus/exporter.go
hexdigest May 15, 2023
35809a9
Merge branch 'main' into concurrent_collect_bugfix
hexdigest May 23, 2023
7b284eb
Merge branch 'main' into concurrent_collect_bugfix
dmathieu May 24, 2023
1eed860
Update CHANGELOG.md
pellared May 25, 2023
af3d7cf
Merge branch 'main' into concurrent_collect_bugfix
pellared May 25, 2023
919987c
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 1, 2023
2a69d79
Merge branch 'main' into concurrent_collect_bugfix
dmathieu Jun 7, 2023
c85e301
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 18, 2023
ec77a14
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 20, 2023
87f5902
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 21, 2023
60ce2f0
Document that Collect is concurrent-safe
pellared Jun 22, 2023
d272f93
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 28, 2023
fcfb775
Update exporter_test.go
pellared Jun 28, 2023
e70b79f
Update exporters/prometheus/exporter_test.go
pellared Jun 29, 2023
4b5714d
Update exporters/prometheus/exporter.go
pellared Jun 29, 2023
914a931
Merge branch 'main' into concurrent_collect_bugfix
pellared Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ This release drops the compatibility guarantee of [Go 1.18].

- Handle empty environment variables as if they were not set. (#3764)
- Clarify the `httpconv` and `netconv` packages in `go.opentelemetry.io/otel/semconv/*` provide tracing semantic conventions. (#3823)
- Fix race conditions in `go.opentelemetry.io/otel/exporters/prometheus` that could cause a panic. (#3899)
- Fix sending nil `scopeInfo` to metrics channel in `go.opentelemetry.io/otel/exporters/prometheus` that could cause a panic in `github.com/prometheus/client_golang/prometheus`. (#3899)

### Deprecated

Expand Down
182 changes: 115 additions & 67 deletions exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ var _ metric.Reader = &Exporter{}
type collector struct {
reader metric.Reader

disableTargetInfo bool
withoutUnits bool
targetInfo prometheus.Metric
disableScopeInfo bool
createTargetInfoOnce sync.Once
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
namespace string
withoutUnits bool
disableScopeInfo bool
namespace string

mu sync.Mutex // mu protects all members below from the concurrent access.
disableTargetInfo bool
targetInfo prometheus.Metric
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
}

// prometheus counters MUST have a _total suffix:
Expand Down Expand Up @@ -113,6 +114,8 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
}

// Collect implements prometheus.Collector.
//
// This method is safe to call concurrently.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
Expand All @@ -124,16 +127,31 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
}

c.createTargetInfoOnce.Do(func() {
// Resource should be immutable, we don't need to compute again
targetInfo, err := c.createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
if err != nil {
// If the target info metric is invalid, disable sending it.
otel.Handle(err)
c.disableTargetInfo = true
// Initialize (once) targetInfo and disableTargetInfo.
func() {
hexdigest marked this conversation as resolved.
Show resolved Hide resolved
var err error
var targetInfo prometheus.Metric

c.mu.Lock()
defer func() {
c.mu.Unlock()
if err != nil {
otel.Handle(err)
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
}
}()

if c.targetInfo == nil && !c.disableTargetInfo {
targetInfo, err = createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
if err != nil {
// If the target info metric is invalid, disable sending it.
c.disableTargetInfo = true
return
}

c.targetInfo = targetInfo
pellared marked this conversation as resolved.
Show resolved Hide resolved
}
c.targetInfo = targetInfo
})
}()

if !c.disableTargetInfo {
ch <- c.targetInfo
}
Expand All @@ -142,48 +160,53 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
var keys, values [2]string

if !c.disableScopeInfo {
scopeInfo, ok := c.scopeInfos[scopeMetrics.Scope]
if !ok {
scopeInfo, err = createScopeInfoMetric(scopeMetrics.Scope)
if err != nil {
otel.Handle(err)
}
c.scopeInfos[scopeMetrics.Scope] = scopeInfo
scopeInfo, err := c.scopeInfo(scopeMetrics.Scope)
if err != nil {
otel.Handle(err)
continue
}

ch <- scopeInfo

keys = scopeInfoKeys
values = [2]string{scopeMetrics.Scope.Name, scopeMetrics.Scope.Version}
}

for _, m := range scopeMetrics.Metrics {
typ, name := c.metricTypeAndName(m)
if typ == nil {
continue
}

drop, help := c.validateMetrics(name, m.Description, typ)
if drop {
continue
}

if help != "" {
m.Description = help
}

switch v := m.Data.(type) {
case metricdata.Histogram[int64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Histogram[float64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Sum[int64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Sum[float64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[int64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[float64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
}
}
}
}

func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string) {
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/3163): support exemplars
drop, help := validateMetrics(name, m.Description, dto.MetricType_HISTOGRAM.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

for _, dp := range histogram.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)

Expand All @@ -204,24 +227,10 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
}
}

func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string) {
valueType := prometheus.CounterValue
metricType := dto.MetricType_COUNTER
if !sum.IsMonotonic {
valueType = prometheus.GaugeValue
metricType = dto.MetricType_GAUGE
}
if sum.IsMonotonic {
// Add _total suffix for counters
name += counterSuffix
}

drop, help := validateMetrics(name, m.Description, metricType.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

for _, dp := range sum.DataPoints {
Expand All @@ -237,15 +246,7 @@ func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata
}
}

func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
drop, help := validateMetrics(name, m.Description, dto.MetricType_GAUGE.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string) {
for _, dp := range gauge.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)

Expand Down Expand Up @@ -293,7 +294,7 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
return keys, values
}

func (c *collector) createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
func createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{})
desc := prometheus.NewDesc(name, description, keys, nil)
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...)
Expand Down Expand Up @@ -386,16 +387,63 @@ func sanitizeName(n string) string {
return b.String()
}

func validateMetrics(name, description string, metricType *dto.MetricType, mfs map[string]*dto.MetricFamily) (drop bool, help string) {
emf, exist := mfs[name]
func (c *collector) metricTypeAndName(m metricdata.Metrics) (*dto.MetricType, string) {
name := c.getName(m)

switch v := m.Data.(type) {
case metricdata.Histogram[int64], metricdata.Histogram[float64]:
return dto.MetricType_HISTOGRAM.Enum(), name
case metricdata.Sum[float64]:
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Sum[int64]:
pellared marked this conversation as resolved.
Show resolved Hide resolved
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Gauge[int64], metricdata.Gauge[float64]:
return dto.MetricType_GAUGE.Enum(), name
}

return nil, ""
}

func (c *collector) scopeInfo(scope instrumentation.Scope) (prometheus.Metric, error) {
c.mu.Lock()
defer c.mu.Unlock()

scopeInfo, ok := c.scopeInfos[scope]
if ok {
return scopeInfo, nil
}

scopeInfo, err := createScopeInfoMetric(scope)
if err != nil {
return nil, fmt.Errorf("cannot create scope info metric: %w", err)
}

c.scopeInfos[scope] = scopeInfo
pellared marked this conversation as resolved.
Show resolved Hide resolved

return scopeInfo, nil
}

func (c *collector) validateMetrics(name, description string, metricType *dto.MetricType) (drop bool, help string) {
c.mu.Lock()
defer c.mu.Unlock()

emf, exist := c.metricFamilies[name]

if !exist {
mfs[name] = &dto.MetricFamily{
c.metricFamilies[name] = &dto.MetricFamily{
pellared marked this conversation as resolved.
Show resolved Hide resolved
Name: proto.String(name),
Help: proto.String(description),
Type: metricType,
}
return false, ""
}

if emf.GetType() != *metricType {
global.Error(
errors.New("instrument type conflict"),
Expand Down
Loading