Skip to content

Commit 6f450a7

Browse files
committed
add redis as index cache and caching bucket backend
Signed-off-by: Ben Ye <benye@amazon.com>
1 parent c9b0156 commit 6f450a7

File tree

8 files changed

+1192
-16
lines changed

8 files changed

+1192
-16
lines changed

docs/blocks-storage/querier.md

Lines changed: 323 additions & 1 deletion
Large diffs are not rendered by default.

docs/blocks-storage/store-gateway.md

Lines changed: 323 additions & 1 deletion
Large diffs are not rendered by default.

docs/configuration/config-file-reference.md

Lines changed: 323 additions & 1 deletion
Large diffs are not rendered by default.

pkg/cortexpb/compat_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestMetricMetadataToMetricTypeToMetricType(t *testing.T) {
9999
for _, tt := range tc {
100100
t.Run(tt.desc, func(t *testing.T) {
101101
m := MetricMetadataMetricTypeToMetricType(tt.input)
102-
testutil.Equals(t, tt.expected, m)
102+
assert.Equal(t, tt.expected, m)
103103
})
104104
}
105105
}

pkg/storage/tsdb/caching_bucket.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,26 @@ import (
2323

2424
const (
2525
CacheBackendMemcached = "memcached"
26+
CacheBackendRedis = "redis"
2627
)
2728

2829
type CacheBackend struct {
2930
Backend string `yaml:"backend"`
3031
Memcached MemcachedClientConfig `yaml:"memcached"`
32+
Redis RedisClientConfig `yaml:"redis"`
3133
}
3234

3335
// Validate the config.
3436
func (cfg *CacheBackend) Validate() error {
35-
if cfg.Backend != "" && cfg.Backend != CacheBackendMemcached {
37+
switch cfg.Backend {
38+
case CacheBackendMemcached:
39+
return cfg.Memcached.Validate()
40+
case CacheBackendRedis:
41+
return cfg.Redis.Validate()
42+
case "":
43+
default:
3644
return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
3745
}
38-
39-
if cfg.Backend == CacheBackendMemcached {
40-
if err := cfg.Memcached.Validate(); err != nil {
41-
return err
42-
}
43-
}
44-
4546
return nil
4647
}
4748

@@ -58,6 +59,7 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st
5859
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached))
5960

6061
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
62+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
6163

6264
f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
6365
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
@@ -89,6 +91,7 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix
8991
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for metadata cache, if not empty. Supported values: %s.", CacheBackendMemcached))
9092

9193
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
94+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
9295

9396
f.DurationVar(&cfg.TenantsListTTL, prefix+"tenants-list-ttl", 15*time.Minute, "How long to cache list of tenants in the bucket.")
9497
f.DurationVar(&cfg.TenantBlocksListTTL, prefix+"tenant-blocks-list-ttl", 5*time.Minute, "How long to cache list of blocks for each tenant.")
@@ -111,7 +114,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
111114
cfg := cache.NewCachingBucketConfig()
112115
cachingConfigured := false
113116

114-
chunksCache, err := createCache("chunks-cache", chunksConfig.Backend, chunksConfig.Memcached, logger, reg)
117+
chunksCache, err := createCache("chunks-cache", chunksConfig.Backend, chunksConfig.Memcached, chunksConfig.Redis, logger, reg)
115118
if err != nil {
116119
return nil, errors.Wrapf(err, "chunks-cache")
117120
}
@@ -121,7 +124,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
121124
cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
122125
}
123126

124-
metadataCache, err := createCache("metadata-cache", metadataConfig.Backend, metadataConfig.Memcached, logger, reg)
127+
metadataCache, err := createCache("metadata-cache", metadataConfig.Backend, metadataConfig.Memcached, metadataConfig.Redis, logger, reg)
125128
if err != nil {
126129
return nil, errors.Wrapf(err, "metadata-cache")
127130
}
@@ -149,7 +152,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
149152
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
150153
}
151154

