Skip to content

Commit 3a74a24

Browse files
committed
Support Ruler to query Query Frontend
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 1c9c53b commit 3a74a24

14 files changed

+763
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
56
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
67
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
78
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163

docs/configuration/config-file-reference.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4067,6 +4067,80 @@ The `redis_config` configures the Redis backend cache.
40674067
The `ruler_config` configures the Cortex ruler.
40684068

40694069
```yaml
4070+
# [Experimental] GRPC listen address of the Query Frontend, in host:port format.
4071+
# If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries
4072+
# to Ingesters directly.
4073+
# CLI flag: -ruler.frontend-address
4074+
[frontend_address: <string> | default = ""]
4075+
4076+
frontend_client:
4077+
# gRPC client max receive message size (bytes).
4078+
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size
4079+
[max_recv_msg_size: <int> | default = 104857600]
4080+
4081+
# gRPC client max send message size (bytes).
4082+
# CLI flag: -ruler.frontendClient.grpc-max-send-msg-size
4083+
[max_send_msg_size: <int> | default = 16777216]
4084+
4085+
# Use compression when sending messages. Supported values are: 'gzip',
4086+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
4087+
# CLI flag: -ruler.frontendClient.grpc-compression
4088+
[grpc_compression: <string> | default = ""]
4089+
4090+
# Rate limit for gRPC client; 0 means disabled.
4091+
# CLI flag: -ruler.frontendClient.grpc-client-rate-limit
4092+
[rate_limit: <float> | default = 0]
4093+
4094+
# Rate limit burst for gRPC client.
4095+
# CLI flag: -ruler.frontendClient.grpc-client-rate-limit-burst
4096+
[rate_limit_burst: <int> | default = 0]
4097+
4098+
# Enable backoff and retry when we hit ratelimits.
4099+
# CLI flag: -ruler.frontendClient.backoff-on-ratelimits
4100+
[backoff_on_ratelimits: <boolean> | default = false]
4101+
4102+
backoff_config:
4103+
# Minimum delay when backing off.
4104+
# CLI flag: -ruler.frontendClient.backoff-min-period
4105+
[min_period: <duration> | default = 100ms]
4106+
4107+
# Maximum delay when backing off.
4108+
# CLI flag: -ruler.frontendClient.backoff-max-period
4109+
[max_period: <duration> | default = 10s]
4110+
4111+
# Number of times to backoff and retry before failing.
4112+
# CLI flag: -ruler.frontendClient.backoff-retries
4113+
[max_retries: <int> | default = 10]
4114+
4115+
# Enable TLS in the GRPC client. This flag needs to be enabled when any other
4116+
# TLS flag is set. If set to false, insecure connection to gRPC server will be
4117+
# used.
4118+
# CLI flag: -ruler.frontendClient.tls-enabled
4119+
[tls_enabled: <boolean> | default = false]
4120+
4121+
# Path to the client certificate file, which will be used for authenticating
4122+
# with the server. Also requires the key path to be configured.
4123+
# CLI flag: -ruler.frontendClient.tls-cert-path
4124+
[tls_cert_path: <string> | default = ""]
4125+
4126+
# Path to the key file for the client certificate. Also requires the client
4127+
# certificate to be configured.
4128+
# CLI flag: -ruler.frontendClient.tls-key-path
4129+
[tls_key_path: <string> | default = ""]
4130+
4131+
# Path to the CA certificates file to validate server certificate against. If
4132+
# not set, the host's root CA certificates are used.
4133+
# CLI flag: -ruler.frontendClient.tls-ca-path
4134+
[tls_ca_path: <string> | default = ""]
4135+
4136+
# Override the expected name on the server certificate.
4137+
# CLI flag: -ruler.frontendClient.tls-server-name
4138+
[tls_server_name: <string> | default = ""]
4139+
4140+
# Skip validating server certificate.
4141+
# CLI flag: -ruler.frontendClient.tls-insecure-skip-verify
4142+
[tls_insecure_skip_verify: <boolean> | default = false]
4143+
40704144
# URL of alerts return path.
40714145
# CLI flag: -ruler.external.url
40724146
[external_url: <url> | default = ]

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Cortex is an actively developed project and we want to encourage the introductio
3535

3636
Currently experimental features are:
3737

38+
- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` )
3839
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
3940
- Azure blob storage.
4041
- Zone awareness based replication.

integration/ruler_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,72 @@ func TestRulerKeepFiring(t *testing.T) {
12351235
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
12361236
}
12371237

