Skip to content

Add Redis support #1612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e505d30
adding support for Redis cache
Aug 24, 2019
54ffdb8
fix lint errors
Aug 24, 2019
97b2824
minor bugfix
Aug 26, 2019
3b57fc2
proper use of pool connections
Aug 29, 2019
bbaf6cc
fix lint errors
Aug 29, 2019
2dc96ec
addressed comments
Sep 5, 2019
c7d3d89
Merge branch 'upstream-master' into ds-redis-cache
Sep 5, 2019
b0b90de
update unit tests
Sep 5, 2019
a459387
reuse context in request timeout
Sep 6, 2019
e94a4e0
use correct function names in logs
Sep 6, 2019
ddac452
use common config for cache storage
Sep 11, 2019
2c8fa9e
master rebase
Sep 11, 2019
03e30de
added missing module dependency
Sep 11, 2019
6ec6c4d
delete extra newline
Sep 11, 2019
b9cc19b
optimize redis SET
Sep 13, 2019
efc0f0e
adding support for Redis cache
Aug 24, 2019
52ded0f
fix lint errors
Aug 24, 2019
b25f1e3
minor bugfix
Aug 26, 2019
e74f9d3
proper use of pool connections
Aug 29, 2019
1c1b925
fix lint errors
Aug 29, 2019
121322a
addressed comments
Sep 5, 2019
724399c
update unit tests
Sep 5, 2019
1c1f1c4
reuse context in request timeout
Sep 6, 2019
284cba0
use correct function names in logs
Sep 6, 2019
4d4a29b
use common config for cache storage
Sep 11, 2019
476bd2a
added missing module dependency
Sep 11, 2019
d0b5b85
delete extra newline
Sep 11, 2019
9a60e34
optimize redis SET
Sep 13, 2019
4edda24
undo common config for storage systems
Sep 18, 2019
2549dfe
Merge branch 'ds-redis-cache' of github.com:dmitsh/cortex into ds-red…
Sep 18, 2019
2ef6395
Merge branch 'upstream-master' into ds-redis-cache
Sep 18, 2019
4d03706
restore changes in CHANGELOG.md
Sep 18, 2019
7de661c
fixed lint error
Sep 18, 2019
11dc574
fixed mod-check error
Sep 18, 2019
9b2abaa
fix metric labels
Sep 25, 2019
677bf6c
resolved git conflict
Sep 25, 2019
f51f788
addressed comments
Oct 1, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

* [CHANGE] In table-manager, default DynamoDB capacity was reduced from 3,000 units to 1,000 units. We recommend you do not run with the defaults: find out what figures are needed for your environment and set that via `-dynamodb.periodic-table.write-throughput` and `-dynamodb.chunk-table.write-throughput`.
* [CHANGE] `--alertmanager.configs.auto-slack-root` flag was dropped as auto Slack root is not supported anymore. #1597
* [FEATURE] Add Redis support for caching #1612
* [ENHANCEMENT] Upgraded Prometheus to 2.12.0 and Alertmanager to 0.19.0. #1597


## 0.2.0 / 2019-09-05

This release has several exciting features, the most notable of them being setting `-ingester.spread-flushes` to potentially reduce your storage space by upto 50%.
Expand Down
4 changes: 4 additions & 0 deletions docs/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ The ingester query API was improved over time, but defaults to the old behaviour

Use these flags to specify the location and timeout of the memcached cluster used to cache query results.

- `-redis.{endpoint, timeout}`

Use these flags to specify the location and timeout of the Redis service used to cache query results.

## Distributor

- `-distributor.shard-by-all-labels`
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/gogo/status v1.0.3
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/gorilla/websocket v1.4.0 // indirect
Expand All @@ -56,6 +57,7 @@ require (
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1
github.com/satori/go.uuid v1.2.0 // indirect
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -495,6 +497,8 @@ github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f h1:7C9G4yUog
github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0=
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0 h1:W4dTblzSVIBNfDimJhh70OpZQQMwLVpwK50scXdH94w=
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0/go.mod h1:elNqjVbwD3sCZJqKzyN7uEuwGcCpeJvv67D6BrHsDbw=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"errors"
"flag"
"time"
)
Expand All @@ -28,6 +29,7 @@ type Config struct {
Background BackgroundConfig `yaml:"background,omitempty"`
Memcache MemcachedConfig `yaml:"memcached,omitempty"`
MemcacheClient MemcachedClientConfig `yaml:"memcached_client,omitempty"`
Redis RedisConfig `yaml:"redis,omitempty"`
Fifocache FifoCacheConfig `yaml:"fifocache,omitempty"`

// This is to name the cache metrics properly.
Expand All @@ -42,6 +44,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.Background.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f)

f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.")
Expand All @@ -67,6 +70,10 @@ func New(cfg Config) (Cache, error) {
caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache))
}

if cfg.MemcacheClient.Host != "" && cfg.Redis.Endpoint != "" {
return nil, errors.New("use of multiple cache storage systems is not supported")
}

if cfg.MemcacheClient.Host != "" {
if cfg.Memcache.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.Memcache.Expiration = cfg.DefaultValidity
Expand All @@ -79,6 +86,15 @@ func New(cfg Config) (Cache, error) {
caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache)))
}

if cfg.Redis.Endpoint != "" {
if cfg.Redis.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.Redis.Expiration = cfg.DefaultValidity
}
cacheName := cfg.Prefix + "redis"
cache := NewRedisCache(cfg.Redis, cacheName, nil)
caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache)))
}