152-
func createCache(cacheName string, backend string, memcached MemcachedClientConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
155+
func createCache(cacheName string, backend string, memcached MemcachedClientConfig, redis RedisClientConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
153156
switch backend {
154157
case "":
155158
// No caching.
@@ -163,6 +166,13 @@ func createCache(cacheName string, backend string, memcached MemcachedClientConf
163166
}
164167
return cache.NewMemcachedCache(cacheName, logger, client, reg), nil
165168

169+
case CacheBackendRedis:
170+
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, redis.ToRedisClientConfig(), reg)
171+
if err != nil {
172+
return nil, errors.Wrapf(err, "failed to create redis client")
173+
}
174+
return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil
175+
166176
default:
167177
return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, backend)
168178
}

pkg/storage/tsdb/index_cache.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@ const (
2323
// IndexCacheBackendMemcached is the value for the memcached index cache backend.
2424
IndexCacheBackendMemcached = "memcached"
2525

26+
// IndexCacheBackendRedis is the value for the redis index cache backend.
27+
IndexCacheBackendRedis = "redis"
28+
2629
// IndexCacheBackendDefault is the value for the default index cache backend.
2730
IndexCacheBackendDefault = IndexCacheBackendInMemory
2831

2932
defaultMaxItemSize = model.Bytes(128 * units.MiB)
3033
)
3134

3235
var (
33-
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached}
36+
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached, IndexCacheBackendRedis}
3437

3538
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
3639
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
@@ -40,6 +43,7 @@ type IndexCacheConfig struct {
4043
Backend string `yaml:"backend"`
4144
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
4245
Memcached MemcachedClientConfig `yaml:"memcached"`
46+
Redis RedisClientConfig `yaml:"redis"`
4347
}
4448

4549
func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
@@ -51,6 +55,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
5155

5256
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
5357
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
58+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
5459
}
5560

5661
// Validate the config.
@@ -63,6 +68,10 @@ func (cfg *IndexCacheConfig) Validate() error {
6368
if err := cfg.Memcached.Validate(); err != nil {
6469
return err
6570
}
71+
} else if cfg.Backend == IndexCacheBackendRedis {
72+
if err := cfg.Redis.Validate(); err != nil {
73+
return err
74+
}
6675
}
6776

