Skip to content

Commit 95b76ad

Browse files
committed
add redis as index cache and caching bucket backend
Signed-off-by: Ben Ye <benye@amazon.com>
1 parent c914f86 commit 95b76ad

File tree

12 files changed

+200
-104
lines changed

12 files changed

+200
-104
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,5 @@ replace github.com/go-openapi/spec => github.com/go-openapi/spec v0.20.6
243243
replace github.com/ionos-cloud/sdk-go/v6 => github.com/ionos-cloud/sdk-go/v6 v6.0.4
244244

245245
replace github.com/googleapis/gnostic => github.com/google/gnostic v0.6.9
246+
247+
replace github.com/thanos-io/thanos => github.com/thanos-io/thanos v0.29.1-0.20221220140041-e85bc1f7b4d8

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,8 +1492,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
14921492
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
14931493
github.com/thanos-io/objstore v0.0.0-20221205132204-5aafc0079f06 h1:xUnLk2CwIoJyv6OB4MWC3aOH9TnneSgM5kgTsOmXIuI=
14941494
github.com/thanos-io/objstore v0.0.0-20221205132204-5aafc0079f06/go.mod h1:gdo4vwwonBnheHB/TCwAOUtKJKrLhLtbBVTQR9rN/v0=
1495-
github.com/thanos-io/thanos v0.29.1-0.20230103123855-3327c510076a h1:oN3VupYNkavPRvdXwq71p54SAFSbOGvL0qL7CeKFrJ0=
1496-
github.com/thanos-io/thanos v0.29.1-0.20230103123855-3327c510076a/go.mod h1:xZKr/xpbijYM8EqTMpurqgKItyIPFy1wqUd/DMTDu4M=
1495+
github.com/thanos-io/thanos v0.29.1-0.20221220140041-e85bc1f7b4d8 h1:TFghDANo96y3s95BASFmMRV2i9sQD1CH2uyEBgnQ/5c=
1496+
github.com/thanos-io/thanos v0.29.1-0.20221220140041-e85bc1f7b4d8/go.mod h1:ve1mHR1dhCRqQlp0C5g+AFENoNu9GA4gtq1+Og6nuzc=
14971497
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
14981498
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
14991499
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

pkg/cortexpb/compat_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestMetricMetadataToMetricTypeToMetricType(t *testing.T) {
9999
for _, tt := range tc {
100100
t.Run(tt.desc, func(t *testing.T) {
101101
m := MetricMetadataMetricTypeToMetricType(tt.input)
102-
testutil.Equals(t, tt.expected, m)
102+
assert.Equal(t, tt.expected, m)
103103
})
104104
}
105105
}

pkg/storage/tsdb/caching_bucket.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,26 @@ import (
2323

2424
const (
2525
CacheBackendMemcached = "memcached"
26+
CacheBackendRedis = "redis"
2627
)
2728

2829
type CacheBackend struct {
2930
Backend string `yaml:"backend"`
3031
Memcached MemcachedClientConfig `yaml:"memcached"`
32+
Redis RedisClientConfig `yaml:"redis"`
3133
}
3234

3335
// Validate the config.
3436
func (cfg *CacheBackend) Validate() error {
35-
if cfg.Backend != "" && cfg.Backend != CacheBackendMemcached {
37+
switch cfg.Backend {
38+
case CacheBackendMemcached:
39+
return cfg.Memcached.Validate()
40+
case CacheBackendRedis:
41+
return cfg.Redis.Validate()
42+
case "":
43+
default:
3644
return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
3745
}
38-
39-
if cfg.Backend == CacheBackendMemcached {
40-
if err := cfg.Memcached.Validate(); err != nil {
41-
return err
42-
}
43-
}
44-
4546
return nil
4647
}
4748

@@ -58,6 +59,7 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st
5859
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached))
5960

6061
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
62+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
6163

6264
f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
6365
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
@@ -111,7 +113,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
111113
cfg := cache.NewCachingBucketConfig()
112114
cachingConfigured := false
113115

