Skip to content

Commit a32919e

Browse files
jtlisipstibrany
andauthored
Ruler API and ObjectStore Backend (#2269)
* add ruler API backed by object storage Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * simplify and comment ruler storage api Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * wrape calls to ruler with auth middleware Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * use saner metrics for ruler integration test Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * remove omitempty from configs Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * add clarifying comments for ruler integration tests and reorder metric checks Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * update changelog Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * update docs Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * Update pkg/ruler/api.go Co-Authored-By: Peter Štibraný <peter.stibrany@grafana.com> Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * refactor per PR comments Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> * fix API per PR comments Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com> Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 228090e commit a32919e

29 files changed

+920
-55
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
77
* [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes issue where frontend worker started to send queries to querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246
88
* [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you're vendoring Cortex and use Lifecycler) #2251
9+
* [FEATURE] Added experimental storage API to the ruler service that is enabled when the `-experimental.ruler.enable-api` is set to true #2269
10+
* `-ruler.storage.type` flag now allows `s3`,`gcs`, and `azure` values
11+
* `-ruler.storage.(s3|gcs|azure)` flags exist to allow the configuration of object clients set for rule storage
912
* [FEATURE] Flusher target to flush the WAL.
1013
* `-flusher.wal-dir` for the WAL directory to recover from.
1114
* `-flusher.concurrent-flushes` for number of concurrent flushes.

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
5656
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
5757
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
5858
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
59+
pkg/ruler/ruler.pb.go: pkg/ruler/rules/rules.proto
5960
pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto
6061

6162
all: $(UPTODATE_FILES)

docs/configuration/config-file-reference.md

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ The `ruler_config` configures the Cortex ruler.
708708
[pollinterval: <duration> | default = 1m0s]
709709
710710
storeconfig:
711-
# Method to use for backend rule storage (configdb)
711+
# Method to use for backend rule storage (configdb, azure, gcs, s3)
712712
# CLI flag: -ruler.storage.type
713713
[type: <string> | default = "configdb"]
714714
@@ -717,6 +717,79 @@ storeconfig:
717717
# The CLI flags prefix for this block config is: ruler
718718
[configdb: <configstore_config>]
719719
720+
azure:
721+
# Name of the blob container used to store chunks. Defaults to `cortex`.
722+
# This container must be created before running cortex.
723+
# CLI flag: -ruler.storage.azure.container-name
724+
[container_name: <string> | default = "cortex"]
725+
726+
# The Microsoft Azure account name to be used
727+
# CLI flag: -ruler.storage.azure.account-name
728+
[account_name: <string> | default = ""]
729+
730+
# The Microsoft Azure account key to use.
731+
# CLI flag: -ruler.storage.azure.account-key
732+
[account_key: <string> | default = ""]
733+
734+
# Preallocated buffer size for downloads (default is 512KB)
735+
# CLI flag: -ruler.storage.azure.download-buffer-size
736+
[download_buffer_size: <int> | default = 512000]
737+
738+
# Preallocated buffer size for up;oads (default is 256KB)
739+
# CLI flag: -ruler.storage.azure.upload-buffer-size
740+
[upload_buffer_size: <int> | default = 256000]
741+
742+
# Number of buffers used to used to upload a chunk. (defaults to 1)
743+
# CLI flag: -ruler.storage.azure.download-buffer-count
744+
[upload_buffer_count: <int> | default = 1]
745+
746+
# Timeout for requests made against azure blob storage. Defaults to 30
747+
# seconds.
748+
# CLI flag: -ruler.storage.azure.request-timeout
749+
[request_timeout: <duration> | default = 30s]
750+
751+
# Number of retries for a request which times out.
752+
# CLI flag: -ruler.storage.azure.max-retries
753+
[max_retries: <int> | default = 5]
754+
755+
# Minimum time to wait before retrying a request.
756+
# CLI flag: -ruler.storage.azure.min-retry-delay
757+
[min_retry_delay: <duration> | default = 10ms]
758+
759+
# Maximum time to wait before retrying a request.
760+
# CLI flag: -ruler.storage.azure.max-retry-delay
761+
[max_retry_delay: <duration> | default = 500ms]
762+
763+
gcs:
764+
# Name of GCS bucket to put chunks in.
765+
# CLI flag: -ruler.storage.gcs.bucketname
766+
[bucket_name: <string> | default = ""]
767+
768+
# The size of the buffer that GCS client for each PUT request. 0 to disable
769+
# buffering.
770+
# CLI flag: -ruler.storage.gcs.chunk-buffer-size
771+
[chunk_buffer_size: <int> | default = 0]
772+
773+
# The duration after which the requests to GCS should be timed out.
774+
# CLI flag: -ruler.storage.gcs.request-timeout
775+
[request_timeout: <duration> | default = 0s]
776+
777+
s3:
778+
# S3 endpoint URL with escaped Key and Secret encoded. If only region is
779+
# specified as a host, proper endpoint will be deduced. Use
780+
# inmemory:///<bucket-name> to use a mock in-memory implementation.
781+
# CLI flag: -ruler.storage.s3.url
782+
[s3: <url> | default = ]
783+
784+
# Comma separated list of bucket names to evenly distribute chunks over.
785+
# Overrides any buckets specified in s3.url flag
786+
# CLI flag: -ruler.storage.s3.buckets
787+
[bucketnames: <string> | default = ""]
788+
789+
# Set this to `true` to force the request to use path-style addressing.
790+
# CLI flag: -ruler.storage.s3.force-path-style
791+
[s3forcepathstyle: <boolean> | default = false]
792+
720793
# file path to store temporary rule files for the prometheus rule managers
721794
# CLI flag: -ruler.rule-path
722795
[rulepath: <string> | default = "/rules"]

integration/alertmanager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestAlertmanager(t *testing.T) {
2323
require.NoError(t, s.StartAndWaitReady(alertmanager))
2424
require.NoError(t, alertmanager.WaitSumMetrics(e2e.Equals(1), "cortex_alertmanager_configs"))
2525

26-
c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "user-1")
26+
c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "", "user-1")
2727
require.NoError(t, err)
2828

2929
cfg, err := c.GetAlertmanagerConfig(context.Background())

integration/api_ruler_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// +build requires_docker
2+
3+
package main
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/cortexproject/cortex/integration/e2e"
11+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
12+
"github.com/cortexproject/cortex/integration/e2ecortex"
13+
rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt"
14+
)
15+
16+
func TestRulerAPI(t *testing.T) {
17+
s, err := e2e.NewScenario(networkName)
18+
require.NoError(t, err)
19+
defer s.Close()
20+
21+
// Start dependencies.
22+
dynamo := e2edb.NewDynamoDB()
23+
minio := e2edb.NewMinio(9000, RulerConfigs["-ruler.storage.s3.buckets"])
24+
require.NoError(t, s.StartAndWaitReady(minio, dynamo))
25+
26+
// Start Cortex components.
27+
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
28+
ruler := e2ecortex.NewRuler("ruler", mergeFlags(ChunksStorageFlags, RulerConfigs), "")
29+
require.NoError(t, s.StartAndWaitReady(ruler))
30+
31+
// Create a client with the ruler address configured
32+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
33+
require.NoError(t, err)
34+
35+
// Create example namespace and rule group to use for tests, using strings that
36+
// require url escaping.
37+
namespace := "test_encoded_+namespace?"
38+
rg := rulefmt.RuleGroup{
39+
Name: "test_encoded_+\"+group_name?",
40+
Interval: 100,
41+
Rules: []rulefmt.Rule{
42+
rulefmt.Rule{
43+
Record: "test_rule",
44+
Expr: "up",
45+
},
46+
},
47+
}
48+
49+
// Set the rule group into the ruler
50+
require.NoError(t, c.SetRuleGroup(rg, namespace))
51+
52+
// Wait until the user manager is created
53+
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total"))
54+
55+
// Check to ensure the rules running in the ruler match what was set
56+
rgs, err := c.GetRuleGroups()
57+
require.NoError(t, err)
58+
59+
retrievedNamespace, exists := rgs[namespace]
60+
require.True(t, exists)
61+
require.Len(t, retrievedNamespace, 1)
62+
require.Equal(t, retrievedNamespace[0].Name, rg.Name)
63+
64+
// Delete the set rule group
65+
require.NoError(t, c.DeleteRuleGroup(namespace, rg.Name))
66+
67+
// Wait until the users manager has been terminated
68+
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(0), "cortex_ruler_managers_total"))
69+
70+
// Check to ensure the rule groups are no longer active
71+
_, err = c.GetRuleGroups()
72+
require.Error(t, err)
73+
}

