-
Notifications
You must be signed in to change notification settings - Fork 816
Add Redis support #1612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add Redis support #1612
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
e505d30
adding support for Redis cache
54ffdb8
fix lint errors
97b2824
minor bugfix
3b57fc2
proper use of pool connections
bbaf6cc
fix lint errors
2dc96ec
addressed comments
c7d3d89
Merge branch 'upstream-master' into ds-redis-cache
b0b90de
update unit tests
a459387
reuse context in request timeout
e94a4e0
use correct function names in logs
ddac452
use common config for cache storage
2c8fa9e
master rebase
03e30de
added missing module dependency
6ec6c4d
delete extra newline
b9cc19b
optimize redis SET
efc0f0e
adding support for Redis cache
52ded0f
fix lint errors
b25f1e3
minor bugfix
e74f9d3
proper use of pool connections
1c1b925
fix lint errors
121322a
addressed comments
724399c
update unit tests
1c1f1c4
reuse context in request timeout
284cba0
use correct function names in logs
4d4a29b
use common config for cache storage
476bd2a
added missing module dependency
d0b5b85
delete extra newline
9a60e34
optimize redis SET
4edda24
undo common config for storage systems
2549dfe
Merge branch 'ds-redis-cache' of github.com:dmitsh/cortex into ds-red…
2ef6395
Merge branch 'upstream-master' into ds-redis-cache
4d03706
restore changes in CHANGELOG.md
7de661c
fixed lint error
11dc574
fixed mod-check error
9b2abaa
fix metric labels
677bf6c
resolved git conflict
f51f788
addressed comments
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
package cache | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"flag" | ||
"time" | ||
|
||
"github.com/cortexproject/cortex/pkg/util" | ||
"github.com/go-kit/kit/log/level" | ||
"github.com/gomodule/redigo/redis" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
instr "github.com/weaveworks/common/instrument" | ||
) | ||
|
||
// RedisClient interface exists for mocking redisClient. | ||
type RedisClient interface { | ||
Connection() redis.Conn | ||
Close() error | ||
} | ||
|
||
// RedisCache type caches chunks in redis | ||
type RedisCache struct { | ||
name string | ||
expiration int | ||
timeout time.Duration | ||
client RedisClient | ||
requestDuration observableVecCollector | ||
} | ||
|
||
// RedisConfig defines how a RedisCache should be constructed. | ||
type RedisConfig struct { | ||
Endpoint string `yaml:"endpoint,omitempty"` | ||
Timeout time.Duration `yaml:"timeout,omitempty"` | ||
Expiration time.Duration `yaml:"expiration,omitempty"` | ||
MaxIdleConns int `yaml:"max_idle_conns,omitempty"` | ||
MaxActiveConns int `yaml:"max_active_conns,omitempty"` | ||
} | ||
|
||
type redisClient struct { | ||
pool *redis.Pool | ||
} | ||
|
||
var ( | ||
redisRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: "cortex", | ||
Name: "redis_request_duration_seconds", | ||
Help: "Total time spent in seconds doing redis requests.", | ||
Buckets: prometheus.ExponentialBuckets(0.00025, 4, 6), | ||
}, []string{"method", "status_code", "name"}) | ||
|
||
errRedisQueryTimeout = errors.New("redis query timeout") | ||
) | ||
|
||
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet | ||
func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { | ||
f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.") | ||
f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.") | ||
f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.") | ||
f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.") | ||
f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.") | ||
} | ||
|
||
// NewRedisCache creates a new RedisCache | ||
func NewRedisCache(cfg RedisConfig, name string, client RedisClient) *RedisCache { | ||
// client != nil in unit tests | ||
if client == nil { | ||
client = &redisClient{ | ||
pool: &redis.Pool{ | ||
MaxIdle: cfg.MaxIdleConns, | ||
MaxActive: cfg.MaxActiveConns, | ||
Dial: func() (redis.Conn, error) { | ||
c, err := redis.Dial("tcp", cfg.Endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return c, err | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
cache := &RedisCache{ | ||
expiration: int(cfg.Expiration.Seconds()), | ||
timeout: cfg.Timeout, | ||
name: name, | ||
client: client, | ||
requestDuration: observableVecCollector{ | ||
v: redisRequestDuration.MustCurryWith(prometheus.Labels{ | ||
"name": name, | ||
}), | ||
}, | ||
} | ||
|
||
if err := cache.ping(context.Background()); err != nil { | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
level.Error(util.Logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err) | ||
} | ||
|
||
return cache | ||
} | ||
|
||
// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. | ||
func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { | ||
var data [][]byte | ||
err := instr.CollectedRequest(ctx, "fetch", c.requestDuration, redisStatusCode, func(ctx context.Context) (err error) { | ||
data, err = c.mget(ctx, keys) | ||
return err | ||
}) | ||
if err != nil { | ||
level.Error(util.Logger).Log("msg", "failed to get from redis", "name", c.name, "err", err) | ||
missed = make([]string, len(keys)) | ||
copy(missed, keys) | ||
return | ||
} | ||
for i, key := range keys { | ||
if data[i] != nil { | ||
found = append(found, key) | ||
bufs = append(bufs, data[i]) | ||
} else { | ||
missed = append(missed, key) | ||
} | ||
} | ||
return | ||
} | ||
|
||
// Store stores the key in the cache. | ||
func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { | ||
err := instr.CollectedRequest(ctx, "store", c.requestDuration, redisStatusCode, func(ctx context.Context) error { | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return c.mset(ctx, keys, bufs, c.expiration) | ||
}) | ||
if err != nil { | ||
level.Error(util.Logger).Log("msg", "failed to put to redis", "name", c.name, "err", err) | ||
} | ||
} | ||
|
||
// Stop stops the redis client. | ||
func (c *RedisCache) Stop() error { | ||
return c.client.Close() | ||
} | ||
|
||
// mset adds key-value pairs to the cache. | ||
func (c *RedisCache) mset(ctx context.Context, keys []string, bufs [][]byte, ttl int) error { | ||
res := make(chan error, 1) | ||
|
||
conn := c.client.Connection() | ||
defer conn.Close() | ||
|
||
ctx, cancel := context.WithTimeout(ctx, c.timeout) | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer cancel() | ||
|
||
go func() { | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var err error | ||
defer func() { res <- err }() | ||
|
||
if err = conn.Send("MULTI"); err != nil { | ||
return | ||
} | ||
for i := range keys { | ||
if err = conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil { | ||
return | ||
} | ||
} | ||
_, err = conn.Do("EXEC") | ||
}() | ||
|
||
select { | ||
case err := <-res: | ||
return err | ||
case <-ctx.Done(): | ||
return errRedisQueryTimeout | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
type mgetResult struct { | ||
bufs [][]byte | ||
err error | ||
} | ||
|
||
// mget retrieves values from the cache. | ||
func (c *RedisCache) mget(ctx context.Context, keys []string) ([][]byte, error) { | ||
intf := make([]interface{}, len(keys)) | ||
for i, key := range keys { | ||
intf[i] = key | ||
} | ||
res := make(chan *mgetResult, 1) | ||
|
||
ctx, cancel := context.WithTimeout(ctx, c.timeout) | ||
defer cancel() | ||
|
||
conn := c.client.Connection() | ||
defer conn.Close() | ||
|
||
go func() { | ||
bufs, err := redis.ByteSlices(conn.Do("MGET", intf...)) | ||
res <- &mgetResult{bufs: bufs, err: err} | ||
}() | ||
|
||
select { | ||
case dat := <-res: | ||
return dat.bufs, dat.err | ||
case <-ctx.Done(): | ||
return nil, errRedisQueryTimeout | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
func (c *RedisCache) ping(ctx context.Context) error { | ||
res := make(chan error, 1) | ||
|
||
conn := c.client.Connection() | ||
defer conn.Close() | ||
|
||
ctx, cancel := context.WithTimeout(ctx, c.timeout) | ||
defer cancel() | ||
|
||
go func() { | ||
pong, err := conn.Do("PING") | ||
if err == nil { | ||
_, err = redis.String(pong, err) | ||
} | ||
res <- err | ||
}() | ||
|
||
select { | ||
case err := <-res: | ||
return err | ||
case <-ctx.Done(): | ||
return errRedisQueryTimeout | ||
dmitsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
func redisStatusCode(err error) string { | ||
switch err { | ||
case nil: | ||
return "200" | ||
case redis.ErrNil: | ||
return "404" | ||
default: | ||
return "500" | ||
} | ||
} | ||
|
||
// Connection returns redis Connection object. | ||
func (c *redisClient) Connection() redis.Conn { | ||
return c.pool.Get() | ||
} | ||
|
||
// Close cleans up redis client. | ||
func (c *redisClient) Close() error { | ||
return c.pool.Close() | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.