Skip to content

Commit

Permalink
feat(pkger): extend metrics to collect remote sources
Browse files Browse the repository at this point in the history
closes: #18243
  • Loading branch information
jsteenb2 committed Jun 16, 2020
1 parent a1f89d9 commit 0e61e71
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

1. [18387](https://github.com/influxdata/influxdb/pull/18387): Integrate query cancellation after queries have been submitted
1. [18515](https://github.com/influxdata/influxdb/pull/18515): Extend templates with the source file|url|reader.
1. [18539](https://github.com/influxdata/influxdb/pull/18539): Collect stats on installed influxdata community template usage.

## v2.0.0-beta.12 [2020-06-12]

Expand Down
85 changes: 50 additions & 35 deletions kit/metric/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,35 @@ func New(reg prometheus.Registerer, service string, opts ...ClientOptFn) *REDCli
opt := metricOpts{
namespace: "service",
service: service,
counterMetrics: map[string]newVecOpts{
counterMetrics: map[string]VecOpts{
"call_total": {
help: "Number of calls",
labelNames: []string{"method"},
counterFn: func(vec *prometheus.CounterVec, o fnOpts) {
vec.With(prometheus.Labels{"method": o.method}).Inc()
Help: "Number of calls",
LabelNames: []string{"method"},
CounterFn: func(vec *prometheus.CounterVec, o CollectFnOpts) {
vec.With(prometheus.Labels{"method": o.Method}).Inc()
},
},
"error_total": {
help: "Number of errors encountered",
labelNames: []string{"method", "code"},
counterFn: func(vec *prometheus.CounterVec, o fnOpts) {
if o.err != nil {
Help: "Number of errors encountered",
LabelNames: []string{"method", "code"},
CounterFn: func(vec *prometheus.CounterVec, o CollectFnOpts) {
if o.Err != nil {
vec.With(prometheus.Labels{
"method": o.method,
"code": influxdb.ErrorCode(o.err),
"method": o.Method,
"code": influxdb.ErrorCode(o.Err),
}).Inc()
}
},
},
},
histogramMetrics: map[string]newVecOpts{
histogramMetrics: map[string]VecOpts{
"duration": {
help: "Duration of calls",
labelNames: []string{"method"},
histogramFn: func(vec *prometheus.HistogramVec, o fnOpts) {
Help: "Duration of calls",
LabelNames: []string{"method"},
HistogramFn: func(vec *prometheus.HistogramVec, o CollectFnOpts) {
vec.
With(prometheus.Labels{"method": o.method}).
Observe(time.Since(o.start).Seconds())
With(prometheus.Labels{"method": o.Method}).
Observe(time.Since(o.Start).Seconds())
},
},
},
Expand All @@ -57,25 +57,25 @@ func New(reg prometheus.Registerer, service string, opts ...ClientOptFn) *REDCli
client := new(REDClient)
for metricName, vecOpts := range opt.counterMetrics {
client.metrics = append(client.metrics, &counter{
fn: vecOpts.counterFn,
fn: vecOpts.CounterFn,
CounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: opt.namespace,
Subsystem: opt.serviceName(),
Name: metricName,
Help: vecOpts.help,
}, vecOpts.labelNames),
Help: vecOpts.Help,
}, vecOpts.LabelNames),
})
}

for metricName, vecOpts := range opt.histogramMetrics {
client.metrics = append(client.metrics, &histogram{
fn: vecOpts.histogramFn,
fn: vecOpts.HistogramFn,
HistogramVec: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: opt.namespace,
Subsystem: opt.serviceName(),
Name: metricName,
Help: vecOpts.help,
}, vecOpts.labelNames),
Help: vecOpts.Help,
}, vecOpts.LabelNames),
})
}

Expand All @@ -84,17 +84,32 @@ func New(reg prometheus.Registerer, service string, opts ...ClientOptFn) *REDCli
return client
}

type RecordFn func(err error, opts ...func(opts *CollectFnOpts)) error

// RecordAdditional provides an extension to the base method, err data provided
// to the metrics.
func RecordAdditional(props map[string]interface{}) func(opts *CollectFnOpts) {
return func(opts *CollectFnOpts) {
opts.AdditionalProps = props
}
}

// Record returns a record fn that is called on any given return err. If an error is encountered
// it will register the err metric. The err is never altered.
func (c *REDClient) Record(method string) func(error) error {
func (c *REDClient) Record(method string) RecordFn {
start := time.Now()
return func(err error) error {
return func(err error, opts ...func(opts *CollectFnOpts)) error {
opt := CollectFnOpts{
Method: method,
Start: start,
Err: err,
}
for _, o := range opts {
o(&opt)
}

for _, metric := range c.metrics {
metric.collect(fnOpts{
method: method,
start: start,
err: err,
})
metric.collect(opt)
}

return err
Expand All @@ -112,25 +127,25 @@ func (c *REDClient) collectors() []prometheus.Collector {
type metricCollector interface {
prometheus.Collector

collect(o fnOpts)
collect(o CollectFnOpts)
}

type counter struct {
*prometheus.CounterVec

fn counterFn
fn CounterFn
}

func (c *counter) collect(o fnOpts) {
func (c *counter) collect(o CollectFnOpts) {
c.fn(c.CounterVec, o)
}

type histogram struct {
*prometheus.HistogramVec

fn histogramFn
fn HistogramFn
}

func (h *histogram) collect(o fnOpts) {
func (h *histogram) collect(o CollectFnOpts) {
h.fn(h.HistogramVec, o)
}
43 changes: 29 additions & 14 deletions kit/metric/metrics_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,35 @@ import (
)

type (
// using for a struct here as it will be extended
fnOpts struct {
method string
start time.Time
err error
// CollectFnOpts provides arugments to the collect operation of a metric.
CollectFnOpts struct {
Method string
Start time.Time
Err error
AdditionalProps map[string]interface{}
}

counterFn func(vec *prometheus.CounterVec, o fnOpts)
CounterFn func(vec *prometheus.CounterVec, o CollectFnOpts)

histogramFn func(vec *prometheus.HistogramVec, o fnOpts)
HistogramFn func(vec *prometheus.HistogramVec, o CollectFnOpts)

newVecOpts struct {
help string
labelNames []string
// VecOpts expands on the
VecOpts struct {
Name string
Help string
LabelNames []string

counterFn counterFn
histogramFn histogramFn
CounterFn CounterFn
HistogramFn HistogramFn
}
)

type metricOpts struct {
namespace string
service string
serviceSuffix string
counterMetrics map[string]newVecOpts
histogramMetrics map[string]newVecOpts
counterMetrics map[string]VecOpts
histogramMetrics map[string]VecOpts
}

func (o metricOpts) serviceName() string {
Expand All @@ -46,6 +49,18 @@ func (o metricOpts) serviceName() string {
// ClientOptFn is an option used by a metric middleware.
type ClientOptFn func(*metricOpts)

// WithVec sets a new counter vector to be collected.
func WithVec(opts VecOpts) ClientOptFn {
return func(o *metricOpts) {
if opts.CounterFn != nil {
if o.counterMetrics == nil {
o.counterMetrics = make(map[string]VecOpts)
}
o.counterMetrics[opts.Name] = opts
}
}
}

// WithSuffix returns a metric option that applies a suffix to the service name of the metric.
func WithSuffix(suffix string) ClientOptFn {
return func(opts *metricOpts) {
Expand Down
68 changes: 65 additions & 3 deletions pkger/service_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package pkger

import (
"context"
"net/url"
"strings"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/metric"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/prometheus/client_golang/prometheus"
)

type mwMetrics struct {
Expand All @@ -21,7 +24,7 @@ var _ SVC = (*mwMetrics)(nil)
func MWMetrics(reg *prom.Registry) SVCMiddleware {
return func(svc SVC) SVC {
return &mwMetrics{
rec: metric.New(reg, "pkger"),
rec: metric.New(reg, "pkger", metric.WithVec(templateVec())),
next: svc,
}
}
Expand Down Expand Up @@ -59,11 +62,70 @@ func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*
func (s *mwMetrics) DryRun(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
rec := s.rec.Record("dry_run")
impact, err := s.next.DryRun(ctx, orgID, userID, opts...)
return impact, rec(err)
return impact, rec(err, metric.RecordAdditional(map[string]interface{}{
"sources": impact.Sources,
}))
}

func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, opts ...ApplyOptFn) (PkgImpactSummary, error) {
rec := s.rec.Record("apply")
impact, err := s.next.Apply(ctx, orgID, userID, opts...)
return impact, rec(err)
return impact, rec(err, metric.RecordAdditional(map[string]interface{}{
"sources": impact.Sources,
}))
}

func templateVec() metric.VecOpts {
return metric.VecOpts{
Name: "template_count",
Help: "Number of installations per template",
LabelNames: []string{"method"},
CounterFn: func(vec *prometheus.CounterVec, o metric.CollectFnOpts) {
if o.Err != nil {
return
}

// safe to ignore the failed type assertion, a zero value
// provides a nil slice, so no worries.
sources, _ := o.AdditionalProps["sources"].([]string)
for _, source := range normalizeRemoteSources(sources) {
vec.
With(prometheus.Labels{
"method": o.Method,
"source": source.String(),
}).
Inc()
}
},
}
}

func normalizeRemoteSources(sources []string) []url.URL {
var out []url.URL
for _, source := range sources {
u, err := url.Parse(source)
if err != nil {
continue
}
if !strings.HasPrefix(u.Scheme, "http") {
continue
}
if u.Host == "raw.githubusercontent.com" {
u.Host = "github.com"
u.Path = normalizeRawGithubPath(u.Path)
}
out = append(out, *u)
}
return out
}

func normalizeRawGithubPath(rawPath string) string {
parts := strings.Split(rawPath, "/")
if len(parts) < 4 {
return rawPath
}
// keep /account/repo as base, then append the blob to it
tail := append([]string{"blob"}, parts[3:]...)
parts = append(parts[:3], tail...)
return strings.Join(parts, "/")
}
Loading

0 comments on commit 0e61e71

Please sign in to comment.