Skip to content

Commit

Permalink
Store latest config hash and timestamp as metrics (#1378)
Browse files Browse the repository at this point in the history
  • Loading branch information
kakkoyun authored and brancz committed Aug 6, 2019
1 parent bb040c9 commit 477a720
Showing 1 changed file with 54 additions and 11 deletions.
65 changes: 54 additions & 11 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package receive

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"io/ioutil"
"os"
Expand Down Expand Up @@ -33,6 +35,9 @@ type ConfigWatcher struct {
logger log.Logger
watcher *fsnotify.Watcher

hashGauge prometheus.Gauge
successGauge prometheus.Gauge
lastSuccessTimeGauge prometheus.Gauge
changesCounter prometheus.Counter
errorCounter prometheus.Counter
refreshCounter prometheus.Counter
Expand Down Expand Up @@ -62,6 +67,21 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i
interval: time.Duration(interval),
logger: logger,
watcher: watcher,
hashGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_hash",
Help: "Hash of the currently loaded hashring configuration file.",
}),
successGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_successful",
Help: "Whether the last hashring configuration file reload attempt was successful.",
}),
lastSuccessTimeGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful hashring configuration file reload.",
}),
changesCounter: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_changes_total",
Expand Down Expand Up @@ -93,6 +113,9 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i

if r != nil {
r.MustRegister(
c.hashGauge,
c.successGauge,
c.lastSuccessTimeGauge,
c.changesCounter,
c.errorCounter,
c.refreshCounter,
Expand Down Expand Up @@ -148,8 +171,13 @@ func (cw *ConfigWatcher) Run(ctx context.Context) {
}
}

// readFile reads the configured file and returns a configuration.
func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
}

// readFile reads the configured file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
if err != nil {
return nil, err
Expand All @@ -160,33 +188,43 @@ func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
}
}()

content, err := ioutil.ReadAll(fd)
if err != nil {
return nil, err
}
return ioutil.ReadAll(fd)
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err = json.Unmarshal(content, &config)
err := json.Unmarshal(content, &config)
return config, err
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()
config, err := cw.readFile()
cfgContent, err := cw.readFile()
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to read configuration file", "err", err, "path", cw.path)
return
}

config, err := cw.loadConfig(cfgContent)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
return
}

// If there was no change to the configuration, return early.
if reflect.DeepEqual(cw.last, config) {
return
}
cw.changesCounter.Inc()
// Save the last known configuration.
cw.last = config
cw.successGauge.Set(1)
cw.lastSuccessTimeGauge.Set(float64(time.Now().Unix()))
cw.hashGauge.Set(hashAsMetricValue(cfgContent))

for _, c := range config {
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints)))
Expand Down Expand Up @@ -228,7 +266,12 @@ func (cw *ConfigWatcher) stop() {
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}

0 comments on commit 477a720

Please sign in to comment.