1238+
func TestRulerEvalWithQueryFrontend(t *testing.T) {
1239+
s, err := e2e.NewScenario(networkName)
1240+
require.NoError(t, err)
1241+
defer s.Close()
1242+
1243+
// Start dependencies.
1244+
consul := e2edb.NewConsul()
1245+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
1246+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1247+
1248+
// Configure the ruler.
1249+
flags := mergeFlags(
1250+
BlocksStorageFlags(),
1251+
RulerFlags(),
1252+
map[string]string{
1253+
// Evaluate rules often, so that we don't need to wait for metrics to show up.
1254+
"-ruler.evaluation-interval": "2s",
1255+
// We run single ingester only, no replication.
1256+
"-distributor.replication-factor": "1",
1257+
},
1258+
)
1259+
1260+
const namespace = "test"
1261+
const user = "user"
1262+
1263+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1264+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1265+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1266+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1267+
require.NoError(t, s.Start(queryFrontend))
1268+
1269+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1270+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1271+
}), "")
1272+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1273+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1274+
}), "")
1275+
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1276+
1277+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1278+
require.NoError(t, err)
1279+
1280+
expression := "metric"
1281+
groupName := "rule_group"
1282+
ruleName := "rule_name"
1283+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1284+
1285+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1286+
// Wait until ruler has loaded the group.
1287+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1288+
// Wait until rule group has tried to evaluate the rule.
1289+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1290+
1291+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1292+
// Check that cortex_ruler_query_frontend_clients went up
1293+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1294+
// Check that cortex_ruler_queries_total went up
1295+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1296+
// Check that cortex_ruler_queries_failed_total is zero
1297+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1298+
// Check that cortex_ruler_write_requests_total went up
1299+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1300+
// Check that cortex_ruler_write_requests_failed_total is zero
1301+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1302+
}
1303+
12381304
func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
12391305
responseJson, err := json.Marshal(rules)
12401306
require.NoError(t, err)

pkg/cortex/modules.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
548548
}
549549

550550
t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
551+
t.Cfg.Ruler.FrontendTimeout = t.Cfg.Querier.Timeout
552+
t.Cfg.Ruler.PrometheusHTTPPrefix = t.Cfg.API.PrometheusHTTPPrefix
551553
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
552554
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)
553555

pkg/ruler/compat.go

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cortexproject/cortex/pkg/cortexpb"
2525
"github.com/cortexproject/cortex/pkg/querier"
2626
"github.com/cortexproject/cortex/pkg/querier/stats"
27+
"github.com/cortexproject/cortex/pkg/ring/client"
2728
util_log "github.com/cortexproject/cortex/pkg/util/log"
2829
promql_util "github.com/cortexproject/cortex/pkg/util/promql"
2930
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -157,7 +158,7 @@ type RulesLimits interface {
157158
// EngineQueryFunc returns a new engine query function validating max queryLength.
158159
// Modified from Prometheus rules.EngineQueryFunc
159160
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
160-
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
161+
func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
161162
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
162163
// Enforce the max query length.
163164
maxQueryLength := overrides.MaxQueryLength(userID)
@@ -174,25 +175,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
174175
}
175176
}
176177

177-
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
178-
if err != nil {
179-
return nil, err
180-
}
181-
res := q.Exec(ctx)
182-
if res.Err != nil {
183-
return nil, res.Err
184-
}
185-
switch v := res.Value.(type) {
186-
case promql.Vector:
178+
if frontendClient != nil {
179+
v, err := frontendClient.InstantQuery(ctx, qs, t)
180+
if err != nil {
181+
return nil, err
182+
}
183+
187184
return v, nil
188-
case promql.Scalar:
189-
return promql.Vector{promql.Sample{
190-
T: v.T,
191-
F: v.V,
192-
Metric: labels.Labels{},
193-
}}, nil
194-
default:
195-
return nil, errors.New("rule result is not a vector or scalar")
185+
} else {
186+
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
187+
if err != nil {
188+
return nil, err
189+
}
190+
res := q.Exec(ctx)
191+
if res.Err != nil {
192+
return nil, res.Err
193+
}
194+
switch v := res.Value.(type) {
195+
case promql.Vector:
196+
return v, nil
197+
case promql.Scalar:
198+
return promql.Vector{promql.Sample{
199+
T: v.T,
200+
F: v.V,
201+
Metric: labels.Labels{},
202+
}}, nil
203+
default:
204+
return nil, errors.New("rule result is not a vector or scalar")
205+
}
196206
}
197207
}
198208
}
@@ -300,22 +310,30 @@ type RulesManager interface {
300310
}
301311

