Skip to content

Commit

Permalink
Merge pull request #757 from jmacd/jmacd/prom_res
Browse files Browse the repository at this point in the history
Add Prometheus resource support
  • Loading branch information
MrAlias authored May 21, 2020
2 parents 51e5719 + 0d3daf0 commit d6446f0
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 45 deletions.
75 changes: 75 additions & 0 deletions api/label/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ type Iterator struct {
idx int
}

// MergeIterator supports iterating over two sets of labels while
// eliminating duplicate values from the combined set. The first
// iterator value takes precedence.
type MergeItererator struct {
one oneIterator
two oneIterator
current kv.KeyValue
}

type oneIterator struct {
iter Iterator
done bool
label kv.KeyValue
}

// Next moves the iterator to the next position. Returns false if there
// are no more labels.
func (i *Iterator) Next() bool {
Expand Down Expand Up @@ -75,3 +90,63 @@ func (i *Iterator) ToSlice() []kv.KeyValue {
}
return slice
}

// NewMergeIterator returns a MergeIterator for merging two label sets
// Duplicates are resolved by taking the value from the first set.
func NewMergeIterator(s1, s2 *Set) MergeItererator {
mi := MergeItererator{
one: makeOne(s1.Iter()),
two: makeOne(s2.Iter()),
}
return mi
}

func makeOne(iter Iterator) oneIterator {
oi := oneIterator{
iter: iter,
}
oi.advance()
return oi
}

func (oi *oneIterator) advance() {
if oi.done = !oi.iter.Next(); !oi.done {
oi.label = oi.iter.Label()
}
}

// Next returns true if there is another label available.
func (m *MergeItererator) Next() bool {
if m.one.done && m.two.done {
return false
}
if m.one.done {
m.current = m.two.label
m.two.advance()
return true
}
if m.two.done {
m.current = m.one.label
m.one.advance()
return true
}
if m.one.label.Key == m.two.label.Key {
m.current = m.one.label // first iterator label value wins
m.one.advance()
m.two.advance()
return true
}
if m.one.label.Key < m.two.label.Key {
m.current = m.one.label
m.one.advance()
return true
}
m.current = m.two.label
m.two.advance()
return true
}

// Label returns the current value after Next() returns true.
func (m *MergeItererator) Label() kv.KeyValue {
return m.current
}
94 changes: 94 additions & 0 deletions api/label/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package label_test

