Skip to content

Decouple ConfigDB & Ruler #1547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions pkg/configs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (

// Client is what the ruler and altermanger needs from a config store to process rules.
type Client interface {
// GetRules returns all Cortex configurations from a configs API server
// that have been updated after the given configs.ID was last updated.
GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error)

// GetAlerts fetches all the alerts that have changes since since.
GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error)
}
Expand Down Expand Up @@ -55,27 +51,6 @@ type configsClient struct {
Timeout time.Duration
}

// GetRules implements ConfigClient.
func (c configsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
response, err := doRequest(endpoint, c.Timeout, since)
if err != nil {
return nil, err
}
configs := map[string]configs.VersionedRulesConfig{}
for id, view := range response.Configs {
cfg := view.GetVersionedRulesConfig()
if cfg != nil {
configs[id] = *cfg
}
}
return configs, nil
}

// GetAlerts implements ConfigClient.
func (c configsClient) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
suffix := ""
Expand Down Expand Up @@ -117,14 +92,6 @@ type dbStore struct {
db db.DB
}

// GetRules implements ConfigClient.
func (d dbStore) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
if since == 0 {
return d.db.GetAllRulesConfigs(ctx)
}
return d.db.GetRulesConfigs(ctx, since)
}

// GetAlerts implements ConfigClient.
func (d dbStore) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
var resp map[string]configs.View
Expand Down
12 changes: 0 additions & 12 deletions pkg/configs/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.DBConfig.RegisterFlags(f)
f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.")
f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.")
flag.Var(&cfg.ConfigsAPIURL, "alertmanager.configs.url", "URL of configs API server.")
flag.DurationVar(&cfg.ClientTimeout, "alertmanager.configs.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.")
}
Expand All @@ -49,16 +47,6 @@ type instrumented struct {
next Client
}

func (i instrumented) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
var cfgs map[string]configs.VersionedRulesConfig
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = i.next.GetRules(ctx, since) // Warning: this will produce an incorrect result if the configID ever overflows
return err
})
return cfgs, err
}

func (i instrumented) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
var cfgs *ConfigsResponse
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
Expand Down
164 changes: 164 additions & 0 deletions pkg/configs/storage/clients/configdb/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package configdb

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/configs/storage/rules"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log/level"
)

// Config says where we can find the ruler configs.
type Config struct {
ConfigsAPIURL flagext.URLValue
ClientTimeout time.Duration
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&cfg.ConfigsAPIURL, prefix+".configs.url", "DEPRECATED. URL of configs API server.")
f.DurationVar(&cfg.ClientTimeout, prefix+".client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.")
}

// ConfigsClient allows retrieving recording and alerting rules from the configs server.
type ConfigsClient struct {
URL *url.URL
Timeout time.Duration

lastPoll configs.ID
}

// New creates a new ConfigClient.
func New(cfg Config) (*ConfigsClient, error) {
return &ConfigsClient{
URL: cfg.ConfigsAPIURL.URL,
Timeout: cfg.ClientTimeout,

lastPoll: 0,
}, nil
}

// GetRules implements ConfigClient.
func (c *ConfigsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
response, err := doRequest(endpoint, c.Timeout, since)
if err != nil {
return nil, err
}
configs := map[string]configs.VersionedRulesConfig{}
for id, view := range response.Configs {
cfg := view.GetVersionedRulesConfig()
if cfg != nil {
configs[id] = *cfg
}
}
return configs, nil
}

// Stop stops rthe config client
func (c *ConfigsClient) Stop() {}

func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
}

client := &http.Client{Timeout: timeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Invalid response from configs server: %v", resp.StatusCode)
}

var config ConfigsResponse
if err := json.NewDecoder(resp.Body).Decode(&config); err != nil {
level.Error(util.Logger).Log("msg", "configs: couldn't decode JSON body", "err", err)
return nil, err
}

config.since = since
return &config, nil
}

// ConfigsResponse is a response from server for GetConfigs.
type ConfigsResponse struct {
// The version since which these configs were changed
since configs.ID

// Configs maps user ID to their latest configs.View.
Configs map[string]configs.View `json:"configs"`
}

// GetLatestConfigID returns the last config ID from a set of configs.
func (c ConfigsResponse) GetLatestConfigID() configs.ID {
latest := c.since
for _, config := range c.Configs {
if config.ID > latest {
latest = config.ID
}
}
return latest
}

// PollRules polls the configdb server and returns the updated rule groups
func (c *ConfigsClient) PollRules(ctx context.Context) (map[string][]rules.RuleGroup, error) {
resp, err := c.GetRules(ctx, c.lastPoll)
if err != nil {
return nil, err
}

newRules := map[string][]rules.RuleGroup{}

var highestID configs.ID
for user, cfg := range resp {
if cfg.ID > highestID {
highestID = cfg.ID
}
userRules := []rules.RuleGroup{}
if cfg.IsDeleted() {
newRules[user] = []rules.RuleGroup{}
}
rMap, err := cfg.Config.Parse()
if err != nil {
return nil, err
}
for groupSlug, r := range rMap {
name, file := decomposeGroupSlug(groupSlug)
userRules = append(userRules, rules.FormattedToRuleGroup(user, file, name, r))
}
newRules[user] = userRules
}

if err != nil {
return nil, err
}

c.lastPoll = highestID

return newRules, nil
}