integration/backward_compatibility_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
7373
now := time.Now()
7474
series, expectedVector := generateSeries("series_1", now)
7575

76-
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
76+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
7777
require.NoError(t, err)
7878

7979
res, err := c.Push(series)
@@ -103,7 +103,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
103103
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
104104

105105
// Query the series
106-
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
106+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
107107
require.NoError(t, err)
108108

109109
result, err := c.Query("series_1", now)

integration/chunks_storage_backends_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
8383
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
8484
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
8585

86-
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
86+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
8787
require.NoError(t, err)
8888

8989
// Push and Query some series from Cortex for each day starting from oldest start time from configs until now so that we test all the Index Stores

integration/configs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ var (
5050
"-alertmanager.web.external-url": "http://localhost/api/prom",
5151
}
5252

53+
RulerConfigs = map[string]string{
54+
"-ruler.enable-sharding": "false",
55+
"-ruler.poll-interval": "2s",
56+
"-experimental.ruler.enable-api": "true",
57+
"-ruler.storage.type": "s3",
58+
"-ruler.storage.s3.buckets": "cortex-rules",
59+
"-ruler.storage.s3.force-path-style": "true",
60+
"-ruler.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
61+
}
62+
5363
BlocksStorageFlags = map[string]string{
5464
"-store.engine": "tsdb",
5565
"-experimental.tsdb.backend": "s3",

integration/e2ecortex/client.go

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"io/ioutil"
89
"net/http"
10+
"net/url"
911
"time"
1012

1113
"github.com/gogo/protobuf/proto"
@@ -16,11 +18,14 @@ import (
1618
"github.com/prometheus/common/model"
1719
"github.com/prometheus/prometheus/prompb"
1820
yaml "gopkg.in/yaml.v2"
21+
22+
rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt"
1923
)
2024

2125
// Client is a client used to interact with Cortex in integration tests
2226
type Client struct {
2327
alertmanagerClient promapi.Client
28+
rulerAddress string
2429
distributorAddress string
2530
timeout time.Duration
2631
httpClient *http.Client
@@ -29,7 +34,13 @@ type Client struct {
2934
}
3035

3136
// NewClient makes a new Cortex client
32-
func NewClient(distributorAddress string, querierAddress string, alertmanagerAddress string, orgID string) (*Client, error) {
37+
func NewClient(
38+
distributorAddress string,
39+
querierAddress string,
40+
alertmanagerAddress string,
41+
rulerAddress string,
42+
orgID string,
43+
) (*Client, error) {
3344
// Create querier API client
3445
querierAPIClient, err := promapi.NewClient(promapi.Config{
3546
Address: "http://" + querierAddress + "/api/prom",
@@ -41,6 +52,7 @@ func NewClient(distributorAddress string, querierAddress string, alertmanagerAdd
4152

4253
c := &Client{
4354
distributorAddress: distributorAddress,
55+
rulerAddress: rulerAddress,
4456
timeout: 5 * time.Second,
4557
httpClient: &http.Client{},
4658
querierClient: promv1.NewAPI(querierAPIClient),
@@ -144,3 +156,91 @@ func (c *Client) GetAlertmanagerConfig(ctx context.Context) (*alertConfig.Config
144156

145157
return cfg, err
146158
}
159+
160+
// GetRuleGroups gets the status of an alertmanager instance
161+
func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
162+
// Create HTTP request
163+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
164+
if err != nil {
165+
return nil, err
166+
}
167+
req.Header.Set("X-Scope-OrgID", c.orgID)
168+
169+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
170+
defer cancel()
171+
172+
// Execute HTTP request
173+
res, err := c.httpClient.Do(req.WithContext(ctx))
174+
if err != nil {
175+
return nil, err
176+
}
177+
178+
defer res.Body.Close()
179+
rgs := map[string][]rulefmt.RuleGroup{}
180+
181+
data, err := ioutil.ReadAll(res.Body)
182+
if err != nil {
183+
return nil, err
184+
}
185+
186+
err = yaml.Unmarshal(data, rgs)
187+
if err != nil {
188+
return nil, err
189+
}
190+
191+
return rgs, nil
192+
}
193+
194+
// SetRuleGroup gets the status of an alertmanager instance
195+
func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
196+
// Create write request
197+
data, err := yaml.Marshal(rulegroup)
198+
if err != nil {
199+
return err
200+
}
201+
202+
// Create HTTP request
203+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/rules/%s", c.rulerAddress, url.PathEscape(namespace)), bytes.NewReader(data))
204+
if err != nil {
205+
return err
206+
}
207+
208+
req.Header.Set("Content-Type", "application/yaml")
209+
req.Header.Set("X-Scope-OrgID", c.orgID)
210+
211+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
212+
defer cancel()
213+
214+
// Execute HTTP request
215+
res, err := c.httpClient.Do(req.WithContext(ctx))
216+
if err != nil {
217+
return err
218+
}
219+
220+
defer res.Body.Close()
221+
return nil
222+
}
223+
224+
// DeleteRuleGroup gets the status of an alertmanager instance
225+
func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
226+
// Create HTTP request
227+
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)
228+
if err != nil {
229+
return err
230+
}
231+
232+
req.Header.Set("Content-Type", "application/yaml")
233+
req.Header.Set("X-Scope-OrgID", c.orgID)
234+
235+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
236+
defer cancel()
237+
238+
// Execute HTTP request
239+
res, err := c.httpClient.Do(req.WithContext(ctx))
240+
if err != nil {
241+
return err
242+
}
243+
244+
defer res.Body.Close()
245+
return nil
246+
}

integration/e2ecortex/services.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,22 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex
210210
grpcPort,
211211
)
212212
}
213+
214+
func NewRuler(name string, flags map[string]string, image string) *CortexService {
215+
if image == "" {
216+
image = GetDefaultImage()
217+
}
218+
219+
return NewCortexService(
220+
name,
221+
image,
222+
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
223+
"-target": "ruler",
224+
"-log.level": "warn",
225+
}, flags))...),
226+
// The alertmanager doesn't expose a readiness probe, so we just check if the / returns 200
227+
e2e.NewReadinessProbe(httpPort, "/", 200),
228+
httpPort,
229+
grpcPort,
230+
)
231+
}

0 commit comments

Comments
 (0)