
Ruler API and ObjectStore Backend #2269

Merged · 11 commits · Mar 20, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@
* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
* [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes issue where frontend worker started to send queries to querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246
* [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you're vendoring Cortex and use Lifecycler) #2251
* [FEATURE] Added an experimental storage API to the ruler service, enabled when the `-experimental.ruler.enable-api` flag is set to true. #2269
  * `-ruler.storage.type` flag now allows `s3`, `gcs`, and `azure` values
  * `-ruler.storage.(s3|gcs|azure)` flags can be used to configure the object storage client used for rule storage
* [FEATURE] Flusher target to flush the WAL.
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
1 change: 1 addition & 0 deletions Makefile
@@ -56,6 +56,7 @@ pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
pkg/ruler/ruler.pb.go: pkg/ruler/ruler.proto
pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto

all: $(UPTODATE_FILES)
75 changes: 74 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -708,7 +708,7 @@ The `ruler_config` configures the Cortex ruler.
[pollinterval: <duration> | default = 1m0s]

storeconfig:
# Method to use for backend rule storage (configdb)
# Method to use for backend rule storage (configdb, azure, gcs, s3)
# CLI flag: -ruler.storage.type
[type: <string> | default = "configdb"]

@@ -717,6 +717,79 @@ storeconfig:
# The CLI flags prefix for this block config is: ruler
[configdb: <configstore_config>]

azure:
# Name of the blob container used to store chunks. Defaults to `cortex`.
# This container must be created before running cortex.
# CLI flag: -ruler.storage.azure.container-name
[container_name: <string> | default = "cortex"]

# The Microsoft Azure account name to be used
# CLI flag: -ruler.storage.azure.account-name
[account_name: <string> | default = ""]

# The Microsoft Azure account key to use.
# CLI flag: -ruler.storage.azure.account-key
[account_key: <string> | default = ""]

# Preallocated buffer size for downloads (default is 512KB)
# CLI flag: -ruler.storage.azure.download-buffer-size
[download_buffer_size: <int> | default = 512000]

# Preallocated buffer size for uploads (default is 256KB)
# CLI flag: -ruler.storage.azure.upload-buffer-size
[upload_buffer_size: <int> | default = 256000]

# Number of buffers used to upload a chunk (defaults to 1).
# CLI flag: -ruler.storage.azure.download-buffer-count
[upload_buffer_count: <int> | default = 1]

# Timeout for requests made against azure blob storage. Defaults to 30
# seconds.
# CLI flag: -ruler.storage.azure.request-timeout
[request_timeout: <duration> | default = 30s]

# Number of retries for a request which times out.
# CLI flag: -ruler.storage.azure.max-retries
[max_retries: <int> | default = 5]

# Minimum time to wait before retrying a request.
# CLI flag: -ruler.storage.azure.min-retry-delay
[min_retry_delay: <duration> | default = 10ms]

# Maximum time to wait before retrying a request.
# CLI flag: -ruler.storage.azure.max-retry-delay
[max_retry_delay: <duration> | default = 500ms]

gcs:
# Name of GCS bucket to put chunks in.
# CLI flag: -ruler.storage.gcs.bucketname
[bucket_name: <string> | default = ""]

# The size of the buffer that the GCS client uses for each PUT request. 0 to
# disable buffering.
# CLI flag: -ruler.storage.gcs.chunk-buffer-size
[chunk_buffer_size: <int> | default = 0]

# The duration after which the requests to GCS should be timed out.
# CLI flag: -ruler.storage.gcs.request-timeout
[request_timeout: <duration> | default = 0s]

s3:
# S3 endpoint URL with escaped Key and Secret encoded. If only region is
# specified as a host, proper endpoint will be deduced. Use
# inmemory:///<bucket-name> to use a mock in-memory implementation.
# CLI flag: -ruler.storage.s3.url
[s3: <url> | default = ]

# Comma separated list of bucket names to evenly distribute chunks over.
# Overrides any buckets specified in s3.url flag
# CLI flag: -ruler.storage.s3.buckets
[bucketnames: <string> | default = ""]

# Set this to `true` to force the request to use path-style addressing.
# CLI flag: -ruler.storage.s3.force-path-style
[s3forcepathstyle: <boolean> | default = false]

# File path to store temporary rule files for the Prometheus rule managers.
# CLI flag: -ruler.rule-path
[rulepath: <string> | default = "/rules"]
2 changes: 1 addition & 1 deletion integration/alertmanager_test.go
@@ -23,7 +23,7 @@ func TestAlertmanager(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(alertmanager))
require.NoError(t, alertmanager.WaitSumMetrics(e2e.Equals(1), "cortex_alertmanager_configs"))

c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "user-1")
c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

cfg, err := c.GetAlertmanagerConfig(context.Background())
73 changes: 73 additions & 0 deletions integration/api_ruler_test.go
@@ -0,0 +1,73 @@
// +build requires_docker

package main

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt"
)

func TestRulerAPI(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()
minio := e2edb.NewMinio(9000, RulerConfigs["-ruler.storage.s3.buckets"])
require.NoError(t, s.StartAndWaitReady(minio, dynamo))

// Start Cortex components.
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
ruler := e2ecortex.NewRuler("ruler", mergeFlags(ChunksStorageFlags, RulerConfigs), "")
require.NoError(t, s.StartAndWaitReady(ruler))

// Create a client with the ruler address configured
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
require.NoError(t, err)

// Create example namespace and rule group to use for tests, using strings that
// require url escaping.
namespace := "test_encoded_+namespace?"
rg := rulefmt.RuleGroup{
Name: "test_encoded_+\"+group_name?",
Interval: 100,
Rules: []rulefmt.Rule{
rulefmt.Rule{
Record: "test_rule",
Expr: "up",
},
},
}

// Set the rule group into the ruler
require.NoError(t, c.SetRuleGroup(rg, namespace))

// Wait until the user manager is created
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total"))

// Check to ensure the rules running in the ruler match what was set
rgs, err := c.GetRuleGroups()
require.NoError(t, err)

retrievedNamespace, exists := rgs[namespace]
require.True(t, exists)
require.Len(t, retrievedNamespace, 1)
require.Equal(t, retrievedNamespace[0].Name, rg.Name)

// Delete the set rule group
require.NoError(t, c.DeleteRuleGroup(namespace, rg.Name))

// Wait until the users manager has been terminated
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(0), "cortex_ruler_managers_total"))

// Check to ensure the rule groups are no longer active
_, err = c.GetRuleGroups()
require.Error(t, err)
}
4 changes: 2 additions & 2 deletions integration/backward_compatibility_test.go
@@ -61,7 +61,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
@@ -91,7 +91,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Query the series
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

result, err := c.Query("series_1", now)
2 changes: 1 addition & 1 deletion integration/chunks_storage_backends_test.go
@@ -77,7 +77,7 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push and Query some series from Cortex for each day starting from oldest start time from configs until now so that we test all the Index Stores
10 changes: 10 additions & 0 deletions integration/configs.go
@@ -50,6 +50,16 @@
"-alertmanager.web.external-url": "http://localhost/api/prom",
}

RulerConfigs = map[string]string{
"-ruler.enable-sharding": "false",
"-ruler.poll-interval": "2s",
"-experimental.ruler.enable-api": "true",
"-ruler.storage.type": "s3",
"-ruler.storage.s3.buckets": "cortex-rules",
"-ruler.storage.s3.force-path-style": "true",
"-ruler.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
}

BlocksStorageFlags = map[string]string{
"-store.engine": "tsdb",
"-experimental.tsdb.backend": "s3",
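`RulerConfigs` above is combined with the base chunks-storage flags via `mergeFlags` before the ruler is started. A minimal sketch of a last-one-wins merge, which is the behavior such a helper plausibly has (an assumption; the actual e2e helper may differ):

```go
package main

import "fmt"

// mergeFlags combines flag maps; flags in later maps override earlier ones.
// This mimics the likely semantics of the e2e test helper (an assumption).
func mergeFlags(maps ...map[string]string) map[string]string {
	merged := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	base := map[string]string{"-target": "ruler", "-log.level": "warn"}
	overrides := map[string]string{"-log.level": "info", "-ruler.storage.type": "s3"}
	merged := mergeFlags(base, overrides)
	fmt.Println(merged["-target"], merged["-log.level"], merged["-ruler.storage.type"])
}
```

Last-wins ordering lets a test like `TestRulerAPI` start from `ChunksStorageFlags` and override only the ruler-specific settings.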
102 changes: 101 additions & 1 deletion integration/e2ecortex/client.go
@@ -5,7 +5,9 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/gogo/protobuf/proto"
@@ -16,11 +18,14 @@
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
yaml "gopkg.in/yaml.v2"

rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt"
)

// Client is a client used to interact with Cortex in integration tests
type Client struct {
alertmanagerClient promapi.Client
rulerAddress string
distributorAddress string
timeout time.Duration
httpClient *http.Client
@@ -29,7 +34,13 @@
}

// NewClient makes a new Cortex client
func NewClient(distributorAddress string, querierAddress string, alertmanagerAddress string, orgID string) (*Client, error) {
func NewClient(
distributorAddress string,
querierAddress string,
alertmanagerAddress string,
rulerAddress string,
orgID string,
) (*Client, error) {
// Create querier API client
querierAPIClient, err := promapi.NewClient(promapi.Config{
Address: "http://" + querierAddress + "/api/prom",
@@ -41,6 +52,7 @@ func NewClient(distributorAddress string, querierAddress string, alertmanagerAdd

c := &Client{
distributorAddress: distributorAddress,
rulerAddress: rulerAddress,
timeout: 5 * time.Second,
httpClient: &http.Client{},
querierClient: promv1.NewAPI(querierAPIClient),
@@ -144,3 +156,91 @@ func (c *Client) GetAlertmanagerConfig(ctx context.Context) (*alertConfig.Config

return cfg, err
}

// GetRuleGroups retrieves the rule groups for the client's org from the ruler
func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
// Create HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
if err != nil {
return nil, err
}
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
rgs := map[string][]rulefmt.RuleGroup{}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}

err = yaml.Unmarshal(data, rgs)
if err != nil {
return nil, err
}

return rgs, nil
}

// SetRuleGroup creates or updates a rule group in the given namespace
func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
// Create write request
data, err := yaml.Marshal(rulegroup)
if err != nil {
return err
}

// Create HTTP request
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/rules/%s", c.rulerAddress, url.PathEscape(namespace)), bytes.NewReader(data))
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/yaml")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}

defer res.Body.Close()
return nil
}

// DeleteRuleGroup deletes the named rule group from the given namespace
func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
// Create HTTP request
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/yaml")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}

defer res.Body.Close()
return nil
}
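The three ruler client methods above share one request shape: build a path with individually escaped segments, set the `X-Scope-OrgID` header, and execute under a timeout. A self-contained sketch of that shape against a mock server (`rulerRulesPath` is a hypothetical helper introduced here for illustration, not part of the PR):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// rulerRulesPath builds the ruler API path, escaping each segment;
// empty segments are omitted.
func rulerRulesPath(namespace, group string) string {
	p := "/api/prom/rules"
	if namespace != "" {
		p += "/" + url.PathEscape(namespace)
	}
	if group != "" {
		p += "/" + url.PathEscape(group)
	}
	return p
}

func main() {
	// A mock "ruler" that echoes back what it received.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "%s %s org=%s", r.Method, r.URL.EscapedPath(), r.Header.Get("X-Scope-OrgID"))
	}))
	defer srv.Close()

	// Mirror SetRuleGroup: POST the YAML body with the tenant header set.
	req, _ := http.NewRequest("POST", srv.URL+rulerRulesPath("test_+namespace?", ""), strings.NewReader("name: g1\n"))
	req.Header.Set("Content-Type", "application/yaml")
	req.Header.Set("X-Scope-OrgID", "user-1")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	body, _ := io.ReadAll(res.Body)
	fmt.Println(string(body))
}
```

Note the real client also wraps each request in a `context.WithTimeout`, which this sketch omits for brevity.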
19 changes: 19 additions & 0 deletions integration/e2ecortex/services.go
@@ -210,3 +210,22 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex
grpcPort,
)
}

func NewRuler(name string, flags map[string]string, image string) *CortexService {
if image == "" {
image = GetDefaultImage()
}

return NewCortexService(
name,
image,
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-target": "ruler",
"-log.level": "warn",
}, flags))...),
// The ruler doesn't expose a readiness probe, so we just check if / returns 200
e2e.NewReadinessProbe(httpPort, "/", 200),
httpPort,
grpcPort,
)
}