302312
// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
303-
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager
313+
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error)
304314

305315
func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine promql.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
306316
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
307317
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
308318
// Errors from PromQL are always "user" errors.
309319
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)
310320

311-
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
321+
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
322+
var client *frontendClient
312323
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
313324
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
314325
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
315326
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)
316327

328+
if cfg.FrontendAddress != "" {
329+
c, err := frontendPool.GetClientFor(cfg.FrontendAddress)
330+
if err != nil {
331+
return nil, err
332+
}
333+
client = c.(*frontendClient)
334+
}
317335
var queryFunc rules.QueryFunc
318-
engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta)
336+
engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta)
319337
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
320338
if cfg.EnableQueryStats {
321339
queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger)
@@ -340,7 +358,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
340358
DefaultRuleQueryOffset: func() time.Duration {
341359
return overrides.RulerQueryOffset(userID)
342360
},
343-
})
361+
}), nil
344362
}
345363
}
346364

pkg/ruler/frontend_client.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/textproto"
8+
"net/url"
9+
"strconv"
10+
"time"
11+
12+
"github.com/go-kit/log/level"
13+
"github.com/prometheus/common/version"
14+
"github.com/prometheus/prometheus/promql"
15+
"github.com/weaveworks/common/httpgrpc"
16+
"github.com/weaveworks/common/user"
17+
18+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
19+
)
20+
21+
const (
22+
orgIDHeader = "X-Scope-OrgID"
23+
instantQueryPath = "/api/v1/query"
24+
mimeTypeForm = "application/x-www-form-urlencoded"
25+
contentTypeJSON = "application/json"
26+
)
27+
28+
type FrontendClient struct {
29+
client httpgrpc.HTTPClient
30+
timeout time.Duration
31+
prometheusHTTPPrefix string
32+
jsonDecoder JsonDecoder
33+
}
34+
35+
func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient {
36+
return &FrontendClient{
37+
client: client,
38+
timeout: timeout,
39+
prometheusHTTPPrefix: prometheusHTTPPrefix,
40+
jsonDecoder: JsonDecoder{},
41+
}
42+
}
43+
44+
func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Time) (*httpgrpc.HTTPRequest, error) {
45+
args := make(url.Values)
46+
args.Set("query", qs)
47+
if !ts.IsZero() {
48+
args.Set("time", ts.Format(time.RFC3339Nano))
49+
}
50+
body := []byte(args.Encode())
51+
52+
//lint:ignore faillint wrapper around upstream method
53+
orgID, err := user.ExtractOrgID(ctx)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
req := &httpgrpc.HTTPRequest{
59+
Method: http.MethodPost,
60+
Url: p.prometheusHTTPPrefix + instantQueryPath,
61+
Body: body,
62+
Headers: []*httpgrpc.Header{
63+
{Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("Cortex/%s", version.Version)}},
64+
{Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeForm}},
65+
{Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}},
66+
{Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{contentTypeJSON}},
67+
{Key: textproto.CanonicalMIMEHeaderKey(orgIDHeader), Values: []string{orgID}},
68+
},
69+
}
70+
71+
return req, nil
72+
}
73+
74+
func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
75+
log, ctx := spanlogger.New(ctx, "FrontendClient.InstantQuery")
76+
defer log.Span.Finish()
77+
78+
req, err := p.makeRequest(ctx, qs, t)
79+
if err != nil {
80+
level.Error(log).Log("err", err, "query", qs)
81+
return nil, err
82+
}
83+
84+
ctx, cancel := context.WithTimeout(ctx, p.timeout)
85+
defer cancel()
86+
87+
resp, err := p.client.Handle(ctx, req)
88+
89+
if err != nil {
90+
level.Error(log).Log("err", err, "query", qs)
91+
return nil, err
92+
}
93+
94+
vector, warning, err := p.jsonDecoder.Decode(resp.Body)
95+
if err != nil {
96+
level.Error(log).Log("err", err, "query", qs)
97+
return nil, err
98+
}
99+
100+
if len(warning) > 0 {
101+
level.Warn(log).Log("warnings", warning, "query", qs)
102+
}
103+
104+
return vector, nil
105+
}

0 commit comments

Comments
 (0)