Skip to content

Update Ruler to use upstream Prom Rule Manager #1571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ jobs:
path: pkg/ring/ring.pb.go
- store_artifacts:
path: pkg/ingester/client/cortex.pb.go
- store_artifacts:
path: pkg/ruler/rules/rules.pb.go

- run:
name: Save Images
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## master / unreleased

* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`
* `ruler.group-timeout`has been removed
* `ruler.num-workers` has been removed
* `ruler.rule-path` has been added to specify where the prometheus rule manager will sync rule files
* `ruler.storage.type` has beem added to specify the rule store backend type, currently only the configdb.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beem >  been

* `ruler.poll-interval` has been added to specify the interval in which to poll new rule groups.
* [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
all: $(UPTODATE_FILES)
test: protos
mod-check: protos
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.1
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/memberlist v0.1.4
github.com/jonboulle/clockwork v0.1.0
github.com/json-iterator/go v1.1.7
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 // indirect
Expand All @@ -64,6 +63,7 @@ require (
github.com/satori/go.uuid v1.2.0 // indirect
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.4.0
github.com/thanos-io/thanos v0.8.1
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
Expand Down
65 changes: 47 additions & 18 deletions pkg/configs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,39 @@ package client
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net/http"
"net/url"
"time"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
)

// Config says where we can find the ruler configs.
type Config struct {
ConfigsAPIURL flagext.URLValue
ClientTimeout time.Duration // HTTP timeout duration for requests made to the Weave Cloud configs service.
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&cfg.ConfigsAPIURL, prefix+"configs.url", "URL of configs API server.")
f.DurationVar(&cfg.ClientTimeout, prefix+"configs.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.")
}

var configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "configs_request_duration_seconds",
Help: "Time spent requesting configs.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))

// Client is what the ruler and altermanger needs from a config store to process rules.
type Client interface {
// GetRules returns all Cortex configurations from a configs API server
Expand All @@ -25,32 +47,32 @@ type Client interface {
}

// New creates a new ConfigClient.
func New(cfg Config) (Client, error) {
if cfg.ConfigsAPIURL.URL == nil {
return nil, errors.New("configdb url not provided")
}
return instrumented{
next: configsClient{
URL: cfg.ConfigsAPIURL.URL,
Timeout: cfg.ClientTimeout,
},
func New(cfg Config) (*ConfigDBClient, error) {
return &ConfigDBClient{
URL: cfg.ConfigsAPIURL.URL,
Timeout: cfg.ClientTimeout,
}, nil
}

// configsClient allows retrieving recording and alerting rules from the configs server.
type configsClient struct {
// ConfigDBClient allows retrieving recording and alerting rules from the configs server.
type ConfigDBClient struct {
URL *url.URL
Timeout time.Duration
}

// GetRules implements ConfigClient.
func (c configsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
// GetRules implements Client
func (c ConfigDBClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
response, err := doRequest(endpoint, c.Timeout, since)
var response *ConfigsResponse
err := instrument.CollectedRequest(ctx, "GetRules", configsRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
response, err = doRequest(endpoint, c.Timeout, since)
return err
})
if err != nil {
return nil, err
}
Expand All @@ -64,14 +86,20 @@ func (c configsClient) GetRules(ctx context.Context, since configs.ID) (map[stri
return configs, nil
}

// GetAlerts implements ConfigClient.
func (c configsClient) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
// GetAlerts implements Client.
func (c ConfigDBClient) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix)
return doRequest(endpoint, c.Timeout, since)
var response *ConfigsResponse
err := instrument.CollectedRequest(ctx, "GetAlerts", configsRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
response, err = doRequest(endpoint, c.Timeout, since)
return err
})
return response, err
}

func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
Expand All @@ -81,6 +109,7 @@ func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*Confi
}

client := &http.Client{Timeout: timeout}

resp, err := client.Do(req)
if err != nil {
return nil, err
Expand Down
66 changes: 0 additions & 66 deletions pkg/configs/client/config.go

This file was deleted.

89 changes: 89 additions & 0 deletions pkg/configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -182,6 +183,94 @@ func (c RulesConfig) Parse() (map[string][]rules.Rule, error) {
}
}

// ParseFormatted returns the rulefmt map of a users rules configs. It allows
// for rules to be mapped to disk and read by the prometheus rules manager.
func (c RulesConfig) ParseFormatted() (map[string]rulefmt.RuleGroups, error) {
switch c.FormatVersion {
case RuleFormatV1:
return c.parseV1Formatted()
case RuleFormatV2:
return c.parseV2Formatted()
default:
return nil, fmt.Errorf("unknown rule format version %v", c.FormatVersion)
}
}

// parseV2 parses and validates the content of the rule files in a RulesConfig
// according to the Prometheus 2.x rule format.
func (c RulesConfig) parseV2Formatted() (map[string]rulefmt.RuleGroups, error) {
ruleMap := map[string]rulefmt.RuleGroups{}

for fn, content := range c.Files {
rgs, errs := rulefmt.Parse([]byte(content))
if errs != nil {
for _, err := range errs {
return nil, err
}
}
ruleMap[fn] = *rgs

}
return ruleMap, nil
}

// parseV1 parses and validates the content of the rule files in a RulesConfig
// according to the Prometheus 1.x rule format.
func (c RulesConfig) parseV1Formatted() (map[string]rulefmt.RuleGroups, error) {
result := map[string]rulefmt.RuleGroups{}
for fn, content := range c.Files {
stmts, err := legacy_promql.ParseStmts(content)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
}

ra := []rulefmt.Rule{}
for _, stmt := range stmts {
var rule rulefmt.Rule
switch r := stmt.(type) {
case *legacy_promql.AlertStmt:
_, err := promql.ParseExpr(r.Expr.String())
if err != nil {
return nil, err
}

rule = rulefmt.Rule{
Alert: r.Name,
Expr: r.Expr.String(),
For: model.Duration(r.Duration),
Labels: r.Labels.Map(),
Annotations: r.Annotations.Map(),
}

case *legacy_promql.RecordStmt:
_, err := promql.ParseExpr(r.Expr.String())
if err != nil {
return nil, err
}

rule = rulefmt.Rule{
Record: r.Name,
Expr: r.Expr.String(),
Labels: r.Labels.Map(),
}

default:
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
}
ra = append(ra, rule)
}
result[fn] = rulefmt.RuleGroups{
Groups: []rulefmt.RuleGroup{
{
Name: "rg:" + fn,
Rules: ra,
},
},
}
}
return result, nil
}

// parseV2 parses and validates the content of the rule files in a RulesConfig
// according to the Prometheus 2.x rule format.
//
Expand Down
Loading