Skip to content

Added local backend support to new ruler storage config #3932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [CHANGE] Alertmanager now removes local files after Alertmanager is no longer running for removed or resharded user. #3910
* [CHANGE] Alertmanager now stores local files in per-tenant folders. Files stored by Alertmanager previously are migrated to new hierarchy. Support for this migration will be removed in Cortex 1.10. #3910
* [FEATURE] Ruler Storage: Added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932
* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
* `cortex_ruler_clients`
Expand Down
7 changes: 6 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ tenant_federation:

ruler_storage:
# Backend storage to use. Supported backends are: s3, gcs, azure, swift,
# filesystem, configdb.
# filesystem, configdb, local.
# CLI flag: -ruler-storage.backend
[backend: <string> | default = "s3"]

Expand Down Expand Up @@ -356,6 +356,11 @@ ruler_storage:
# The CLI flags prefix for this block config is: ruler-storage
[configdb: <configstore_config>]

local:
# Directory to scan for rules
# CLI flag: -ruler-storage.local.directory
[directory: <string> | default = ""]

# The configs_config configures the Cortex Configs DB and API.
[configs: <configs_config>]

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
if !t.Cfg.Ruler.StoreConfig.IsDefaults() {
t.RulerStorage, err = ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, rules.FileLoader{}, util_log.Logger)
} else {
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer)
}
return
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/cortexpb"
store "github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -405,7 +405,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
return
}

err = a.store.LoadRuleGroups(req.Context(), map[string]rulestore.RuleGroupList{userID: rgs})
err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs})
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -435,7 +435,7 @@ func (a *API) GetRuleGroup(w http.ResponseWriter, req *http.Request) {
return
}

formatted := store.FromProto(rg)
formatted := rulespb.FromProto(rg)
marshalAndSend(formatted, w, logger)
}

Expand Down Expand Up @@ -495,7 +495,7 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
return
}

rgProto := store.ToProto(userID, namespace, rg)
rgProto := rulespb.ToProto(userID, namespace, rg)

level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String())
err = a.store.SetRuleGroup(req.Context(), userID, namespace, rgProto)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -171,7 +171,7 @@ func TestRuler_alerts(t *testing.T) {
}

func TestRuler_Create(t *testing.T) {
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList)))
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
defer cleanup()

r, rcleanup := newTestRuler(t, cfg)
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {
}

func TestRuler_Limits(t *testing.T) {
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList)))
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
defer cleanup()

r, rcleanup := newTestRuler(t, cfg)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/weaveworks/common/user"
"golang.org/x/net/context/ctxhttp"

"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

type DefaultMultiTenantManager struct {
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg
}, nil
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList) {
func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
// A lock is taken to ensure if this function is called concurrently, then each call
// returns after the call map files and check for updates
r.userManagerMtx.Lock()
Expand Down Expand Up @@ -121,7 +121,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulestore.RuleGroupList) {
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
update, files, err := r.mapper.MapRules(user, groups.Formatted())
Expand Down
3 changes: 1 addition & 2 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/util/test"
)

Expand All @@ -32,7 +31,7 @@ func TestSyncRuleGroups(t *testing.T) {

const user = "testUser"

userRules := map[string]rulestore.RuleGroupList{
userRules := map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{
Name: "group1",
Expand Down
14 changes: 7 additions & 7 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type MultiTenantManager interface {
// SyncRuleGroups is used to sync the Manager with rules from the RuleStore.
// If existing user is missing in the ruleGroups map, its ruler manager will be stopped.
SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList)
SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList)
// GetRules fetches rules for a particular tenant (userID).
GetRules(userID string) []*promRules.Group
// Stop stops all Manager components.
Expand Down Expand Up @@ -470,7 +470,7 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
r.manager.SyncRuleGroups(ctx, configs)
}

func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
switch {
case !r.cfg.EnableSharding:
return r.listRulesNoSharding(ctx)
Expand All @@ -486,17 +486,17 @@ func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupLi
}
}

func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
return r.store.ListAllRuleGroups(ctx)
}

func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
configs, err := r.store.ListAllRuleGroups(ctx)
if err != nil {
return nil, err
}

filteredConfigs := make(map[string]rulestore.RuleGroupList)
filteredConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) > 0 {
Expand All @@ -506,7 +506,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulest
return filteredConfigs, nil
}

