Reloader: Add support for watching and decompressing Prometheus configuration directories #7199

Merged 1 commit on Mar 12, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.
- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus configuration directories

### Changed

183 changes: 148 additions & 35 deletions pkg/reloader/reloader.go
@@ -59,6 +59,7 @@ import (
"fmt"
"hash"
"io"
"maps"
"net/http"
"net/url"
"os"
@@ -89,6 +90,7 @@ type Reloader struct {
logger log.Logger
cfgFile string
cfgOutputFile string
cfgDirs []CfgDirOption
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
@@ -97,7 +99,9 @@ type Reloader struct {
tr TriggerReloader

lastCfgHash []byte
lastCfgDirsHash [][]byte
lastWatchedDirsHash []byte
lastCfgDirFiles []map[string]struct{}
forceReload bool

reloads prometheus.Counter
@@ -114,6 +118,22 @@ type TriggerReloader interface {
TriggerReload(ctx context.Context) error
}

// CfgDirOption contains options for watching directories containing configurations. For example, a
// directory could contain additional scrape config files or rule files listed in the main
// Prometheus configuration. Sub-directories are ignored.
type CfgDirOption struct {
// Dir is the path containing the Prometheus configurations to watch.
Dir string

// OutputDir is a directory path to output configurations. If OutputDir is not empty,
// then all config files in the Dir directory are decompressed if needed, environment
// variables will be substituted and the output written into the given path. Prometheus
// should then use OutputDir as its config path.
OutputDir string

// TODO: https://github.com/thanos-io/thanos/issues/7201
}
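
Purely as an editor's illustration (not part of this diff), here is a minimal sketch of how a caller might wire the new `CfgDirs` option together with the existing single-file options. The paths, intervals, and registry below are made-up placeholders; only the types and function signatures come from this PR.

```go
package main

import (
	"context"
	"net/url"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/reloader"
)

func main() {
	logger := log.NewNopLogger()
	reloadURL, _ := url.Parse("http://localhost:9090/-/reload")

	rel := reloader.New(logger, prometheus.NewRegistry(), &reloader.Options{
		ReloadURL:     reloadURL,
		CfgFile:       "/etc/prometheus-in/prometheus.yaml.gz",
		CfgOutputFile: "/etc/prometheus-out/prometheus.yaml",
		CfgDirs: []reloader.CfgDirOption{{
			// Source directory with (possibly gzipped) rule or scrape config files.
			Dir: "/etc/prometheus-in/scrape-configs",
			// Directory Prometheus actually reads; normalized copies land here.
			OutputDir: "/etc/prometheus-out/scrape-configs",
		}},
		WatchInterval: 3 * time.Minute,
		RetryInterval: 5 * time.Second,
	})

	// Blocks until the context is cancelled.
	_ = rel.Watch(context.Background())
}
```

Each `CfgDirOption` entry maps one source directory to one output directory; per the doc comment above, Prometheus should be pointed at the `OutputDir` side.
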

// Options bundles options for the Reloader.
type Options struct {
// ReloadURL is the Prometheus URL to trigger reloads.
@@ -133,13 +153,15 @@ type Options struct {
// successful.
RuntimeInfoURL *url.URL

// CfgFile is a path to the prometheus config file to watch.
// CfgFile is a path to the Prometheus config file to watch.
CfgFile string
// CfgOutputFile is a path for the output config file.
// If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables
// will be substituted and the output written into the given path. Prometheus should then use
// cfgOutputFile as its config file path.
CfgOutputFile string
// CfgDirs is an array of paths to directories containing Prometheus configs to watch.
CfgDirs []CfgDirOption
// WatchedDirs is a collection of paths for the reloader to watch over.
WatchedDirs []string
// DelayInterval controls how long the reloader will wait without receiving
@@ -161,13 +183,15 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
logger = log.NewNopLogger()
}
r := &Reloader{
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
cfgDirs: o.CfgDirs,
lastCfgDirFiles: make([]map[string]struct{}, len(o.CfgDirs)),
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,

reloads: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
@@ -234,7 +258,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
// Because some edge cases might be missing, the reloader also relies on the
// watch interval.
func (r *Reloader) Watch(ctx context.Context) error {
if r.cfgFile == "" && len(r.watchedDirs) == 0 {
if r.cfgFile == "" && len(r.cfgDirs) == 0 && len(r.watchedDirs) == 0 {
level.Info(r.logger).Log("msg", "nothing to be watched")
<-ctx.Done()
return nil
@@ -260,6 +284,13 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

for _, cfgDir := range r.cfgDirs {
dir := cfgDir.Dir
if err := r.watcher.addDirectory(dir); err != nil {
return errors.Wrapf(err, "add directory %s to watcher", dir)
}
}

if r.watchInterval == 0 {
// Skip watching the file-system.
return nil
@@ -279,9 +310,15 @@ func (r *Reloader) Watch(ctx context.Context) error {
wg.Done()
}()

cfgDirsNames := make([]string, 0, len(r.cfgDirs))
for _, cfgDir := range r.cfgDirs {
cfgDirsNames = append(cfgDirsNames, cfgDir.Dir)
}

level.Info(r.logger).Log(
"msg", "started watching config file and directories for changes",
"cfg", r.cfgFile,
"cfgDirs", strings.Join(cfgDirsNames, ","),
"out", r.cfgOutputFile,
"dirs", strings.Join(r.watchedDirs, ","))

@@ -311,6 +348,44 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

func normalize(logger log.Logger, inputFile, outputFile string) error {
b, err := os.ReadFile(inputFile)
if err != nil {
return errors.Wrap(err, "read file")
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(logger, zr, "gzip reader close")

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}

b, err = expandEnv(b)
if err != nil {
return errors.Wrap(err, "expand environment variables")
}

tmpFile := outputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
}
if err := os.Rename(tmpFile, outputFile); err != nil {
return errors.Wrap(err, "rename file")
}
return nil
}
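
A side note on the decompression path: `normalize` treats any file whose first three bytes match `firstGzipBytes` (presumably the standard gzip magic header `0x1f 0x8b 0x08`) as compressed. Below is a hedged, standalone sketch of producing such a file for a watched config directory; the helper name and paths are hypothetical, not from this PR.

```go
package main

import (
	"compress/gzip"
	"fmt"
	"os"
	"path/filepath"
)

// writeGzippedConfig (hypothetical helper) writes contents as a gzip archive.
// The resulting file starts with the gzip header bytes that the reloader
// checks for before decompressing into OutputDir.
func writeGzippedConfig(path string, contents []byte) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	if _, err := zw.Write(contents); err != nil {
		return err
	}
	// Close flushes the gzip footer; skipping it leaves a truncated archive.
	return zw.Close()
}

func main() {
	dir, err := os.MkdirTemp("", "cfg-dir")
	if err != nil {
		panic(err)
	}
	cfg := []byte("groups:\n  - name: example\n    rules: []\n")
	if err := writeGzippedConfig(filepath.Join(dir, "rules.yaml.gz"), cfg); err != nil {
		panic(err)
	}
	fmt.Println("wrote gzipped config under", dir)
}
```
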

// apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also
// expand env vars into config file before reloading.
// Reload is retried in retryInterval until watchInterval.
@@ -327,41 +402,72 @@ func (r *Reloader) apply(ctx context.Context) error {
}
cfgHash = h.Sum(nil)
if r.cfgOutputFile != "" {
b, err := os.ReadFile(r.cfgFile)
if err != nil {
return errors.Wrap(err, "read file")
if err := normalize(r.logger, r.cfgFile, r.cfgOutputFile); err != nil {
return err
}
}
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")
cfgDirsHash := make([][]byte, len(r.cfgDirs))
cfgDirsChanged := len(r.lastCfgDirsHash) == 0 && len(r.cfgDirs) > 0
for i, cfgDir := range r.cfgDirs {
h := sha256.New()

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}
walkDir, err := filepath.EvalSymlinks(cfgDir.Dir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}
outDir, err := filepath.EvalSymlinks(cfgDir.OutputDir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}

cfgDirFiles := map[string]struct{}{}
entries, err := os.ReadDir(walkDir)
if err != nil {
return errors.Wrapf(err, "read dir: %s", walkDir)
}
for _, entry := range entries {
path := filepath.Join(walkDir, entry.Name())

b, err = expandEnv(b)
// Make sure to follow a symlink before checking if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return errors.Wrap(err, "expand environment variables")
return errors.Wrapf(err, "stat file: %s", path)
}

if targetFile.IsDir() {
continue
}

tmpFile := r.cfgOutputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
if err := hashFile(h, path); err != nil {
return errors.Wrapf(err, "build hash for file: %s", path)
}
if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {
return errors.Wrap(err, "rename file")

outFile := filepath.Join(outDir, targetFile.Name())
cfgDirFiles[outFile] = struct{}{}
if err := normalize(r.logger, path, outFile); err != nil {
return errors.Wrapf(err, "move file: %s", path)
}
}
if r.lastCfgDirFiles[i] != nil {
if !maps.Equal(r.lastCfgDirFiles[i], cfgDirFiles) {
for outFile := range r.lastCfgDirFiles[i] {
if _, ok := cfgDirFiles[outFile]; !ok {
if err := os.Remove(outFile); err != nil {
return err
}
}
}
}
}
r.lastCfgDirFiles[i] = cfgDirFiles

cfgDirsHash[i] = h.Sum(nil)
// Skip comparing bytes if we already set the flag.
if !cfgDirsChanged && !bytes.Equal(r.lastCfgDirsHash[i], cfgDirsHash[i]) {
cfgDirsChanged = true
}
}

h := sha256.New()
@@ -401,7 +507,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && !cfgDirsChanged && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
@@ -420,11 +526,18 @@ func (r *Reloader) apply(ctx context.Context) error {

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastCfgDirsHash = cfgDirsHash
r.lastWatchedDirsHash = watchedDirsHash

cfgDirsNames := make([]string, 0, len(r.cfgDirs))
for _, cfgDir := range r.cfgDirs {
cfgDirsNames = append(cfgDirsNames, cfgDir.Dir)
}
level.Info(r.logger).Log(
"msg", "Reload triggered",
"cfg_in", r.cfgFile,
"cfg_out", r.cfgOutputFile,
"cfg_dirs", strings.Join(cfgDirsNames, ", "),
"watched_dirs", strings.Join(r.watchedDirs, ", "))
r.lastReloadSuccess.Set(1)
r.lastReloadSuccessTimestamp.SetToCurrentTime()
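
To summarize the directory-sync bookkeeping in `apply()`: each pass records the set of files it wrote into `OutputDir` in `lastCfgDirFiles`, and any file written on the previous pass but not on this one is deleted, so removing a source file from `Dir` also removes its normalized copy. Here is a standalone sketch of that idea, assuming a hypothetical `syncOutputs` helper and throwaway temp paths; it restates the logic above rather than reusing the PR's code.

```go
package main

import (
	"fmt"
	"maps"
	"os"
	"path/filepath"
)

// syncOutputs (hypothetical name) removes output files produced on a previous
// pass that have no corresponding source file anymore, mirroring the
// lastCfgDirFiles bookkeeping in apply().
func syncOutputs(previous, current map[string]struct{}) error {
	if previous == nil || maps.Equal(previous, current) {
		return nil
	}
	for outFile := range previous {
		if _, ok := current[outFile]; !ok {
			if err := os.Remove(outFile); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	outDir, _ := os.MkdirTemp("", "reloader-out")
	a := filepath.Join(outDir, "a.yaml")
	b := filepath.Join(outDir, "b.yaml")
	_ = os.WriteFile(a, []byte("kept"), 0o644)
	_ = os.WriteFile(b, []byte("stale"), 0o644)

	previous := map[string]struct{}{a: {}, b: {}}
	current := map[string]struct{}{a: {}} // b's source file disappeared.

	fmt.Println(syncOutputs(previous, current)) // <nil>; b.yaml has been removed.
}
```
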