// decomposeGroupSlug breaks the group slug from Parse
// into it's group name and file name
func decomposeGroupSlug(slug string) (string, string) {
components := strings.Split(slug, ";")
return components[0], components[1]
}
54 changes: 54 additions & 0 deletions pkg/configs/storage/clients/configdb/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package configdb

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/stretchr/testify/assert"
)

var response = `{
"configs": {
"2": {
"id": 1,
"config": {
"rules_files": {
"recording.rules": "groups:\n- name: demo-service-alerts\n interval: 15s\n rules:\n - alert: SomethingIsUp\n expr: up == 1\n"
},
"rule_format_version": "2"
}
}
}
}
`

func TestDoRequest(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(response))
require.NoError(t, err)
}))
defer server.Close()

resp, err := doRequest(server.URL, 1*time.Second, 0)
assert.Nil(t, err)

expected := ConfigsResponse{Configs: map[string]configs.View{
"2": {
ID: 1,
Config: configs.Config{
RulesConfig: configs.RulesConfig{
Files: map[string]string{
"recording.rules": "groups:\n- name: demo-service-alerts\n interval: 15s\n rules:\n - alert: SomethingIsUp\n expr: up == 1\n",
},
FormatVersion: configs.RuleFormatV2,
},
},
},
}}
assert.Equal(t, &expected, resp)
}
98 changes: 98 additions & 0 deletions pkg/configs/storage/rules/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package rules

import (
time "time"

"github.com/cortexproject/cortex/pkg/ingester/client"

"github.com/golang/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/rules"
)

// ProtoRuleUpdateDescFactory makes new RuleUpdateDesc
func ProtoRuleUpdateDescFactory() proto.Message {
return NewRuleUpdateDesc()
}

// NewRuleUpdateDesc returns an empty *distributor.RuleUpdateDesc.
func NewRuleUpdateDesc() *RuleUpdateDesc {
return &RuleUpdateDesc{}
}

// ToProto transforms a formatted prometheus rulegroup to a rule group protobuf
func ToProto(user string, namespace string, rl rulefmt.RuleGroup) RuleGroupDesc {
dur := time.Duration(rl.Interval)
rg := RuleGroupDesc{
Name: rl.Name,
Namespace: namespace,
Interval: &dur,
Rules: formattedRuleToProto(rl.Rules),
User: user,
}
return rg
}

func formattedRuleToProto(rls []rulefmt.Rule) []*RuleDesc {
rules := make([]*RuleDesc, len(rls))
for i := range rls {
f := time.Duration(rls[i].For)

rules[i] = &RuleDesc{
Expr: rls[i].Expr,
Record: rls[i].Record,
Alert: rls[i].Alert,

For: &f,
Labels: client.FromLabelsToLabelAdapaters(labels.FromMap(rls[i].Labels)),
Annotations: client.FromLabelsToLabelAdapaters(labels.FromMap(rls[i].Labels)),
}
}

return rules
}

// FromProto generates a rulefmt RuleGroup
func FromProto(rg *RuleGroupDesc) *rulefmt.RuleGroup {
formattedRuleGroup := rulefmt.RuleGroup{
Name: rg.GetName(),
Interval: model.Duration(*rg.Interval),
Rules: make([]rulefmt.Rule, len(rg.GetRules())),
}

for i, rl := range rg.GetRules() {
formattedRuleGroup.Rules[i] = rulefmt.Rule{
Record: rl.GetRecord(),
Alert: rl.GetAlert(),
Expr: rl.GetExpr(),
For: model.Duration(*rl.GetFor()),
Labels: client.FromLabelAdaptersToLabels(rl.Labels).Map(),
Annotations: client.FromLabelAdaptersToLabels(rl.Annotations).Map(),
}
}

return &formattedRuleGroup
}

// ToRuleGroup returns a functional rulegroup from a proto
func ToRuleGroup(rg *RuleGroupDesc) *Group {
return &Group{
name: rg.GetName(),
namespace: rg.GetNamespace(),
user: rg.GetUser(),
interval: *rg.Interval,
rules: rg.Rules,
}
}

// FormattedToRuleGroup transforms a formatted prometheus rulegroup to a rule group protobuf
func FormattedToRuleGroup(user string, namespace string, name string, rls []rules.Rule) *Group {
return &Group{
name: name,
namespace: namespace,
user: user,
activeRules: rls,
}
}
Loading