Skip to content

Commit

Permalink
Infer types in remote_write
Browse files Browse the repository at this point in the history
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed Jul 15, 2020
1 parent 93e43e7 commit c0c6a0f
Show file tree
Hide file tree
Showing 15 changed files with 548 additions and 41 deletions.
5 changes: 2 additions & 3 deletions metricbeat/module/prometheus/collector/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package collector

import (
"math"
"strconv"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/helper/labelhash"
"github.com/elastic/beats/v7/metricbeat/mb"
"math"
"strconv"

dto "github.com/prometheus/client_model/go"
)
Expand Down
13 changes: 12 additions & 1 deletion metricbeat/module/prometheus/remote_write/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,18 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
)

func samplesToEvents(metrics model.Samples) map[string]mb.Event {

// DefaultRemoteWriteEventsGeneratorFactory returns the default prometheus events generator
func DefaultRemoteWriteEventsGeneratorFactory(ms mb.BaseMetricSet) (RemoteWriteEventsGenerator, error) {
return &remoteWriteEventGenerator{}, nil
}

type remoteWriteEventGenerator struct{}

func (p *remoteWriteEventGenerator) Start() {}
func (p *remoteWriteEventGenerator) Stop() {}

func (p *remoteWriteEventGenerator) GenerateEvents(metrics model.Samples) map[string]mb.Event {
eventList := map[string]mb.Event{}

for _, metric := range metrics {
Expand Down
53 changes: 51 additions & 2 deletions metricbeat/module/prometheus/remote_write/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,33 @@ import (
)

func init() {
mb.Registry.MustAddMetricSet("prometheus", "remote_write", New,
mb.Registry.MustAddMetricSet("prometheus", "remote_write",
MetricSetBuilder("prometheus", DefaultRemoteWriteEventsGeneratorFactory),
mb.WithHostParser(parse.EmptyHostParser),
)
}

// RemoteWriteEventsGenerator converts a Prometheus metric Samples mb.Event map
type RemoteWriteEventsGenerator interface {
// Start must be called before using the generator
Start()

// converts a Prometheus metric family into a list of PromEvents
GenerateEvents(metrics model.Samples) map[string]mb.Event

// Stop must be called when the generator won't be used anymore
Stop()
}

// RemoteWriteEventsGeneratorFactory creates a PromEventsGenerator when instanciating a metricset
type RemoteWriteEventsGeneratorFactory func(ms mb.BaseMetricSet) (RemoteWriteEventsGenerator, error)

type MetricSet struct {
mb.BaseMetricSet
server serverhelper.Server
events chan mb.Event
promEventsGen RemoteWriteEventsGenerator
eventGenStarted bool
}

func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Expand All @@ -62,6 +80,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return m, nil
}

// MetricSetBuilder returns a builder function for a new Prometheus metricset using
// the given namespace and event generator
func MetricSetBuilder(namespace string, genFactory RemoteWriteEventsGeneratorFactory) func(base mb.BaseMetricSet) (mb.MetricSet, error) {
return func(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := defaultConfig()
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}

promEventsGen, err := genFactory(base)
if err != nil {
return nil, err
}

m := &MetricSet{
BaseMetricSet: base,
promEventsGen: promEventsGen,
eventGenStarted: false,
}
svc, err := httpserver.NewHttpServerWithHandler(base, m.handleFunc)
if err != nil {
return nil, err
}
m.server = svc

return m, nil
}
}


func (m *MetricSet) Run(reporter mb.PushReporterV2) {
// Start event watcher
m.server.Start()
Expand Down Expand Up @@ -100,7 +149,7 @@ func (m *MetricSet) handleFunc(writer http.ResponseWriter, req *http.Request) {
}

samples := protoToSamples(&protoReq)
events := samplesToEvents(samples)
events := m.promEventsGen.GenerateEvents(samples)

for _, e := range events {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-module-prometheus.html

# Metrics collected from a Prometheus endpoint
- module: prometheus
period: 10s
metricsets: ["collector"]
hosts: ["localhost:9090"]
metrics_path: /metrics
#- module: prometheus
# period: 10s
# metricsets: ["collector"]
# hosts: ["localhost:9090"]
# metrics_path: /metrics
#metrics_filters:
# include: []
# exclude: []
Expand All @@ -20,10 +20,10 @@


# Metrics sent by a Prometheus server using remote_write option
#- module: prometheus
# metricsets: ["remote_write"]
# host: "localhost"
# port: "9201"
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"

# Secure settings for the server using TLS/SSL:
#ssl.certificate: "/etc/pki/server/cert.pem"
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func Test_CounterCache(t *testing.T) {
for i, val := range tt.valuesUint64 {
want := tt.expectedUin64[i]
if got, _ := tt.counterCache.RateUint64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateUint64() = %v, want %v", got, want)
t.Errorf("CounterCache.RateUint64() = %v, want %v", got, want)
}
}
for i, val := range tt.valuesFloat64 {
want := tt.expectedFloat64[i]
if got, _ := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want)
t.Errorf("CounterCache.RateFloat64() = %v, want %v", got, want)
}
}
})
Expand Down
40 changes: 20 additions & 20 deletions x-pack/metricbeat/module/prometheus/collector/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func promEventsGeneratorFactory(base mb.BaseMetricSet) (collector.PromEventsGene
// to make sure that all counters are available between fetches
counters := NewCounterCache(base.Module().Config().Period * 5)

g := typedGenerator{
counterCache: counters,
rateCounters: config.RateCounters,
g := TypedGenerator{
CounterCache: counters,
RateCounters: config.RateCounters,
}

return &g, nil
Expand All @@ -39,29 +39,29 @@ func promEventsGeneratorFactory(base mb.BaseMetricSet) (collector.PromEventsGene
return collector.DefaultPromEventsGeneratorFactory(base)
}

type typedGenerator struct {
counterCache CounterCache
rateCounters bool
type TypedGenerator struct {
CounterCache CounterCache
RateCounters bool
}

func (g *typedGenerator) Start() {
func (g *TypedGenerator) Start() {
cfgwarn.Beta("Prometheus 'use_types' setting is beta")

if g.rateCounters {
if g.RateCounters {
cfgwarn.Experimental("Prometheus 'rate_counters' setting is experimental")
}

g.counterCache.Start()
g.CounterCache.Start()
}

func (g *typedGenerator) Stop() {
logp.Debug("prometheus.collector.cache", "stopping counterCache")
g.counterCache.Stop()
func (g *TypedGenerator) Stop() {
logp.Debug("prometheus.collector.cache", "stopping CounterCache")
g.CounterCache.Stop()
}

// GeneratePromEvents stores all Prometheus metrics using
// specific Elasticsearch data types.
func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.PromEvent {
func (g *TypedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.PromEvent {
var events []collector.PromEvent

name := *mf.Name
Expand Down Expand Up @@ -138,7 +138,7 @@ func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.Pr
events = append(events, collector.PromEvent{
Data: common.MapStr{
name: common.MapStr{
"histogram": promHistogramToES(g.counterCache, name, labels, histogram),
"histogram": PromHistogramToES(g.CounterCache, name, labels, histogram),
},
},
Labels: labels,
Expand Down Expand Up @@ -167,26 +167,26 @@ func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.Pr
}

// rateCounterUint64 fills a counter value and optionally adds the rate if rate_counters is enabled
func (g *typedGenerator) rateCounterUint64(name string, labels common.MapStr, value uint64) common.MapStr {
func (g *TypedGenerator) rateCounterUint64(name string, labels common.MapStr, value uint64) common.MapStr {
d := common.MapStr{
"counter": value,
}

if g.rateCounters {
d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value)
if g.RateCounters {
d["rate"], _ = g.CounterCache.RateUint64(name+labels.String(), value)
}

return d
}

// rateCounterFloat64 fills a counter value and optionally adds the rate if rate_counters is enabled
func (g *typedGenerator) rateCounterFloat64(name string, labels common.MapStr, value float64) common.MapStr {
func (g *TypedGenerator) rateCounterFloat64(name string, labels common.MapStr, value float64) common.MapStr {
d := common.MapStr{
"counter": value,
}

if g.rateCounters {
d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value)
if g.RateCounters {
d["rate"], _ = g.CounterCache.RateFloat64(name+labels.String(), value)
}

return d
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
dto "github.com/prometheus/client_model/go"
)

// promHistogramToES takes a Prometheus histogram and converts it to an ES histogram:
// PromHistogramToES takes a Prometheus histogram and converts it to an ES histogram:
//
// ES histograms look like this:
//
Expand All @@ -27,7 +27,7 @@ import (
// - undoing counters accumulation for each bucket (counts)
//
// https://www.elastic.co/guide/en/elasticsearch/reference/master/histogram.html
func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histogram *dto.Histogram) common.MapStr {
func PromHistogramToES(cc CounterCache, name string, labels common.MapStr, histogram *dto.Histogram) common.MapStr {
var values []float64
var counts []uint64

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

// TestPromHistogramToES tests that calling promHistogramToES multiple
// TestPromHistogramToES tests that calling PromHistogramToES multiple
// times with the same cache produces each time the expected results.
func TestPromHistogramToES(t *testing.T) {
type sample struct {
Expand Down Expand Up @@ -398,7 +398,7 @@ func TestPromHistogramToES(t *testing.T) {

for i, s := range c.samples {
t.Logf("#%d: %+v", i, s.histogram)
result := promHistogramToES(cache, metricName, labels, &s.histogram)
result := PromHistogramToES(cache, metricName, labels, &s.histogram)
assert.EqualValues(t, s.expected, result)
}
})
Expand Down
51 changes: 51 additions & 0 deletions x-pack/metricbeat/module/prometheus/remote_write/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"@timestamp": "2020-02-28T13:55:37.221Z",
"@metadata": {
"beat": "metricbeat",
"type": "_doc",
"version": "8.0.0"
},
"service": {
"type": "prometheus"
},
"agent": {
"version": "8.0.0",
"type": "metricbeat",
"ephemeral_id": "ead09243-0aa0-4fd2-8732-1e09a6d36338",
"hostname": "host1",
"id": "bd12ee45-881f-48e4-af20-13b139548607"
},
"ecs": {
"version": "1.4.0"
},
"host": {},
"event": {
"dataset": "prometheus.remote_write",
"module": "prometheus"
},
"metricset": {
"name": "remote_write"
},
"prometheus": {
"metrics": {
"container_tasks_state": 0
},
"labels": {
"name": "nodeexporter",
"id": "/docker/1d6ec1931c9b527d4fe8e28d9c798f6ec612f48af51949f3219b5ca77e120b10",
"container_label_com_docker_compose_oneoff": "False",
"instance": "cadvisor:8080",
"container_label_com_docker_compose_service": "nodeexporter",
"state": "iowaiting",
"monitor": "docker-host-alpha",
"container_label_com_docker_compose_project": "dockprom",
"job": "cadvisor",
"image": "prom/node-exporter:v0.18.1",
"container_label_maintainer": "The Prometheus Authors <prometheus-developers@googlegroups.com>",
"container_label_com_docker_compose_config_hash": "2cc2fedf6da5ff0996a209d9801fb74962a8f4c21e44be03ed82659817d9e7f9",
"container_label_com_docker_compose_version": "1.24.1",
"container_label_com_docker_compose_container_number": "1",
"container_label_org_label_schema_group": "monitoring"
}
}
}
Loading

0 comments on commit c0c6a0f

Please sign in to comment.