cache := NewTiered(caches)
if len(caches) > 1 {
cache = Instrument(cfg.Prefix+"tiered", cache)
Expand Down
251 changes: 251 additions & 0 deletions pkg/chunk/cache/redis_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package cache

import (
"context"
"errors"
"flag"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gomodule/redigo/redis"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
instr "github.com/weaveworks/common/instrument"
)

// RedisClient interface exists for mocking redisClient.
type RedisClient interface {
Connection() redis.Conn
Close() error
}

// RedisCache type caches chunks in redis
type RedisCache struct {
name string
expiration int
timeout time.Duration
client RedisClient
requestDuration observableVecCollector
}

// RedisConfig defines how a RedisCache should be constructed.
type RedisConfig struct {
Endpoint string `yaml:"endpoint,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty"`
Expiration time.Duration `yaml:"expiration,omitempty"`
MaxIdleConns int `yaml:"max_idle_conns,omitempty"`
MaxActiveConns int `yaml:"max_active_conns,omitempty"`
}

type redisClient struct {
pool *redis.Pool
}

var (
redisRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "redis_request_duration_seconds",
Help: "Total time spent in seconds doing redis requests.",
Buckets: prometheus.ExponentialBuckets(0.00025, 4, 6),
}, []string{"method", "status_code", "name"})

errRedisQueryTimeout = errors.New("redis query timeout")
)

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.")
f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.")
f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.")
f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.")
f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.")
}

// NewRedisCache creates a new RedisCache
func NewRedisCache(cfg RedisConfig, name string, client RedisClient) *RedisCache {
// client != nil in unit tests
if client == nil {
client = &redisClient{
pool: &redis.Pool{
MaxIdle: cfg.MaxIdleConns,
MaxActive: cfg.MaxActiveConns,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", cfg.Endpoint)
if err != nil {
return nil, err
}
return c, err
},
},
}
}

cache := &RedisCache{
expiration: int(cfg.Expiration.Seconds()),
timeout: cfg.Timeout,
name: name,
client: client,
requestDuration: observableVecCollector{
v: redisRequestDuration.MustCurryWith(prometheus.Labels{
"name": name,
}),
},
}

if err := cache.ping(context.Background()); err != nil {
level.Error(util.Logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err)
}

return cache
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
var data [][]byte
err := instr.CollectedRequest(ctx, "fetch", c.requestDuration, redisStatusCode, func(ctx context.Context) (err error) {
data, err = c.mget(ctx, keys)
return err
})
if err != nil {
level.Error(util.Logger).Log("msg", "failed to get from redis", "name", c.name, "err", err)
missed = make([]string, len(keys))
copy(missed, keys)
return
}
for i, key := range keys {
if data[i] != nil {
found = append(found, key)
bufs = append(bufs, data[i])
} else {
missed = append(missed, key)
}
}
return
}

// Store stores the key in the cache.
func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
err := instr.CollectedRequest(ctx, "store", c.requestDuration, redisStatusCode, func(ctx context.Context) error {
return c.mset(ctx, keys, bufs, c.expiration)
})
if err != nil {
level.Error(util.Logger).Log("msg", "failed to put to redis", "name", c.name, "err", err)
}
}

// Stop stops the redis client.
func (c *RedisCache) Stop() error {
return c.client.Close()
}

// mset adds key-value pairs to the cache.
func (c *RedisCache) mset(ctx context.Context, keys []string, bufs [][]byte, ttl int) error {
res := make(chan error, 1)

conn := c.client.Connection()
defer conn.Close()

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

go func() {
var err error
defer func() { res <- err }()

if err = conn.Send("MULTI"); err != nil {
return
}
for i := range keys {
if err = conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil {
return
}
}
_, err = conn.Do("EXEC")
}()

select {
case err := <-res:
return err
case <-ctx.Done():
return errRedisQueryTimeout
}
}

type mgetResult struct {
bufs [][]byte
err error
}

// mget retrieves values from the cache.
func (c *RedisCache) mget(ctx context.Context, keys []string) ([][]byte, error) {
intf := make([]interface{}, len(keys))
for i, key := range keys {
intf[i] = key
}
res := make(chan *mgetResult, 1)

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

conn := c.client.Connection()
defer conn.Close()

go func() {
bufs, err := redis.ByteSlices(conn.Do("MGET", intf...))
res <- &mgetResult{bufs: bufs, err: err}
}()

select {
case dat := <-res:
return dat.bufs, dat.err
case <-ctx.Done():
return nil, errRedisQueryTimeout
}
}

func (c *RedisCache) ping(ctx context.Context) error {
res := make(chan error, 1)

conn := c.client.Connection()
defer conn.Close()

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

go func() {
pong, err := conn.Do("PING")
if err == nil {
_, err = redis.String(pong, err)
}
res <- err
}()

select {
case err := <-res:
return err
case <-ctx.Done():
return errRedisQueryTimeout
}
}

func redisStatusCode(err error) string {
switch err {
case nil:
return "200"
case redis.ErrNil:
return "404"
default:
return "500"
}
}

// Connection returns redis Connection object.
func (c *redisClient) Connection() redis.Conn {
return c.pool.Get()
}

// Close cleans up redis client.
func (c *redisClient) Close() error {
return c.pool.Close()
}
Loading