6877
return nil
@@ -83,6 +92,8 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
8392
return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
8493
case IndexCacheBackendMemcached:
8594
return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
95+
case IndexCacheBackendRedis:
96+
return newRedisIndexCache(cfg.Redis, logger, registerer)
8697
default:
8798
return nil, errUnsupportedIndexCacheBackend
8899
}
@@ -111,3 +122,12 @@ func newMemcachedIndexCache(cfg MemcachedClientConfig, logger log.Logger, regist
111122

112123
return storecache.NewMemcachedIndexCache(logger, client, registerer)
113124
}
125+
126+
func newRedisIndexCache(cfg RedisClientConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
127+
client, err := cacheutil.NewRedisClientWithConfig(logger, "index-cache", cfg.ToRedisClientConfig(), registerer)
128+
if err != nil {
129+
return nil, errors.Wrapf(err, "create index cache redis client")
130+
}
131+
132+
return storecache.NewRemoteIndexCache(logger, client, registerer)
133+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package tsdb
2+
3+
import (
4+
"flag"
5+
"time"
6+
7+
"github.com/pkg/errors"
8+
"github.com/thanos-io/thanos/pkg/cacheutil"
9+
10+
"github.com/cortexproject/cortex/pkg/util/tls"
11+
)
12+
13+
type RedisClientConfig struct {
14+
Addresses string `yaml:"addresses"`
15+
Username string `yaml:"username"`
16+
Password string `yaml:"password"`
17+
DB int `yaml:"db"`
18+
MasterName string `yaml:"master_name"`
19+
20+
PoolSize int `yaml:"pool_size"`
21+
MinIdleConns int `yaml:"min_idle_conns"`
22+
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`
23+
GetMultiBatchSize int `yaml:"get_multi_batch_size"`
24+
MaxSetMultiConcurrency int `yaml:"max_set_multi_concurrency"`
25+
SetMultiBatchSize int `yaml:"set_multi_batch_size"`
26+
27+
DialTimeout time.Duration `yaml:"dial_timeout"`
28+
ReadTimeout time.Duration `yaml:"read_timeout"`
29+
WriteTimeout time.Duration `yaml:"write_timeout"`
30+
IdleTimeout time.Duration `yaml:"idle_timeout"`
31+
MaxConnAge time.Duration `yaml:"max_conn_age"`
32+
33+
TLSEnabled bool `yaml:"tls_enabled"`
34+
TLS tls.ClientConfig `yaml:",inline"`
35+
36+
// If not zero then client-side caching is enabled.
37+
// Client-side caching is when data is stored in memory
38+
// instead of fetching data each time.
39+
// See https://redis.io/docs/manual/client-side-caching/ for info.
40+
CacheSize int `yaml:"cache_size"`
41+
}
42+
43+
func (cfg *RedisClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
44+
f.StringVar(&cfg.Addresses, prefix+"addresses", "", "Comma separated list of redis addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).")
45+
f.StringVar(&cfg.Username, prefix+"username", "", "Redis username.")
46+
f.StringVar(&cfg.Password, prefix+"password", "", "Redis password.")
47+
f.IntVar(&cfg.DB, prefix+"db", 0, "Database to be selected after connecting to the server.")
48+
f.DurationVar(&cfg.DialTimeout, prefix+"dial-timeout", time.Second*5, "Client dial timeout.")
49+
f.DurationVar(&cfg.ReadTimeout, prefix+"read-timeout", time.Second*3, "Client read timeout.")
50+
f.DurationVar(&cfg.WriteTimeout, prefix+"write-timeout", time.Second*3, "Client write timeout.")
51+
f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Minute*5, "Amount of time after which client closes idle connections. Should be less than server's timeout. -1 disables idle timeout check.")
52+
f.DurationVar(&cfg.MaxConnAge, prefix+"max-conn-age", 0, "Connection age at which client retires (closes) the connection. Default 0 is to not close aged connections.")
53+
f.IntVar(&cfg.PoolSize, prefix+"pool-size", 100, "Maximum number of socket connections.")
54+
f.IntVar(&cfg.MinIdleConns, prefix+"min-idle-conns", 10, "Specifies the minimum number of idle connections, which is useful when it is slow to establish new connections.")
55+
f.IntVar(&cfg.MaxGetMultiConcurrency, prefix+"max-get-multi-concurrency", 100, "The maximum number of concurrent GetMulti() operations. If set to 0, concurrency is unlimited.")
56+
f.IntVar(&cfg.GetMultiBatchSize, prefix+"get-multi-batch-size", 100, "The maximum size per batch for mget.")
57+
f.IntVar(&cfg.MaxSetMultiConcurrency, prefix+"max-set-multi-concurrency", 100, "The maximum number of concurrent SetMulti() operations. If set to 0, concurrency is unlimited.")
58+
f.IntVar(&cfg.SetMultiBatchSize, prefix+"set-multi-batch-size", 100, "The maximum size per batch for pipeline set.")
59+
f.StringVar(&cfg.MasterName, prefix+"master-name", "", "Specifies the master's name. Must be not empty for Redis Sentinel.")
60+
f.IntVar(&cfg.CacheSize, prefix+"cache-size", 0, "If not zero then client-side caching is enabled. Client-side caching is when data is stored in memory instead of fetching data each time. See https://redis.io/docs/manual/client-side-caching/ for more info.")
61+
f.BoolVar(&cfg.TLSEnabled, prefix+"tls-enabled", false, "Whether to enable tls for redis connection.")
62+
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
63+
}
64+
65+
// Validate the config.
66+
func (cfg *RedisClientConfig) Validate() error {
67+
if cfg.Addresses == "" {
68+
return errNoIndexCacheAddresses
69+
}
70+
71+
if cfg.TLSEnabled {
72+
if (cfg.TLS.CertPath != "") != (cfg.TLS.KeyPath != "") {
73+
return errors.New("both client key and certificate must be provided")
74+
}
75+
}
76+
77+
return nil
78+
}
79+
80+
func (cfg *RedisClientConfig) ToRedisClientConfig() cacheutil.RedisClientConfig {
81+
return cacheutil.RedisClientConfig{
82+
Addr: cfg.Addresses,
83+
Username: cfg.Username,
84+
Password: cfg.Password,
85+
DB: cfg.DB,
86+
MasterName: cfg.MasterName,
87+
DialTimeout: cfg.DialTimeout,
88+
ReadTimeout: cfg.ReadTimeout,
89+
WriteTimeout: cfg.WriteTimeout,
90+
PoolSize: cfg.PoolSize,
91+
MinIdleConns: cfg.MinIdleConns,
92+
IdleTimeout: cfg.IdleTimeout,
93+
MaxConnAge: cfg.MaxConnAge,
94+
MaxGetMultiConcurrency: cfg.MaxGetMultiConcurrency,
95+
GetMultiBatchSize: cfg.GetMultiBatchSize,
96+
MaxSetMultiConcurrency: cfg.MaxSetMultiConcurrency,
97+
SetMultiBatchSize: cfg.SetMultiBatchSize,
98+
TLSEnabled: cfg.TLSEnabled,
99+
TLSConfig: cacheutil.TLSConfig{
100+
CAFile: cfg.TLS.CAPath,
101+
KeyFile: cfg.TLS.KeyPath,
102+
CertFile: cfg.TLS.CertPath,
103+
ServerName: cfg.TLS.ServerName,
104+
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
105+
},
106+
}
107+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package tsdb
2+
3+
import (
4+
"testing"
5+
6+
"github.com/pkg/errors"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/cortexproject/cortex/pkg/util/tls"
10+
)
11+
12+
func TestRedisIndexCacheConfigValidate(t *testing.T) {
13+
for _, tc := range []struct {
14+
name string
15+
conf *RedisClientConfig
16+
err error
17+
}{
18+
{
19+
name: "empty address",
20+
conf: &RedisClientConfig{},
21+
err: errNoIndexCacheAddresses,
22+
},
23+
{
24+
name: "provide TLS cert path but not key path",
25+
conf: &RedisClientConfig{
26+
Addresses: "aaa",
27+
TLSEnabled: true,
28+
TLS: tls.ClientConfig{
29+
CertPath: "foo",
30+
},
31+
},
32+
err: errors.New("both client key and certificate must be provided"),
33+
},
34+
{
35+
name: "provide TLS key path but not cert path",
36+
conf: &RedisClientConfig{
37+
Addresses: "aaa",
38+
TLSEnabled: true,
39+
TLS: tls.ClientConfig{
40+
KeyPath: "foo",
41+
},
42+
},
43+
err: errors.New("both client key and certificate must be provided"),
44+
},
45+
{
46+
name: "success when TLS enabled",
47+
conf: &RedisClientConfig{
48+
Addresses: "aaa",
49+
TLSEnabled: true,
50+
TLS: tls.ClientConfig{
51+
KeyPath: "foo",
52+
CertPath: "bar",
53+
},
54+
},
55+
},
56+
{
57+
name: "success when TLS disabled",
58+
conf: &RedisClientConfig{
59+
Addresses: "aaa",
60+
TLSEnabled: false,
61+
},
62+
},
63+
} {
64+
t.Run(tc.name, func(t *testing.T) {
65+
err := tc.conf.Validate()
66+
if tc.err == nil {
67+
require.NoError(t, err)
68+
} else {
69+
require.Equal(t, tc.err.Error(), err.Error())
70+
}
71+
})
72+
}
73+
}

0 commit comments

Comments
 (0)