114-
chunksCache, err := createCache("chunks-cache", chunksConfig.Backend, chunksConfig.Memcached, logger, reg)
116+
chunksCache, err := createCache("chunks-cache", chunksConfig.Backend, chunksConfig.Memcached, chunksConfig.Redis, logger, reg)
115117
if err != nil {
116118
return nil, errors.Wrapf(err, "chunks-cache")
117119
}
@@ -121,7 +123,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
121123
cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
122124
}
123125

124-
metadataCache, err := createCache("metadata-cache", metadataConfig.Backend, metadataConfig.Memcached, logger, reg)
126+
metadataCache, err := createCache("metadata-cache", metadataConfig.Backend, metadataConfig.Memcached, metadataConfig.Redis, logger, reg)
125127
if err != nil {
126128
return nil, errors.Wrapf(err, "metadata-cache")
127129
}
@@ -149,7 +151,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
149151
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
150152
}
151153

152-
func createCache(cacheName string, backend string, memcached MemcachedClientConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
154+
func createCache(cacheName string, backend string, memcached MemcachedClientConfig, redis RedisClientConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
153155
switch backend {
154156
case "":
155157
// No caching.
@@ -163,6 +165,13 @@ func createCache(cacheName string, backend string, memcached MemcachedClientConf
163165
}
164166
return cache.NewMemcachedCache(cacheName, logger, client, reg), nil
165167

168+
case CacheBackendRedis:
169+
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, redis.ToRedisClientConfig(), reg)
170+
if err != nil {
171+
return nil, errors.Wrapf(err, "failed to create redis client")
172+
}
173+
return cache.NewRedisCache("caching-bucket", logger, redisCache, reg), nil
174+
166175
default:
167176
return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, backend)
168177
}

pkg/storage/tsdb/index_cache.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@ const (
2323
// IndexCacheBackendMemcached is the value for the memcached index cache backend.
2424
IndexCacheBackendMemcached = "memcached"
2525

26+
// IndexCacheBackendRedis is the value for the redis index cache backend.
27+
IndexCacheBackendRedis = "redis"
28+
2629
// IndexCacheBackendDefault is the value for the default index cache backend.
2730
IndexCacheBackendDefault = IndexCacheBackendInMemory
2831

2932
defaultMaxItemSize = model.Bytes(128 * units.MiB)
3033
)
3134

3235
var (
33-
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached}
36+
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached, IndexCacheBackendRedis}
3437

3538
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
3639
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
@@ -40,6 +43,7 @@ type IndexCacheConfig struct {
4043
Backend string `yaml:"backend"`
4144
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
4245
Memcached MemcachedClientConfig `yaml:"memcached"`
46+
Redis RedisClientConfig `yaml:"redis"`
4347
}
4448

4549
func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
@@ -51,6 +55,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
5155

5256
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
5357
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
58+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
5459
}
5560

5661
// Validate the config.
@@ -63,6 +68,10 @@ func (cfg *IndexCacheConfig) Validate() error {
6368
if err := cfg.Memcached.Validate(); err != nil {
6469
return err
6570
}
71+
} else if cfg.Backend == IndexCacheBackendRedis {
72+
if err := cfg.Redis.Validate(); err != nil {
73+
return err
74+
}
6675
}
6776