import (
"fmt"
"testing"

"go.opentelemetry.io/otel/api/kv"
Expand Down Expand Up @@ -55,3 +56,96 @@ func TestEmptyIterator(t *testing.T) {
require.Equal(t, 0, iter.Len())
require.False(t, iter.Next())
}

func TestMergedIterator(t *testing.T) {

type inputs struct {
name string
keys1 []string
keys2 []string
expect []string
}

makeLabels := func(keys []string, num int) (result []kv.KeyValue) {
for _, k := range keys {
result = append(result, kv.Int(k, num))
}
return
}

for _, input := range []inputs{
{
name: "one overlap",
keys1: []string{"A", "B"},
keys2: []string{"B", "C"},
expect: []string{"A/1", "B/1", "C/2"},
},
{
name: "reversed one overlap",
keys1: []string{"B", "A"},
keys2: []string{"C", "B"},
expect: []string{"A/1", "B/1", "C/2"},
},
{
name: "one empty",
keys1: nil,
keys2: []string{"C", "B"},
expect: []string{"B/2", "C/2"},
},
{
name: "two empty",
keys1: []string{"C", "B"},
keys2: nil,
expect: []string{"B/1", "C/1"},
},
{
name: "no overlap both",
keys1: []string{"C"},
keys2: []string{"B"},
expect: []string{"B/2", "C/1"},
},
{
name: "one empty single two",
keys1: nil,
keys2: []string{"B"},
expect: []string{"B/2"},
},
{
name: "two empty single one",
keys1: []string{"A"},
keys2: nil,
expect: []string{"A/1"},
},
{
name: "all empty",
keys1: nil,
keys2: nil,
expect: nil,
},
{
name: "full overlap",
keys1: []string{"A", "B", "C", "D"},
keys2: []string{"A", "B", "C", "D"},
expect: []string{"A/1", "B/1", "C/1", "D/1"},
},
} {
t.Run(input.name, func(t *testing.T) {
labels1 := makeLabels(input.keys1, 1)
labels2 := makeLabels(input.keys2, 2)

set1 := label.NewSet(labels1...)
set2 := label.NewSet(labels2...)

merge := label.NewMergeIterator(&set1, &set2)

var result []string

for merge.Next() {
label := merge.Label()
result = append(result, fmt.Sprint(label.Key, "/", label.Value.Emit()))
}

require.Equal(t, input.expect, result)
})
}
}
17 changes: 11 additions & 6 deletions exporters/metric/prometheus/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ import (
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/resource"
)

// This test demonstrates that it is relatively difficult to setup a
// Prometheus export pipeline:
//
// 1. The default boundaries are difficult to pass, should be []float instead of []metric.Number
//
// TODO: Address this issue; add Resources to the test.
// TODO: Address this issue.

func ExampleNewExportPipeline() {
// Create a meter
exporter, err := prometheus.NewExportPipeline(prometheus.Config{})
exporter, err := prometheus.NewExportPipeline(
prometheus.Config{},
pull.WithResource(resource.New(kv.String("R", "V"))),
)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -73,10 +78,10 @@ func ExampleNewExportPipeline() {
// Output:
// # HELP a_counter Counts things
// # TYPE a_counter counter
// a_counter{key="value"} 100
// a_counter{R="V",key="value"} 100
// # HELP a_valuerecorder Records values
// # TYPE a_valuerecorder histogram
// a_valuerecorder_bucket{key="value",le="+Inf"} 1
// a_valuerecorder_sum{key="value"} 100
// a_valuerecorder_count{key="value"} 1
// a_valuerecorder_bucket{R="V",key="value",le="+Inf"} 1
// a_valuerecorder_sum{R="V",key="value"} 100
// a_valuerecorder_count{R="V",key="value"} 1
}
59 changes: 33 additions & 26 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"net/http"
"sync"

"go.opentelemetry.io/otel/api/metric"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
Expand Down Expand Up @@ -203,7 +202,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
defer c.exp.lock.RUnlock()

_ = c.exp.Controller().ForEach(func(record export.Record) error {
ch <- c.toDesc(&record)
var labelKeys []string
mergeLabels(record, &labelKeys, nil)
ch <- c.toDesc(record, labelKeys)
return nil
})
}
Expand All @@ -222,9 +223,11 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
err := ctrl.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

var labelKeys, labels []string
mergeLabels(record, &labelKeys, &labels)

desc := c.toDesc(record, labelKeys)

if hist, ok := agg.(aggregator.Histogram); ok {
if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil {
Expand Down Expand Up @@ -346,30 +349,34 @@ func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator
return nil
}

func (c *collector) toDesc(record *export.Record) *prometheus.Desc {
func (c *collector) toDesc(record export.Record, labelKeys []string) *prometheus.Desc {
desc := record.Descriptor()
labels := labelsKeys(record.Labels())
return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil)
return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labelKeys, nil)
}

func labelsKeys(labels *label.Set) []string {
iter := labels.Iter()
keys := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Label()
keys = append(keys, sanitize(string(kv.Key)))
// mergeLabels merges the export.Record's labels and resources into a
// single set, giving precedence to the record's labels in case of
// duplicate keys. This outputs one or both of the keys and the
// values as a slice, and either argument may be nil to avoid
// allocating an unnecessary slice.
func mergeLabels(record export.Record, keys, values *[]string) {
if keys != nil {
*keys = make([]string, 0, record.Labels().Len()+record.Resource().Len())
}
if values != nil {
*values = make([]string, 0, record.Labels().Len()+record.Resource().Len())
}
return keys
}

func labelValues(labels *label.Set) []string {
// TODO(paivagustavo): parse the labels.Encoded() instead of calling `Emit()` directly
// this would avoid unnecessary allocations.
iter := labels.Iter()
values := make([]string, 0, iter.Len())
for iter.Next() {
label := iter.Label()
values = append(values, label.Value.Emit())
// Duplicate keys are resolved by taking the record label value over
// the resource value.
mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet())
for mi.Next() {
label := mi.Label()
if keys != nil {
*keys = append(*keys, sanitize(string(label.Key)))
}
if values != nil {
*values = append(*values, label.Value.Emit())
}
}
return values
}
15 changes: 8 additions & 7 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/resource"
)

func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewExportPipeline(prometheus.Config{
DefaultHistogramBoundaries: []float64{-0.5, 1},
})
}, pull.WithResource(resource.New(kv.String("R", "V"))))
require.NoError(t, err)

meter := exporter.Provider().Meter("test")
Expand All @@ -54,18 +55,18 @@ func TestPrometheusExporter(t *testing.T) {
counter.Add(ctx, 10, labels...)
counter.Add(ctx, 5.3, labels...)

expected = append(expected, `counter{A="B",C="D"} 15.3`)
expected = append(expected, `counter{A="B",C="D",R="V"} 15.3`)

valuerecorder.Record(ctx, -0.6, labels...)
valuerecorder.Record(ctx, -0.4, labels...)
valuerecorder.Record(ctx, 0.6, labels...)
valuerecorder.Record(ctx, 20, labels...)

expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="+Inf"} 4`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="-0.5"} 1`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="1"} 3`)
expected = append(expected, `valuerecorder_count{A="B",C="D"} 4`)
expected = append(expected, `valuerecorder_sum{A="B",C="D"} 19.6`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",R="V",le="+Inf"} 4`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",R="V",le="-0.5"} 1`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",R="V",le="1"} 3`)
expected = append(expected, `valuerecorder_count{A="B",C="D",R="V"} 4`)
expected = append(expected, `valuerecorder_sum{A="B",C="D",R="V"} 19.6`)

compareExport(t, exporter, expected)
}
Expand Down
Loading

0 comments on commit d6446f0

Please sign in to comment.