Skip to content

Refactor Rules Protos #2226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
* [FEATURE] Flusher target to flush the WAL.
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
Expand Down
40 changes: 20 additions & 20 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
func (r *Ruler) rules(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)
userID, ctx, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
if err != nil || userID == "" {
level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
respondError(logger, w, "no valid org id found")
return
}

w.Header().Set("Content-Type", "application/json")
rgs, err := r.GetRules(ctx, userID)
rgs, err := r.GetRules(ctx)

if err != nil {
respondError(logger, w, err.Error())
Expand All @@ -147,16 +147,16 @@ func (r *Ruler) rules(w http.ResponseWriter, req *http.Request) {

for _, g := range rgs {
grp := RuleGroup{
Name: g.Name,
File: g.Namespace,
Rules: make([]rule, len(g.Rules)),
Interval: g.Interval.Seconds(),
Name: g.Group.Name,
File: g.Group.Namespace,
Rules: make([]rule, len(g.ActiveRules)),
Interval: g.Group.Interval.Seconds(),
LastEvaluation: g.GetEvaluationTimestamp(),
EvaluationTime: g.GetEvaluationDuration().Seconds(),
}

for i, rl := range g.Rules {
if g.Rules[i].Alert != "" {
for i, rl := range g.ActiveRules {
if g.ActiveRules[i].Rule.Alert != "" {
alerts := make([]*Alert, 0, len(rl.Alerts))
for _, a := range rl.Alerts {
alerts = append(alerts, &Alert{
Expand All @@ -169,11 +169,11 @@ func (r *Ruler) rules(w http.ResponseWriter, req *http.Request) {
}
grp.Rules[i] = alertingRule{
State: rl.GetState(),
Name: rl.GetAlert(),
Query: rl.GetExpr(),
Duration: rl.For.Seconds(),
Labels: client.FromLabelAdaptersToLabels(rl.Labels),
Annotations: client.FromLabelAdaptersToLabels(rl.Annotations),
Name: rl.Rule.GetAlert(),
Query: rl.Rule.GetExpr(),
Duration: rl.Rule.For.Seconds(),
Labels: client.FromLabelAdaptersToLabels(rl.Rule.Labels),
Annotations: client.FromLabelAdaptersToLabels(rl.Rule.Annotations),
Alerts: alerts,
Health: rl.GetHealth(),
LastError: rl.GetLastError(),
Expand All @@ -183,9 +183,9 @@ func (r *Ruler) rules(w http.ResponseWriter, req *http.Request) {
}
} else {
grp.Rules[i] = recordingRule{
Name: rl.GetRecord(),
Query: rl.GetExpr(),
Labels: client.FromLabelAdaptersToLabels(rl.Labels),
Name: rl.Rule.GetRecord(),
Query: rl.Rule.GetExpr(),
Labels: client.FromLabelAdaptersToLabels(rl.Rule.Labels),
Health: rl.GetHealth(),
LastError: rl.GetLastError(),
LastEvaluation: rl.GetEvaluationTimestamp(),
Expand Down Expand Up @@ -221,14 +221,14 @@ func (r *Ruler) rules(w http.ResponseWriter, req *http.Request) {
func (r *Ruler) alerts(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)
userID, ctx, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
if err != nil || userID == "" {
level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
respondError(logger, w, "no valid org id found")
return
}

w.Header().Set("Content-Type", "application/json")
rgs, err := r.GetRules(ctx, userID)
rgs, err := r.GetRules(ctx)

if err != nil {
respondError(logger, w, err.Error())
Expand All @@ -238,8 +238,8 @@ func (r *Ruler) alerts(w http.ResponseWriter, req *http.Request) {
alerts := []*Alert{}

for _, g := range rgs {
for _, rl := range g.Rules {
if rl.Alert != "" {
for _, rl := range g.ActiveRules {
if rl.Rule.Alert != "" {
for _, a := range rl.Alerts {
alerts = append(alerts, &Alert{
Labels: client.FromLabelAdaptersToLabels(a.Labels),
Expand Down
63 changes: 36 additions & 27 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,32 +472,39 @@ func (r *Ruler) newManager(ctx context.Context, userID string) (*promRules.Manag

// GetRules retrieves the running rules from this ruler and all running rulers in the ring if
// sharding is enabled
func (r *Ruler) GetRules(ctx context.Context, userID string) ([]*rules.RuleGroupDesc, error) {
func (r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, fmt.Errorf("no user id found in context")
}

if r.cfg.EnableSharding {
return r.getShardedRules(ctx, userID)
return r.getShardedRules(ctx)
}

return r.getLocalRules(userID)
}

func (r *Ruler) getLocalRules(userID string) ([]*rules.RuleGroupDesc, error) {
func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) {
var groups []*promRules.Group
r.userManagerMtx.Lock()
if mngr, exists := r.userManagers[userID]; exists {
groups = mngr.RuleGroups()
}
r.userManagerMtx.Unlock()

groupDescs := make([]*rules.RuleGroupDesc, 0, len(groups))
groupDescs := make([]*GroupStateDesc, 0, len(groups))
prefix := filepath.Join(r.cfg.RulePath, userID) + "/"

for _, group := range groups {
interval := group.Interval()
groupDesc := &rules.RuleGroupDesc{
Name: group.Name(),
Namespace: strings.TrimPrefix(group.File(), prefix),
Interval: interval,
User: userID,
groupDesc := &GroupStateDesc{
Group: &rules.RuleGroupDesc{
Name: group.Name(),
Namespace: strings.TrimPrefix(group.File(), prefix),
Interval: interval,
User: userID,
},
EvaluationTimestamp: group.GetEvaluationTimestamp(),
EvaluationDuration: group.GetEvaluationDuration(),
}
Expand All @@ -507,13 +514,13 @@ func (r *Ruler) getLocalRules(userID string) ([]*rules.RuleGroupDesc, error) {
lastError = r.LastError().Error()
}

var ruleDesc *rules.RuleDesc
var ruleDesc *RuleStateDesc
switch rule := r.(type) {
case *promRules.AlertingRule:
rule.ActiveAlerts()
alerts := []*rules.AlertDesc{}
alerts := []*AlertStateDesc{}
for _, a := range rule.ActiveAlerts() {
alerts = append(alerts, &rules.AlertDesc{
alerts = append(alerts, &AlertStateDesc{
State: a.State.String(),
Labels: client.FromLabelsToLabelAdapters(a.Labels),
Annotations: client.FromLabelsToLabelAdapters(a.Annotations),
Expand All @@ -525,12 +532,14 @@ func (r *Ruler) getLocalRules(userID string) ([]*rules.RuleGroupDesc, error) {
ValidUntil: a.ValidUntil,
})
}
ruleDesc = &rules.RuleDesc{
Expr: rule.Query().String(),
Alert: rule.Name(),
For: rule.Duration(),
Labels: client.FromLabelsToLabelAdapters(rule.Labels()),
Annotations: client.FromLabelsToLabelAdapters(rule.Annotations()),
ruleDesc = &RuleStateDesc{
Rule: &rules.RuleDesc{
Expr: rule.Query().String(),
Alert: rule.Name(),
For: rule.Duration(),
Labels: client.FromLabelsToLabelAdapters(rule.Labels()),
Annotations: client.FromLabelsToLabelAdapters(rule.Annotations()),
},
State: rule.State().String(),
Health: string(rule.Health()),
LastError: lastError,
Expand All @@ -539,10 +548,12 @@ func (r *Ruler) getLocalRules(userID string) ([]*rules.RuleGroupDesc, error) {
EvaluationDuration: rule.GetEvaluationDuration(),
}
case *promRules.RecordingRule:
ruleDesc = &rules.RuleDesc{
Record: rule.Name(),
Expr: rule.Query().String(),
Labels: client.FromLabelsToLabelAdapters(rule.Labels()),
ruleDesc = &RuleStateDesc{
Rule: &rules.RuleDesc{
Record: rule.Name(),
Expr: rule.Query().String(),
Labels: client.FromLabelsToLabelAdapters(rule.Labels()),
},
Health: string(rule.Health()),
LastError: lastError,
EvaluationTimestamp: rule.GetEvaluationTimestamp(),
Expand All @@ -551,14 +562,14 @@ func (r *Ruler) getLocalRules(userID string) ([]*rules.RuleGroupDesc, error) {
default:
return nil, errors.Errorf("failed to assert type of rule '%v'", rule.Name())
}
groupDesc.Rules = append(groupDesc.Rules, ruleDesc)
groupDesc.ActiveRules = append(groupDesc.ActiveRules, ruleDesc)
}
groupDescs = append(groupDescs, groupDesc)
}
return groupDescs, nil
}

func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*rules.RuleGroupDesc, error) {
func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error) {
rulers, err := r.ring.GetAll()
if err != nil {
return nil, err
Expand All @@ -569,9 +580,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*rules.Ru
return nil, fmt.Errorf("unable to inject user ID into grpc request, %v", err)
}

// len(rgs) can't be larger than len(rulers.Ingesters)
// alloc it in advance to avoid realloc
rgs := make([]*rules.RuleGroupDesc, 0, len(rulers.Ingesters))
rgs := []*GroupStateDesc{}

for _, rlr := range rulers.Ingesters {
conn, err := grpc.Dial(rlr.Addr, grpc.WithInsecure())
Expand Down
Loading