6877
return nil
@@ -83,6 +92,8 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
8392
return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
8493
case IndexCacheBackendMemcached:
8594
return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
95+
case IndexCacheBackendRedis:
96+
return newRedisIndexCache(cfg.Redis, logger, registerer)
8697
default:
8798
return nil, errUnsupportedIndexCacheBackend
8899
}
@@ -111,3 +122,12 @@ func newMemcachedIndexCache(cfg MemcachedClientConfig, logger log.Logger, regist
111122

112123
return storecache.NewMemcachedIndexCache(logger, client, registerer)
113124
}
125+
126+
func newRedisIndexCache(cfg RedisClientConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
127+
client, err := cacheutil.NewRedisClientWithConfig(logger, "index-cache", cfg.ToRedisClientConfig(), registerer)
128+
if err != nil {
129+
return nil, errors.Wrapf(err, "create index cache redis client")
130+
}
131+
132+
return storecache.NewRemoteIndexCache(logger, client, registerer)
133+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package tsdb
2+
3+
import (
4+
"flag"
5+
"time"
6+
7+
"github.com/cortexproject/cortex/pkg/util/tls"
8+
"github.com/pkg/errors"
9+
"github.com/thanos-io/thanos/pkg/cacheutil"
10+
)
11+
12+
type RedisClientConfig struct {
13+
// Addr specifies the addresses of redis server.
14+
Addr string `yaml:"addr"`
15+
16+
// Use the specified Username to authenticate the current connection
17+
// with one of the connections defined in the ACL list when connecting
18+
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
19+
Username string `yaml:"username"`
20+
// Optional password. Must match the password specified in the
21+
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
22+
// or the User Password when connecting to a Redis 6.0 instance, or greater,
23+
// that is using the Redis ACL system.
24+
Password string `yaml:"password"`
25+
26+
// DB Database to be selected after connecting to the server.
27+
DB int `yaml:"db"`
28+
29+
// DialTimeout specifies the client dial timeout.
30+
DialTimeout time.Duration `yaml:"dial_timeout"`
31+
32+
// ReadTimeout specifies the client read timeout.
33+
ReadTimeout time.Duration `yaml:"read_timeout"`
34+
35+
// WriteTimeout specifies the client write timeout.
36+
WriteTimeout time.Duration `yaml:"write_timeout"`
37+
38+
// Maximum number of socket connections.
39+
PoolSize int `yaml:"pool_size"`
40+
41+
// MinIdleConns specifies the minimum number of idle connections which is useful when establishing
42+
// new connection is slow.
43+
MinIdleConns int `yaml:"min_idle_conns"`
44+
45+
// Amount of time after which client closes idle connections.
46+
// Should be less than server's timeout.
47+
// -1 disables idle timeout check.
48+
IdleTimeout time.Duration `yaml:"idle_timeout"`
49+
50+
// Connection age at which client retires (closes) the connection.
51+
// Default 0 is to not close aged connections.
52+
MaxConnAge time.Duration `yaml:"max_conn_age"`
53+
54+
// MaxGetMultiConcurrency specifies the maximum number of concurrent GetMulti() operations.
55+
// If set to 0, concurrency is unlimited.
56+
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`
57+
58+
// GetMultiBatchSize specifies the maximum size per batch for mget.
59+
GetMultiBatchSize int `yaml:"get_multi_batch_size"`
60+
61+
// MaxSetMultiConcurrency specifies the maximum number of concurrent SetMulti() operations.
62+
// If set to 0, concurrency is unlimited.
63+
MaxSetMultiConcurrency int `yaml:"max_set_multi_concurrency"`
64+
65+
// SetMultiBatchSize specifies the maximum size per batch for pipeline set.
66+
SetMultiBatchSize int `yaml:"set_multi_batch_size"`
67+
68+
// If not zero then client-side caching is enabled.
69+
// Client-side caching is when data is stored in memory
70+
// instead of fetching data each time.
71+
// See https://redis.io/docs/manual/client-side-caching/ for info.
72+
CacheSize int `yaml:"cache_size"`
73+
74+
TLSEnabled bool `yaml:"tls_enabled"`
75+
TLS tls.ClientConfig `yaml:",inline"`
76+
}
77+
78+
func (cfg *RedisClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
79+
f.StringVar(&cfg.Addr, prefix+"addr", "", "Comma separated list of redis addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).")
80+
f.StringVar(&cfg.Username, prefix+"username", "", "The socket read/write timeout.")
81+
f.StringVar(&cfg.Password, prefix+"password", "", "The socket read/write timeout.")
82+
f.IntVar(&cfg.DB, prefix+"db", 0, "Database to be selected after connecting to the server.")
83+
f.DurationVar(&cfg.DialTimeout, prefix+"dial-timeout", time.Second*5, "Client dial timeout.")
84+
f.DurationVar(&cfg.ReadTimeout, prefix+"read-timeout", time.Second*3, "Client read timeout.")
85+
f.DurationVar(&cfg.WriteTimeout, prefix+"write-timeout", time.Second*3, "Client write timeout.")
86+
f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Minute*5, "The socket read/write timeout.")
87+
f.DurationVar(&cfg.MaxConnAge, prefix+"max-conn-age", 0, "The socket read/write timeout.")
88+
f.IntVar(&cfg.PoolSize, prefix+"pool-size", 100, "Maximum number of socket connections.")
89+
f.IntVar(&cfg.MinIdleConns, prefix+"min-idle-conns", 10, "Specifies the minimum number of idle connections, which is useful when it is slow to establish new connections.")
90+
f.IntVar(&cfg.MaxGetMultiConcurrency, prefix+"max-get-multi-concurrency", 100, "The maximum number of concurrent GetMulti() operations. If set to 0, concurrency is unlimited.")
91+
f.IntVar(&cfg.GetMultiBatchSize, prefix+"get-multi-batch-size", 100, "The maximum size per batch for mget.")
92+
f.IntVar(&cfg.MaxSetMultiConcurrency, prefix+"max-set-multi-concurrency", 100, "The maximum number of concurrent SetMulti() operations. If set to 0, concurrency is unlimited.")
93+
f.IntVar(&cfg.SetMultiBatchSize, prefix+"set-multi-batch-size", 100, "The maximum size per batch for pipeline set.")
94+
f.IntVar(&cfg.CacheSize, prefix+"cache-size", 0, "If not zero then client-side caching is enabled. Client-side caching is when data is stored in memory instead of fetching data each time. See https://redis.io/docs/manual/client-side-caching/ for more info.")
95+
f.BoolVar(&cfg.TLSEnabled, prefix+"tls-enabled", false, "Whether to enable tls for redis connection.")
96+
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
97+
}
98+
99+
// Validate the config.
100+
func (cfg *RedisClientConfig) Validate() error {
101+
if cfg.Addr == "" {
102+
return errNoIndexCacheAddresses
103+
}
104+
105+
if cfg.TLSEnabled {
106+
if (cfg.TLS.CertPath != "") != (cfg.TLS.KeyPath != "") {
107+
return errors.New("both client key and certificate must be provided")
108+
}
109+
}
110+
111+
return nil
112+
}
113+
114+
func (cfg *RedisClientConfig) ToRedisClientConfig() cacheutil.RedisClientConfig {
115+
return cacheutil.RedisClientConfig{
116+
Addr: cfg.Addr,
117+
Username: cfg.Username,
118+
Password: cfg.Password,
119+
DB: cfg.DB,
120+
DialTimeout: cfg.DialTimeout,
121+
ReadTimeout: cfg.ReadTimeout,
122+
WriteTimeout: cfg.WriteTimeout,
123+
PoolSize: cfg.PoolSize,
124+
MinIdleConns: cfg.MinIdleConns,
125+
IdleTimeout: cfg.IdleTimeout,
126+
MaxConnAge: cfg.MaxConnAge,
127+
MaxGetMultiConcurrency: cfg.MaxGetMultiConcurrency,
128+
GetMultiBatchSize: cfg.GetMultiBatchSize,
129+
MaxSetMultiConcurrency: cfg.MaxSetMultiConcurrency,
130+
SetMultiBatchSize: cfg.SetMultiBatchSize,
131+
TLSEnabled: cfg.TLSEnabled,
132+
TLSConfig: cacheutil.TLSConfig{
133+
CAFile: cfg.TLS.CAPath,
134+
KeyFile: cfg.TLS.KeyPath,
135+
CertFile: cfg.TLS.CertPath,
136+
ServerName: cfg.TLS.ServerName,
137+
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
138+
},
139+
}
140+
}

vendor/github.com/thanos-io/thanos/pkg/block/block.go

Lines changed: 0 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go

Lines changed: 2 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)