-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
redis_client.go
346 lines (289 loc) · 10.7 KB
/
redis_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package cacheutil
import (
"context"
"crypto/tls"
"net"
"strings"
"time"
"unsafe"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/redis/rueidis"
"gopkg.in/yaml.v3"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
thanos_tls "github.com/thanos-io/thanos/pkg/tls"
)
var (
// DefaultRedisClientConfig is default redis config.
DefaultRedisClientConfig = RedisClientConfig{
DialTimeout: time.Second * 5,
ReadTimeout: time.Second * 3,
WriteTimeout: time.Second * 3,
MaxGetMultiConcurrency: 100,
GetMultiBatchSize: 100,
MaxSetMultiConcurrency: 100,
SetMultiBatchSize: 100,
TLSEnabled: false,
TLSConfig: TLSConfig{},
MaxAsyncConcurrency: 20,
MaxAsyncBufferSize: 10000,
SetAsyncCircuitBreaker: defaultCircuitBreakerConfig,
}
)
// TLSConfig configures TLS connections.
type TLSConfig struct {
// The CA cert to use for the targets.
CAFile string `yaml:"ca_file"`
// The client cert file for the targets.
CertFile string `yaml:"cert_file"`
// The client key file for the targets.
KeyFile string `yaml:"key_file"`
// Used to verify the hostname for the targets. See https://tools.ietf.org/html/rfc4366#section-3.1
ServerName string `yaml:"server_name"`
// Disable target certificate validation.
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
}
// RedisClientConfig is the config accepted by RedisClient.
type RedisClientConfig struct {
// Addr specifies the addresses of redis server.
Addr string `yaml:"addr"`
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string `yaml:"username"`
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string `yaml:"password"`
// DB Database to be selected after connecting to the server.
DB int `yaml:"db"`
// DialTimeout specifies the client dial timeout.
DialTimeout time.Duration `yaml:"dial_timeout"`
// ReadTimeout specifies the client read timeout.
ReadTimeout time.Duration `yaml:"read_timeout"`
// WriteTimeout specifies the client write timeout.
WriteTimeout time.Duration `yaml:"write_timeout"`
// MaxGetMultiConcurrency specifies the maximum number of concurrent GetMulti() operations.
// If set to 0, concurrency is unlimited.
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`
// GetMultiBatchSize specifies the maximum size per batch for mget.
GetMultiBatchSize int `yaml:"get_multi_batch_size"`
// MaxSetMultiConcurrency specifies the maximum number of concurrent SetMulti() operations.
// If set to 0, concurrency is unlimited.
MaxSetMultiConcurrency int `yaml:"max_set_multi_concurrency"`
// SetMultiBatchSize specifies the maximum size per batch for pipeline set.
SetMultiBatchSize int `yaml:"set_multi_batch_size"`
// TLSEnabled enable tls for redis connection.
TLSEnabled bool `yaml:"tls_enabled"`
// TLSConfig to use to connect to the redis server.
TLSConfig TLSConfig `yaml:"tls_config"`
// If not zero then client-side caching is enabled.
// Client-side caching is when data is stored in memory
// instead of fetching data each time.
// See https://redis.io/docs/manual/client-side-caching/ for info.
CacheSize model.Bytes `yaml:"cache_size"`
// MasterName specifies the master's name. Must be not empty
// for Redis Sentinel.
MasterName string `yaml:"master_name"`
// MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations.
MaxAsyncBufferSize int `yaml:"max_async_buffer_size"`
// MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines.
MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
// SetAsyncCircuitBreaker configures the circuit breaker for SetAsync operations.
SetAsyncCircuitBreaker CircuitBreakerConfig `yaml:"set_async_circuit_breaker_config"`
}
func (c *RedisClientConfig) validate() error {
if c.Addr == "" {
return errors.New("no redis addr provided")
}
if c.TLSEnabled {
if (c.TLSConfig.CertFile != "") != (c.TLSConfig.KeyFile != "") {
return errors.New("both client key and certificate must be provided")
}
}
if err := c.SetAsyncCircuitBreaker.validate(); err != nil {
return err
}
return nil
}
type RedisClient struct {
client rueidis.Client
config RedisClientConfig
// getMultiGate used to enforce the max number of concurrent GetMulti() operations.
getMultiGate gate.Gate
// setMultiGate used to enforce the max number of concurrent SetMulti() operations.
setMultiGate gate.Gate
logger log.Logger
durationSet prometheus.Observer
durationSetMulti prometheus.Observer
durationGetMulti prometheus.Observer
p *AsyncOperationProcessor
setAsyncCircuitBreaker CircuitBreaker
}
// NewRedisClient makes a new RedisClient.
func NewRedisClient(logger log.Logger, name string, conf []byte, reg prometheus.Registerer) (*RedisClient, error) {
config, err := parseRedisClientConfig(conf)
if err != nil {
return nil, err
}
return NewRedisClientWithConfig(logger, name, config, reg)
}
// NewRedisClientWithConfig makes a new RedisClient.
func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClientConfig,
reg prometheus.Registerer) (*RedisClient, error) {
if err := config.validate(); err != nil {
return nil, err
}
if reg != nil {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
}
var tlsConfig *tls.Config
if config.TLSEnabled {
userTLSConfig := config.TLSConfig
tlsClientConfig, err := thanos_tls.NewClientConfig(logger, userTLSConfig.CertFile, userTLSConfig.KeyFile,
userTLSConfig.CAFile, userTLSConfig.ServerName, userTLSConfig.InsecureSkipVerify)
if err != nil {
return nil, err
}
tlsConfig = tlsClientConfig
}
clientSideCacheDisabled := false
if config.CacheSize == 0 {
clientSideCacheDisabled = true
}
clientOpts := rueidis.ClientOption{
InitAddress: strings.Split(config.Addr, ","),
ShuffleInit: true,
Username: config.Username,
Password: config.Password,
SelectDB: config.DB,
CacheSizeEachConn: int(config.CacheSize),
Dialer: net.Dialer{Timeout: config.DialTimeout},
ConnWriteTimeout: config.WriteTimeout,
DisableCache: clientSideCacheDisabled,
TLSConfig: tlsConfig,
}
if config.MasterName != "" {
clientOpts.Sentinel = rueidis.SentinelOption{
MasterSet: config.MasterName,
}
}
client, err := rueidis.NewClient(clientOpts)
if err != nil {
return nil, err
}
c := &RedisClient{
client: client,
config: config,
logger: logger,
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg),
config.MaxGetMultiConcurrency,
gate.Gets,
),
setMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_setmulti_", reg),
config.MaxSetMultiConcurrency,
gate.Sets,
),
setAsyncCircuitBreaker: newCircuitBreaker("redis-set-async", config.SetAsyncCircuitBreaker),
}
duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_redis_operation_duration_seconds",
Help: "Duration of operations against redis.",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 3, 6, 10},
}, []string{"operation"})
c.durationSet = duration.WithLabelValues(opSet)
c.durationSetMulti = duration.WithLabelValues(opSetMulti)
c.durationGetMulti = duration.WithLabelValues(opGetMulti)
return c, nil
}
// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error {
return c.p.EnqueueAsync(func() {
start := time.Now()
err := c.setAsyncCircuitBreaker.Execute(func() error {
return c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error()
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
return
}
c.durationSet.Observe(time.Since(start).Seconds())
})
}
// SetMulti set multiple keys and value.
func (c *RedisClient) SetMulti(data map[string][]byte, ttl time.Duration) {
if len(data) == 0 {
return
}
start := time.Now()
sets := make(rueidis.Commands, 0, len(data))
ittl := int64(ttl.Seconds())
for k, v := range data {
sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build())
}
for _, resp := range c.client.DoMulti(context.Background(), sets...) {
if err := resp.Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data))
return
}
}
c.durationSetMulti.Observe(time.Since(start).Seconds())
}
// GetMulti implement RemoteCacheClient.
func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][]byte {
if len(keys) == 0 {
return nil
}
start := time.Now()
results := make(map[string][]byte, len(keys))
if c.config.ReadTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, c.config.ReadTimeout)
defer cancel()
ctx = timeoutCtx
}
// NOTE(GiedriusS): TTL is the default one in case PTTL fails. 8 hours should be good enough IMHO.
resps, err := rueidis.MGetCache(c.client, ctx, 8*time.Hour, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resps))
}
for key, resp := range resps {
if val, err := resp.ToString(); err == nil {
results[key] = stringToBytes(val)
}
}
c.durationGetMulti.Observe(time.Since(start).Seconds())
return results
}
// Stop implement RemoteCacheClient.
func (c *RedisClient) Stop() {
c.p.Stop()
c.client.Close()
}
// stringToBytes converts string to byte slice (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go).
func stringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}
// parseRedisClientConfig unmarshals a buffer into a RedisClientConfig with default values.
func parseRedisClientConfig(conf []byte) (RedisClientConfig, error) {
config := DefaultRedisClientConfig
if err := yaml.Unmarshal(conf, &config); err != nil {
return RedisClientConfig{}, err
}
return config, nil
}