func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
users, err := r.store.ListAllUsers(ctx)
if err != nil {
return nil, errors.Wrap(err, "unable to list users of ruler")
Expand Down Expand Up @@ -540,7 +540,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulest
close(userCh)

mu := sync.Mutex{}
result := map[string]rulestore.RuleGroupList{}
result := map[string]rulespb.RuleGroupList{}

concurrency := loadRulesConcurrency
if len(userRings) < concurrency {
Expand Down
28 changes: 14 additions & 14 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,15 @@ func TestSharding(t *testing.T) {
user2Group1Token := tokenForGroup(user2Group1)
user3Group1Token := tokenForGroup(user3Group1)

noRules := map[string]rulestore.RuleGroupList{}
allRules := map[string]rulestore.RuleGroupList{
noRules := map[string]rulespb.RuleGroupList{}
allRules := map[string]rulespb.RuleGroupList{
user1: {user1Group1, user1Group2},
user2: {user2Group1},
user3: {user3Group1},
}

// ruler ID -> (user ID -> list of groups).
type expectedRulesMap map[string]map[string]rulestore.RuleGroupList
type expectedRulesMap map[string]map[string]rulespb.RuleGroupList

type testCase struct {
sharding bool
Expand Down Expand Up @@ -321,12 +321,12 @@ func TestSharding(t *testing.T) {
},

expectedRules: expectedRulesMap{
ruler1: map[string]rulestore.RuleGroupList{
ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1},
user2: {user2Group1},
},

ruler2: map[string]rulestore.RuleGroupList{
ruler2: map[string]rulespb.RuleGroupList{
user1: {user1Group2},
user3: {user3Group1},
},
Expand All @@ -349,7 +349,7 @@ func TestSharding(t *testing.T) {

expectedRules: expectedRulesMap{
// This ruler doesn't get rules from unhealthy ruler (RF=1).
ruler1: map[string]rulestore.RuleGroupList{
ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1},
user2: {user2Group1},
},
Expand Down Expand Up @@ -447,10 +447,10 @@ func TestSharding(t *testing.T) {
},

expectedRules: expectedRulesMap{
ruler1: map[string]rulestore.RuleGroupList{
ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1, user1Group2},
},
ruler2: map[string]rulestore.RuleGroupList{
ruler2: map[string]rulespb.RuleGroupList{
user2: {user2Group1},
user3: {user3Group1},
},
Expand All @@ -468,13 +468,13 @@ func TestSharding(t *testing.T) {
},

expectedRules: expectedRulesMap{
ruler1: map[string]rulestore.RuleGroupList{
ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1},
},
ruler2: map[string]rulestore.RuleGroupList{
ruler2: map[string]rulespb.RuleGroupList{
user1: {user1Group2},
},
ruler3: map[string]rulestore.RuleGroupList{
ruler3: map[string]rulespb.RuleGroupList{
user2: {user2Group1},
user3: {user3Group1},
},
Expand All @@ -492,11 +492,11 @@ func TestSharding(t *testing.T) {
},

expectedRules: expectedRulesMap{
ruler1: map[string]rulestore.RuleGroupList{
ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1, user1Group2},
},
ruler2: noRules, // Ruler2 owns token for user2group1, but user-2 will only be handled by ruler-1 and 3.
ruler3: map[string]rulestore.RuleGroupList{
ruler3: map[string]rulespb.RuleGroupList{
user2: {user2Group1},
user3: {user3Group1},
},
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestSharding(t *testing.T) {
require.NoError(t, err)
// Normalize nil map to empty one.
if loaded == nil {
loaded = map[string]rulestore.RuleGroupList{}
loaded = map[string]rulespb.RuleGroupList{}
}
expected[id] = loaded
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/ruler/rulespb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rulespb

import "github.com/prometheus/prometheus/pkg/rulefmt"

// RuleGroupList contains a set of rule groups
type RuleGroupList []*RuleGroupDesc

// Formatted returns the rule group list as a set of formatted rule groups mapped
// by namespace
func (l RuleGroupList) Formatted() map[string][]rulefmt.RuleGroup {
ruleMap := map[string][]rulefmt.RuleGroup{}
for _, g := range l {
if _, exists := ruleMap[g.Namespace]; !exists {
ruleMap[g.Namespace] = []rulefmt.RuleGroup{FromProto(g)}
continue
}
ruleMap[g.Namespace] = append(ruleMap[g.Namespace], FromProto(g))

}
return ruleMap
}
10 changes: 5 additions & 5 deletions pkg/ruler/rulestore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
}

// ListAllRuleGroups implements rules.RuleStore.
func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) {
out := map[string]rulestore.RuleGroupList{}
func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
out := map[string]rulespb.RuleGroupList{}

// List rule groups for all tenants.
err := b.bucket.Iter(ctx, "", func(key string) error {
Expand Down Expand Up @@ -126,10 +126,10 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul
}

// ListRuleGroupsForUserAndNamespace implements rules.RuleStore.
func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) {
func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) {
userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)

groupList := rulestore.RuleGroupList{}
groupList := rulespb.RuleGroupList{}

// The prefix to list objects depends on whether the namespace has been
// specified in the request.
Expand Down Expand Up @@ -162,7 +162,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
}

// LoadRuleGroups implements rules.RuleStore.
func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error {
func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error {
ch := make(chan *rulespb.RuleGroupDesc)

// Given we store one file per rule group. With this, we create a pool of workers that will
Expand Down
15 changes: 7 additions & 8 deletions pkg/ruler/rulestore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@ import (
"flag"

"github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/ruler/rulestore/configdb"
"github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
"github.com/cortexproject/cortex/pkg/storage/bucket"
)

const (
ConfigDB = "configdb"

Name = "ruler-storage"
prefix = "ruler-storage."
)

// Config configures a rule store.
type Config struct {
bucket.Config `yaml:",inline"`
ConfigDB client.Config `yaml:"configdb"`
Local local.Config `yaml:"local"`
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ExtraBackends = []string{ConfigDB}
prefix := "ruler-storage."

cfg.ExtraBackends = []string{configdb.Name, local.Name}
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
cfg.RegisterFlagsWithPrefix(prefix, f)
}
Loading