From 32427f79f60277cab10c46e8adb47a5e6f6a91f5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Thu, 23 Jul 2020 20:33:42 +0530 Subject: [PATCH] Upgrade Prometheus to master (#2918) * Upgrade Prometheus to master Signed-off-by: Ganesh Vernekar * Fix promql default interval Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar * Add comment for manager.Run() Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 2 +- go.mod | 2 +- go.sum | 7 +- pkg/cortex/modules.go | 8 +- pkg/querier/querier.go | 4 +- pkg/ruler/ruler.go | 4 +- .../prometheus/prometheus/discovery/README.md | 12 ++ .../discovery/dockerswarm/dockerswarm.go | 19 +-- .../prometheus/prometheus/promql/engine.go | 122 ++++++++---------- .../prometheus/prometheus/promql/test.go | 9 +- .../prometheus/prometheus/rules/alerting.go | 2 +- .../prometheus/prometheus/rules/manager.go | 32 ++++- .../prometheus/prometheus/scrape/scrape.go | 2 - .../prometheus/prometheus/tsdb/db.go | 82 ++++++++---- .../prometheus/prometheus/tsdb/head.go | 21 ++- vendor/modules.txt | 2 +- 16 files changed, 197 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e223cce1c8..8d26b71738 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,7 +57,7 @@ * `s3.http.idle-conn-timeout` * `s3.http.response-header-timeout` * `s3.http.insecure-skip-verify` -* [ENHANCEMENT] Prometheus upgraded. #2798 #2849 #2867 #2902 +* [ENHANCEMENT] Prometheus upgraded. #2798 #2849 #2867 #2902 #2918 * Optimized labels regex matchers for patterns containing literals (eg. `foo.*`, `.*foo`, `.*foo.*`) * [ENHANCEMENT] Add metric `cortex_ruler_config_update_failures_total` to Ruler to track failures of loading rules files. #2857 * [ENHANCEMENT] Experimental Alertmanager: Alertmanager configuration persisted to object storage using an experimental API that accepts and returns YAML-based Alertmanager configuration. #2768 diff --git a/go.mod b/go.mod index d68ad263b0..83537761cc 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.10.0 - github.com/prometheus/prometheus v1.8.2-0.20200720162558-f762a9ec77b3 + github.com/prometheus/prometheus v1.8.2-0.20200722151933-4a8531a64b32 github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e github.com/spf13/afero v1.2.2 diff --git a/go.sum b/go.sum index 8f820cd683..6b50ef821b 100644 --- a/go.sum +++ b/go.sum @@ -1002,8 +1002,8 @@ github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rM github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33/go.mod h1:fkIPPkuZnkXyopYHmXPxf9rgiPkVgZCN8w9o8+UgBlY= github.com/prometheus/prometheus v1.8.2-0.20200619100132-74207c04655e/go.mod h1:QV6T0PPQi5UFmqcLBJw3JiyIR8r1O7KEv9qlVw4VV40= -github.com/prometheus/prometheus v1.8.2-0.20200720162558-f762a9ec77b3 h1:nqmHN6hoNaeHjTAU6eNndT09OwBvoxrJHHl+rxHx0Ok= -github.com/prometheus/prometheus v1.8.2-0.20200720162558-f762a9ec77b3/go.mod h1:nnrpCyzNlHMAvHQl3Edz6cUiJZu3q4owFNV3oU3K7x0= +github.com/prometheus/prometheus v1.8.2-0.20200722151933-4a8531a64b32 h1:GcJMaFu1uu6rSueToTRZuVS3AiORbFtLEDMUfp4GA9Q= +github.com/prometheus/prometheus v1.8.2-0.20200722151933-4a8531a64b32/go.mod h1:+/y4DzJ62qmhy0o/H4PtXegRXw+80E8RVRHhLbv+bkM= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1164,6 +1164,8 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/automaxprocs v1.2.0/go.mod h1:YfO3fm683kQpzETxlTGZhGIVmXAhaw3gxeBADbpZtnU= +go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= +go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -1421,6 +1423,7 @@ golang.org/x/tools v0.0.0-20190918214516-5a1a30219888/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191111182352-50fa39b762bc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 37eb46fba2..534f4393aa 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -365,11 +365,6 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { return } - // Ensure the default evaluation interval is set (promql uses a package-scoped mutable variable). - // This is important when `querier.parallelise-shardable-queries` is enabled because the frontend - // aggregates the sharded queries. - promql.SetDefaultEvaluationInterval(t.Cfg.Querier.DefaultEvaluationInterval) - tripperware, cache, err := queryrange.NewTripperware( t.Cfg.QueryRange, util.Logger, @@ -382,6 +377,9 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { Reg: prometheus.DefaultRegisterer, MaxSamples: t.Cfg.Querier.MaxSamples, Timeout: t.Cfg.Querier.Timeout, + NoStepSubqueryIntervalFn: func(int64) int64 { + return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() + }, }, t.Cfg.Querier.QueryIngestersWithin, prometheus.DefaultRegisterer, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 634fbd410d..46e5b1e8dc 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -167,7 +167,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor level.Warn(util.Logger).Log("msg", "Using deprecated flag -promql.lookback-delta, use -querier.lookback-delta instead") } - promql.SetDefaultEvaluationInterval(cfg.DefaultEvaluationInterval) engine := promql.NewEngine(promql.EngineOpts{ Logger: util.Logger, Reg: reg, @@ -175,6 +174,9 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor MaxSamples: cfg.MaxSamples, Timeout: cfg.Timeout, LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { + return cfg.DefaultEvaluationInterval.Milliseconds() + }, }) return &sampleAndChunkQueryable{lazyQueryable}, engine } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 253f2555c2..5df91741c3 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -509,7 +509,9 @@ func (r *Ruler) syncManager(ctx context.Context, user string, groups store.RuleG level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err) return } - manager.Run() + // manager.Run() starts running the manager and blocks until Stop() is called. + // Hence run it as another goroutine. + go manager.Run() r.userManagers[user] = manager } err = manager.Update(r.cfg.EvaluationInterval, files, nil) diff --git a/vendor/github.com/prometheus/prometheus/discovery/README.md b/vendor/github.com/prometheus/prometheus/discovery/README.md index 060c7d52c8..4c012c8d82 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/README.md +++ b/vendor/github.com/prometheus/prometheus/discovery/README.md @@ -216,4 +216,16 @@ If all the targets in a group go away, we need to send the target groups with em ``` down the channel. +### New Service Discovery Check List + +Here are some non-obvious parts of adding service discoveries that need to be verified: + +- Check for `nil` SDConfigs in `discovery/config/config.go`. +- Validate that discovery configs can be DeepEqualled by adding them to + `config/testdata/conf.good.yml` and to the associated tests. +- If there is a TLSConfig or HTTPClientConfig, add them to + `resolveFilepaths` in `config/config.go`. +- List the service discovery in both `` and + `` in `docs/configuration/configuration.md`. + diff --git a/vendor/github.com/prometheus/prometheus/discovery/dockerswarm/dockerswarm.go b/vendor/github.com/prometheus/prometheus/discovery/dockerswarm/dockerswarm.go index 0464f032f9..ef98890ff4 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/dockerswarm/dockerswarm.go +++ b/vendor/github.com/prometheus/prometheus/discovery/dockerswarm/dockerswarm.go @@ -44,7 +44,6 @@ type SDConfig struct { HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"` Host string `yaml:"host"` - url *url.URL Role string `yaml:"role"` Port int `yaml:"port"` @@ -62,11 +61,9 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if c.Host == "" { return fmt.Errorf("host missing") } - url, err := url.Parse(c.Host) - if err != nil { + if _, err = url.Parse(c.Host); err != nil { return err } - c.url = url switch c.Role { case "services", "nodes", "tasks": case "": @@ -89,17 +86,15 @@ type Discovery struct { // NewDiscovery returns a new Discovery which periodically refreshes its targets. func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { var err error + d := &Discovery{ port: conf.Port, role: conf.Role, } - // This is used in tests. In normal situations, it is set when Unmarshaling. - if conf.url == nil { - conf.url, err = url.Parse(conf.Host) - if err != nil { - return nil, err - } + hostURL, err := url.Parse(conf.Host) + if err != nil { + return nil, err } opts := []client.Opt{ @@ -110,7 +105,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { // There are other protocols than HTTP supported by the Docker daemon, like // unix, which are not supported by the HTTP client. Passing HTTP client // options to the Docker client makes those non-HTTP requests fail. - if conf.url.Scheme == "http" || conf.url.Scheme == "https" { + if hostURL.Scheme == "http" || hostURL.Scheme == "https" { rt, err := config_util.NewRoundTripperFromConfig(conf.HTTPClientConfig, "dockerswarm_sd", false) if err != nil { return nil, err @@ -120,7 +115,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { Transport: rt, Timeout: time.Duration(conf.RefreshInterval), }), - client.WithScheme(conf.url.Scheme), + client.WithScheme(hostURL.Scheme), ) } diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go index de4e968c48..f6bf6a61af 100644 --- a/vendor/github.com/prometheus/prometheus/promql/engine.go +++ b/vendor/github.com/prometheus/prometheus/promql/engine.go @@ -24,7 +24,6 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "time" "github.com/go-kit/kit/log" @@ -56,22 +55,6 @@ const ( minInt64 = -9223372036854775808 ) -var ( - // DefaultEvaluationInterval is the default evaluation interval of - // a subquery in milliseconds. - DefaultEvaluationInterval int64 -) - -// SetDefaultEvaluationInterval sets DefaultEvaluationInterval. -func SetDefaultEvaluationInterval(ev time.Duration) { - atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev)) -} - -// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration. -func GetDefaultEvaluationInterval() int64 { - return atomic.LoadInt64(&DefaultEvaluationInterval) -} - type engineMetrics struct { currentQueries prometheus.Gauge maxConcurrentQueries prometheus.Gauge @@ -221,19 +204,24 @@ type EngineOpts struct { // LookbackDelta determines the time since the last sample after which a time // series is considered stale. LookbackDelta time.Duration + + // NoStepSubqueryIntervalFn is the default evaluation interval of + // a subquery in milliseconds if no step in range vector was specified `[30m:]`. + NoStepSubqueryIntervalFn func(rangeMillis int64) int64 } // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - logger log.Logger - metrics *engineMetrics - timeout time.Duration - maxSamplesPerQuery int - activeQueryTracker *ActiveQueryTracker - queryLogger QueryLogger - queryLoggerLock sync.RWMutex - lookbackDelta time.Duration + logger log.Logger + metrics *engineMetrics + timeout time.Duration + maxSamplesPerQuery int + activeQueryTracker *ActiveQueryTracker + queryLogger QueryLogger + queryLoggerLock sync.RWMutex + lookbackDelta time.Duration + noStepSubqueryIntervalFn func(rangeMillis int64) int64 } // NewEngine returns a new engine. @@ -328,12 +316,13 @@ func NewEngine(opts EngineOpts) *Engine { } return &Engine{ - timeout: opts.Timeout, - logger: opts.Logger, - metrics: metrics, - maxSamplesPerQuery: opts.MaxSamples, - activeQueryTracker: opts.ActiveQueryTracker, - lookbackDelta: opts.LookbackDelta, + timeout: opts.Timeout, + logger: opts.Logger, + metrics: metrics, + maxSamplesPerQuery: opts.MaxSamples, + activeQueryTracker: opts.ActiveQueryTracker, + lookbackDelta: opts.LookbackDelta, + noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, } } @@ -525,14 +514,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ - startTimestamp: start, - endTimestamp: start, - interval: 1, - ctx: ctxInnerEval, - maxSamples: ng.maxSamplesPerQuery, - defaultEvalInterval: GetDefaultEvaluationInterval(), - logger: ng.logger, - lookbackDelta: ng.lookbackDelta, + startTimestamp: start, + endTimestamp: start, + interval: 1, + ctx: ctxInnerEval, + maxSamples: ng.maxSamplesPerQuery, + logger: ng.logger, + lookbackDelta: ng.lookbackDelta, + noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } val, warnings, err := evaluator.Eval(s.Expr) @@ -575,14 +564,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval // Range evaluation. evaluator := &evaluator{ - startTimestamp: timeMilliseconds(s.Start), - endTimestamp: timeMilliseconds(s.End), - interval: durationMilliseconds(s.Interval), - ctx: ctxInnerEval, - maxSamples: ng.maxSamplesPerQuery, - defaultEvalInterval: GetDefaultEvaluationInterval(), - logger: ng.logger, - lookbackDelta: ng.lookbackDelta, + startTimestamp: timeMilliseconds(s.Start), + endTimestamp: timeMilliseconds(s.End), + interval: durationMilliseconds(s.Interval), + ctx: ctxInnerEval, + maxSamples: ng.maxSamplesPerQuery, + logger: ng.logger, + lookbackDelta: ng.lookbackDelta, + noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } val, warnings, err := evaluator.Eval(s.Expr) if err != nil { @@ -657,7 +646,7 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) { hints := &storage.SelectHints{ Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End), - Step: durationToInt64Millis(s.Interval), + Step: durationMilliseconds(s.Interval), } // We need to make sure we select the timerange selected by the subquery. @@ -769,11 +758,11 @@ type evaluator struct { endTimestamp int64 // End time in milliseconds. interval int64 // Interval in milliseconds. - maxSamples int - currentSamples int - defaultEvalInterval int64 - logger log.Logger - lookbackDelta time.Duration + maxSamples int + currentSamples int + logger log.Logger + lookbackDelta time.Duration + noStepSubqueryIntervalFn func(rangeMillis int64) int64 } // errorf causes a panic with the input formatted into an error. @@ -1333,21 +1322,22 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { return ev.matrixSelector(e) case *parser.SubqueryExpr: - offsetMillis := durationToInt64Millis(e.Offset) - rangeMillis := durationToInt64Millis(e.Range) + offsetMillis := durationMilliseconds(e.Offset) + rangeMillis := durationMilliseconds(e.Range) newEv := &evaluator{ - endTimestamp: ev.endTimestamp - offsetMillis, - interval: ev.defaultEvalInterval, - ctx: ev.ctx, - currentSamples: ev.currentSamples, - maxSamples: ev.maxSamples, - defaultEvalInterval: ev.defaultEvalInterval, - logger: ev.logger, - lookbackDelta: ev.lookbackDelta, + endTimestamp: ev.endTimestamp - offsetMillis, + ctx: ev.ctx, + currentSamples: ev.currentSamples, + maxSamples: ev.maxSamples, + logger: ev.logger, + lookbackDelta: ev.lookbackDelta, + noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, } if e.Step != 0 { - newEv.interval = durationToInt64Millis(e.Step) + newEv.interval = durationMilliseconds(e.Step) + } else { + newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis) } // Start with the first timestamp after (ev.startTimestamp - offset - range) @@ -1367,10 +1357,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(errors.Errorf("unhandled expression of type: %T", expr)) } -func durationToInt64Millis(d time.Duration) int64 { - return int64(d / time.Millisecond) -} - // vectorSelector evaluates a *parser.VectorSelector expression. func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, node) diff --git a/vendor/github.com/prometheus/prometheus/promql/test.go b/vendor/github.com/prometheus/prometheus/promql/test.go index e3d99e83f8..ff2549144e 100644 --- a/vendor/github.com/prometheus/prometheus/promql/test.go +++ b/vendor/github.com/prometheus/prometheus/promql/test.go @@ -518,10 +518,11 @@ func (t *Test) clear() { t.storage = teststorage.New(t) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) }, } t.queryEngine = NewEngine(opts) diff --git a/vendor/github.com/prometheus/prometheus/rules/alerting.go b/vendor/github.com/prometheus/prometheus/rules/alerting.go index 54ee31d740..c82bc9640a 100644 --- a/vendor/github.com/prometheus/prometheus/rules/alerting.go +++ b/vendor/github.com/prometheus/prometheus/rules/alerting.go @@ -74,7 +74,7 @@ func (s AlertState) String() string { case StateFiring: return "firing" } - panic(errors.Errorf("unknown alert state: %s", s.String())) + panic(errors.Errorf("unknown alert state: %d", s)) } // Alert is the user-level representation of a single instance of an alerting rule. diff --git a/vendor/github.com/prometheus/prometheus/rules/manager.go b/vendor/github.com/prometheus/prometheus/rules/manager.go index 76c756c977..c4344fa68e 100644 --- a/vendor/github.com/prometheus/prometheus/rules/manager.go +++ b/vendor/github.com/prometheus/prometheus/rules/manager.go @@ -852,6 +852,7 @@ type ManagerOptions struct { OutageTolerance time.Duration ForGracePeriod time.Duration ResendDelay time.Duration + GroupLoader GroupLoader Metrics *Metrics } @@ -863,6 +864,10 @@ func NewManager(o *ManagerOptions) *Manager { o.Metrics = NewGroupMetrics(o.Registerer) } + if o.GroupLoader == nil { + o.GroupLoader = FileLoader{} + } + m := &Manager{ groups: map[string]*Group{}, opts: o, @@ -875,8 +880,13 @@ func NewManager(o *ManagerOptions) *Manager { return m } -// Run starts processing of the rule manager. +// Run starts processing of the rule manager. It is blocking. func (m *Manager) Run() { + m.start() + <-m.done +} + +func (m *Manager) start() { close(m.block) } @@ -969,6 +979,22 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels return nil } +// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them. +type GroupLoader interface { + Load(identifier string) (*rulefmt.RuleGroups, []error) + Parse(query string) (parser.Expr, error) +} + +// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile +// and parser.ParseExpr +type FileLoader struct{} + +func (FileLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) { + return rulefmt.ParseFile(identifier) +} + +func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) } + // LoadGroups reads groups from a list of files. func (m *Manager) LoadGroups( interval time.Duration, externalLabels labels.Labels, filenames ...string, @@ -978,7 +1004,7 @@ func (m *Manager) LoadGroups( shouldRestore := !m.restored for _, fn := range filenames { - rgs, errs := rulefmt.ParseFile(fn) + rgs, errs := m.opts.GroupLoader.Load(fn) if errs != nil { return nil, errs } @@ -991,7 +1017,7 @@ func (m *Manager) LoadGroups( rules := make([]Rule, 0, len(rg.Rules)) for _, r := range rg.Rules { - expr, err := parser.ParseExpr(r.Expr.Value) + expr, err := m.opts.GroupLoader.Parse(r.Expr.Value) if err != nil { return nil, []error{errors.Wrap(err, fn)} } diff --git a/vendor/github.com/prometheus/prometheus/scrape/scrape.go b/vendor/github.com/prometheus/prometheus/scrape/scrape.go index 9883156add..ce0e333321 100644 --- a/vendor/github.com/prometheus/prometheus/scrape/scrape.go +++ b/vendor/github.com/prometheus/prometheus/scrape/scrape.go @@ -434,9 +434,7 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := uniqueTargets[hash]; !ok { wg.Add(1) go func(l loop) { - l.stop() - wg.Done() }(sp.loops[hash]) diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go index 2b0525ab6b..e02ad5fae0 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go @@ -118,19 +118,27 @@ type Options struct { // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback + + // BlocksToDelete is a function which returns the blocks which can be deleted. + // It is always the default time and size based retention in Prometheus and + // mainly meant for external users who import TSDB. + BlocksToDelete BlocksToDeleteFunc } +type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} + // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { dir string lockf fileutil.Releaser - logger log.Logger - metrics *dbMetrics - opts *Options - chunkPool chunkenc.Pool - compactor Compactor + logger log.Logger + metrics *dbMetrics + opts *Options + chunkPool chunkenc.Pool + compactor Compactor + blocksToDelete BlocksToDeleteFunc // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex @@ -560,14 +568,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), - autoCompact: true, - chunkPool: chunkenc.NewPool(), + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + autoCompact: true, + chunkPool: chunkenc.NewPool(), + blocksToDelete: opts.BlocksToDelete, + } + if db.blocksToDelete == nil { + db.blocksToDelete = DefaultBlocksToDelete(db) } if !opts.NoLockfile { @@ -871,13 +883,17 @@ func (db *DB) reload() (err error) { return err } - deletable := db.deletableBlocks(loadable) + deletableULIDs := db.blocksToDelete(loadable) + deletable := make(map[ulid.ULID]*Block, len(deletableULIDs)) // Corrupted blocks that have been superseded by a loadable block can be safely ignored. // This makes it resilient against the process crashing towards the end of a compaction. // Creation of a new block and deletion of its parents cannot happen atomically. // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. for _, block := range loadable { + if _, ok := deletableULIDs[block.meta.ULID]; ok { + deletable[block.meta.ULID] = block + } for _, b := range block.Meta().Compaction.Parents { delete(corrupted, b.ULID) deletable[b.ULID] = nil @@ -986,9 +1002,17 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po return blocks, corrupted, nil } +// DefaultBlocksToDelete returns a filter which decides time based and size based +// retention from the options of the db. +func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc { + return func(blocks []*Block) map[ulid.ULID]struct{} { + return deletableBlocks(db, blocks) + } +} + // deletableBlocks returns all blocks past retention policy. -func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { - deletable := make(map[ulid.ULID]*Block) +func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} { + deletable := make(map[ulid.ULID]struct{}) // Sort the blocks by time - newest to oldest (largest to smallest timestamp). // This ensures that the retentions will remove the oldest blocks. @@ -998,34 +1022,36 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { for _, block := range blocks { if block.Meta().Compaction.Deletable { - deletable[block.Meta().ULID] = block + deletable[block.Meta().ULID] = struct{}{} } } - for ulid, block := range db.beyondTimeRetention(blocks) { - deletable[ulid] = block + for ulid := range BeyondTimeRetention(db, blocks) { + deletable[ulid] = struct{}{} } - for ulid, block := range db.beyondSizeRetention(blocks) { - deletable[ulid] = block + for ulid := range BeyondSizeRetention(db, blocks) { + deletable[ulid] = struct{}{} } return deletable } -func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) { +// BeyondTimeRetention returns those blocks which are beyond the time retention +// set in the db options. +func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) { // Time retention is disabled or no blocks to work with. if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { return } - deletable = make(map[ulid.ULID]*Block) + deletable = make(map[ulid.ULID]struct{}) for i, block := range blocks { // The difference between the first block and this block is larger than // the retention period so any blocks after that are added as deletable. if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration { for _, b := range blocks[i:] { - deletable[b.meta.ULID] = b + deletable[b.meta.ULID] = struct{}{} } db.metrics.timeRetentionCount.Inc() break @@ -1034,13 +1060,15 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo return deletable } -func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) { +// BeyondSizeRetention returns those blocks which are beyond the size retention +// set in the db options. +func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) { // Size retention is disabled or no blocks to work with. if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { return } - deletable = make(map[ulid.ULID]*Block) + deletable = make(map[ulid.ULID]struct{}) walSize, _ := db.Head().wal.Size() headChunksSize := db.Head().chunkDiskMapper.Size() @@ -1052,7 +1080,7 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo if blocksSize > int64(db.opts.MaxBytes) { // Add this and all following blocks for deletion. for _, b := range blocks[i:] { - deletable[b.meta.ULID] = b + deletable[b.meta.ULID] = struct{}{} } db.metrics.sizeRetentionCount.Inc() break diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head.go b/vendor/github.com/prometheus/prometheus/tsdb/head.go index d45307dd57..066c722eaa 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head.go @@ -44,6 +44,9 @@ var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") + // ErrAppenderClosed is returned if an appender has already be successfully + // rolled back or commited. + ErrAppenderClosed = errors.New("appender closed") ) // Head handles reads and writes of time series data within a time window. @@ -1093,6 +1096,7 @@ type headAppender struct { sampleSeries []*memSeries appendID, cleanupAppendIDsBelow uint64 + closed bool } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -1193,7 +1197,11 @@ func (a *headAppender) log() error { return nil } -func (a *headAppender) Commit() error { +func (a *headAppender) Commit() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() if err := a.log(); err != nil { //nolint: errcheck a.Rollback() // Most likely the same error will happen again. @@ -1231,7 +1239,11 @@ func (a *headAppender) Commit() error { return nil } -func (a *headAppender) Rollback() error { +func (a *headAppender) Rollback() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() defer a.head.metrics.activeAppenders.Dec() defer a.head.iso.closeAppend(a.appendID) defer a.head.putSeriesBuffer(a.sampleSeries) @@ -2078,8 +2090,9 @@ func (s *memSeries) chunkID(pos int) int { return pos + s.firstChunkID } -// truncateChunksBefore removes all chunks from the series that have not timestamp -// at or after mint. Chunk IDs remain unchanged. +// truncateChunksBefore removes all chunks from the series that +// have no timestamp at or after mint. +// Chunk IDs remain unchanged. func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { var k int if s.headChunk != nil && s.headChunk.maxTime < mint { diff --git a/vendor/modules.txt b/vendor/modules.txt index ae5c491921..5f67004ea7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -529,7 +529,7 @@ github.com/prometheus/node_exporter/https github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.8.2-0.20200720162558-f762a9ec77b3 +# github.com/prometheus/prometheus v1.8.2-0.20200722151933-4a8531a64b32 ## explicit github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery