diff --git a/aws/storage_client.go b/aws/storage_client.go
index 3bf4b168694f2..9353280ead53b 100644
--- a/aws/storage_client.go
+++ b/aws/storage_client.go
@@ -24,6 +24,7 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 
 	awscommon "github.com/weaveworks/common/aws"
 	"github.com/weaveworks/common/instrument"
@@ -159,6 +160,18 @@ type storageClient struct {
 	batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest
 }
 
+// Opts returns the chunk.StorageOpts for the config.
+func Opts(cfg StorageConfig, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) {
+	client, err := NewStorageClient(cfg, schemaCfg)
+	if err != nil {
+		return nil, err
+	}
+	return []chunk.StorageOpt{{
+		From:   model.Time(0),
+		Client: client,
+	}}, err
+}
+
 // NewStorageClient makes a new AWS-backed StorageClient.
 func NewStorageClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
 	dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL)
diff --git a/cache/background.go b/cache/background.go
index 2560cc49a7e46..c41ee5f18e77f 100644
--- a/cache/background.go
+++ b/cache/background.go
@@ -77,8 +77,8 @@ func (c *backgroundCache) Stop() error {
 	return c.Cache.Stop()
 }
 
-// StoreChunk writes chunks for the cache in the background.
-func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error {
+// Store writes keys to the cache in the background.
+func (c *backgroundCache) Store(ctx context.Context, key string, buf []byte) error {
 	bgWrite := backgroundWrite{
 		key: key,
 		buf: buf,
@@ -102,7 +102,7 @@ func (c *backgroundCache) writeBackLoop() {
 				return
 			}
 			queueLength.Dec()
-			err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf)
+			err := c.Cache.Store(context.Background(), bgWrite.key, bgWrite.buf)
 			if err != nil {
 				level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err)
 			}
diff --git a/cache/background_test.go b/cache/background_test.go
index ebf540abd755c..2276c764ad4e9 100644
--- a/cache/background_test.go
+++ b/cache/background_test.go
@@ -13,14 +13,14 @@ type mockCache struct {
 	cache map[string][]byte
 }
 
-func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error {
+func (m *mockCache) Store(_ context.Context, key string, buf []byte) error {
 	m.Lock()
 	defer m.Unlock()
 	m.cache[key] = buf
 	return nil
 }
 
-func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
+func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
 	m.Lock()
 	defer m.Unlock()
 	for _, key := range keys {
diff --git a/cache/cache.go b/cache/cache.go
index 440f913f13a8d..769555e2ea626 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -7,8 +7,8 @@ import (
 
 // Cache byte arrays by key.
 type Cache interface {
-	StoreChunk(ctx context.Context, key string, buf []byte) error
-	FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
+	Store(ctx context.Context, key string, buf []byte) error
+	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
 	Stop() error
 }
 
@@ -48,18 +48,18 @@ func New(cfg Config) (Cache, error) {
 		if err != nil {
 			return nil, err
 		}
-		caches = append(caches, instrument("diskcache", cache))
+		caches = append(caches, Instrument("diskcache", cache))
 	}
 
 	if cfg.memcacheClient.Host != "" {
-		client := newMemcachedClient(cfg.memcacheClient)
+		client := NewMemcachedClient(cfg.memcacheClient)
 		cache := NewMemcached(cfg.memcache, client)
-		caches = append(caches, instrument("memcache", cache))
+		caches = append(caches, Instrument("memcache", cache))
 	}
 
-	var cache Cache = tiered(caches)
+	cache := NewTiered(caches)
 	if len(caches) > 1 {
-		cache = instrument("tiered", cache)
+		cache = Instrument("tiered", cache)
 	}
 	cache = NewBackground(cfg.background, cache)
diff --git a/cache/cache_test.go b/cache/cache_test.go
index 9da25e2af0037..a22fcb56b17e2 100644
--- a/cache/cache_test.go
+++ b/cache/cache_test.go
@@ -46,7 +46,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) {
 		require.NoError(t, err)
 
 		key := c.ExternalKey()
-		err = cache.StoreChunk(context.Background(), key, buf)
+		err = cache.Store(context.Background(), key, buf)
 		require.NoError(t, err)
 
 		keys = append(keys, key)
@@ -61,7 +61,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch
 		index := rand.Intn(len(keys))
 		key := keys[index]
 
-		found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{key})
+		found, bufs, missingKeys, err := cache.Fetch(context.Background(), []string{key})
 		require.NoError(t, err)
 		require.Len(t, found, 1)
 		require.Len(t, bufs, 1)
@@ -77,7 +77,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch
 
 func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
 	// test getting them all
-	found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), keys)
+	found, bufs, missingKeys, err := cache.Fetch(context.Background(), keys)
 	require.NoError(t, err)
 	require.Len(t, found, len(keys))
 	require.Len(t, bufs, len(keys))
@@ -117,7 +117,7 @@ func (a byExternalKey) Less(i, j int) bool { return a[i].ExternalKey() < a[j].Ex
 
 func testCacheMiss(t *testing.T, cache cache.Cache) {
 	for i := 0; i < 100; i++ {
 		key := strconv.Itoa(rand.Int())
-		found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key})
+		found, bufs, missing, err := cache.Fetch(context.Background(), []string{key})
 		require.NoError(t, err)
 		require.Empty(t, found)
 		require.Empty(t, bufs)
diff --git a/cache/diskcache.go b/cache/diskcache.go
index d32683aa40ae6..0a2ae705b4c9b 100644
--- a/cache/diskcache.go
+++ b/cache/diskcache.go
@@ -78,8 +78,8 @@ func (d *Diskcache) Stop() error {
 	return d.f.Close()
 }
 
-// FetchChunkData get chunks from the cache.
+// Fetch gets chunks from the cache.
+func (d *Diskcache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { for _, key := range keys { buf, ok := d.fetch(key) if ok { @@ -114,8 +114,8 @@ func (d *Diskcache) fetch(key string) ([]byte, bool) { return result, true } -// StoreChunk puts a chunk into the cache. -func (d *Diskcache) StoreChunk(ctx context.Context, key string, value []byte) error { +// Store puts a chunk into the cache. +func (d *Diskcache) Store(ctx context.Context, key string, value []byte) error { d.mtx.Lock() defer d.mtx.Unlock() diff --git a/cache/fifo_cache.go b/cache/fifo_cache.go index 1eb5099ece8d6..a5a2f8218f7fc 100644 --- a/cache/fifo_cache.go +++ b/cache/fifo_cache.go @@ -101,6 +101,35 @@ func NewFifoCache(name string, size int, validity time.Duration) *FifoCache { } } +// Store implements Cache. +func (c *FifoCache) Store(ctx context.Context, key string, buf []byte) error { + c.Put(ctx, key, buf) + + return nil +} + +// Fetch implements Cache. +func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { + found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys)) + for _, key := range keys { + val, ok := c.Get(ctx, key) + if !ok { + missing = append(missing, key) + continue + } + + found = append(found, key) + bufs = append(bufs, val.([]byte)) + } + + return +} + +// Stop implements Cache. +func (c *FifoCache) Stop() error { + return nil +} + // Put stores the value against the key. func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) { span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put") diff --git a/cache/instrumented.go b/cache/instrumented.go index 2b582d2f0b36c..fd075d1946a16 100644 --- a/cache/instrumented.go +++ b/cache/instrumented.go @@ -2,6 +2,7 @@ package cache import ( "context" + "time" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -21,13 +22,13 @@ var ( fetchedKeys = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_fetched_keys", - Help: "Total count of chunks requested from cache.", + Help: "Total count of keys requested from cache.", }, []string{"name"}) hits = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_hits", - Help: "Total count of chunks found in cache.", + Help: "Total count of keys found in cache.", }, []string{"name"}) ) @@ -37,7 +38,19 @@ func init() { prometheus.MustRegister(hits) } -func instrument(name string, cache Cache) Cache { +// Instrument returns an instrumented cache. +func Instrument(name string, cache Cache) Cache { + return &instrumentedCache{ + name: name, + fetchedKeys: fetchedKeys.WithLabelValues(name), + hits: hits.WithLabelValues(name), + trace: true, + Cache: cache, + } +} + +// MetricsInstrument returns an instrumented cache that only tracks metrics and not traces. 
+func MetricsInstrument(name string, cache Cache) Cache { return &instrumentedCache{ name: name, fetchedKeys: fetchedKeys.WithLabelValues(name), @@ -49,36 +62,58 @@ func instrument(name string, cache Cache) Cache { type instrumentedCache struct { name string fetchedKeys, hits prometheus.Counter + trace bool Cache } -func (i *instrumentedCache) StoreChunk(ctx context.Context, key string, buf []byte) error { - return instr.TimeRequestHistogram(ctx, i.name+".store", requestDuration, func(ctx context.Context) error { - return i.Cache.StoreChunk(ctx, key, buf) +func (i *instrumentedCache) Store(ctx context.Context, key string, buf []byte) error { + method := i.name + ".store" + if i.trace { + return instr.TimeRequestHistogram(ctx, method, requestDuration, func(ctx context.Context) error { + sp := ot.SpanFromContext(ctx) + sp.LogFields(otlog.String("key", key)) + + return i.Cache.Store(ctx, key, buf) + }) + } + + return UntracedCollectedRequest(ctx, method, instr.NewHistogramCollector(requestDuration), instr.ErrorCode, func(ctx context.Context) error { + return i.Cache.Store(ctx, key, buf) }) } -func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { +func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { var ( found []string bufs [][]byte missing []string + err error + method = i.name + ".fetch" ) - err := instr.TimeRequestHistogram(ctx, i.name+".fetch", requestDuration, func(ctx context.Context) error { - sp := ot.SpanFromContext(ctx) - sp.LogFields(otlog.Int("chunks requested", len(keys))) - var err error - found, bufs, missing, err = i.Cache.FetchChunkData(ctx, keys) + if i.trace { + err = instr.TimeRequestHistogram(ctx, method, requestDuration, func(ctx context.Context) error { + sp := ot.SpanFromContext(ctx) + sp.LogFields(otlog.Int("keys requested", len(keys))) - if err == nil { - sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) - } else { - sp.LogFields(otlog.Error(err)) - } + var err error + found, bufs, missing, err = i.Cache.Fetch(ctx, keys) + + if err == nil { + sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found))) + } + + return err + }) + } else { + err = UntracedCollectedRequest(ctx, method, instr.NewHistogramCollector(requestDuration), instr.ErrorCode, func(ctx context.Context) error { + var err error + found, bufs, missing, err = i.Cache.Fetch(ctx, keys) + + return err + }) + } - return err - }) i.fetchedKeys.Add(float64(len(keys))) i.hits.Add(float64(len(found))) return found, bufs, missing, err @@ -87,3 +122,13 @@ func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ( func (i *instrumentedCache) Stop() error { return i.Cache.Stop() } + +// UntracedCollectedRequest is the same as instr.CollectedRequest but without any tracing. +func UntracedCollectedRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error { + start := time.Now() + col.Before(method, start) + err := f(ctx) + col.After(method, toStatusCode(err), start) + + return err +} diff --git a/cache/memcached.go b/cache/memcached.go index 7cfb33d4c7140..4e477123826ec 100644 --- a/cache/memcached.go +++ b/cache/memcached.go @@ -63,8 +63,8 @@ func memcacheStatusCode(err error) string { } } -// FetchChunkData gets chunks from the chunk cache. 
-func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { +// Fetch gets keys from the cache. +func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { var items map[string]*memcache.Item err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { var err error @@ -86,8 +86,8 @@ func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found [] return } -// StoreChunk serializes and stores a chunk in the chunk cache. -func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) error { +// Store stores the key in the cache. +func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error { return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { item := memcache.Item{ Key: key, diff --git a/cache/memcached_client.go b/cache/memcached_client.go index 43a52fffab348..eec25e5529681 100644 --- a/cache/memcached_client.go +++ b/cache/memcached_client.go @@ -41,15 +41,28 @@ type MemcachedClientConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") - f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.") - f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") - f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") + cfg.registerFlagsWithPrefix("", f) } -// newMemcachedClient creates a new MemcacheClient that gets its server list +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.registerFlagsWithPrefix(prefix, f) +} + +func (cfg *MemcachedClientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) { + if prefix != "" { + prefix = prefix + "." + } + + f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") + f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", "SRV service used to discover memcache servers.") + f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") + f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") +} + +// NewMemcachedClient creates a new MemcacheClient that gets its server list // from SRV and updates the server list on a regular basis. 
-func newMemcachedClient(cfg MemcachedClientConfig) *memcachedClient {
+func NewMemcachedClient(cfg MemcachedClientConfig) MemcachedClient {
 	var servers memcache.ServerList
 	client := memcache.NewFromSelector(&servers)
 	client.Timeout = cfg.Timeout
diff --git a/cache/tiered.go b/cache/tiered.go
index 9151e3afa74e3..65854c096ed4b 100644
--- a/cache/tiered.go
+++ b/cache/tiered.go
@@ -6,19 +6,23 @@ type tiered []Cache
 
 // NewTiered makes a new tiered cache.
 func NewTiered(caches []Cache) Cache {
+	if len(caches) == 1 {
+		return caches[0]
+	}
+
 	return tiered(caches)
 }
 
-func (t tiered) StoreChunk(ctx context.Context, key string, buf []byte) error {
+func (t tiered) Store(ctx context.Context, key string, buf []byte) error {
 	for _, c := range []Cache(t) {
-		if err := c.StoreChunk(ctx, key, buf); err != nil {
+		if err := c.Store(ctx, key, buf); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
+func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
 	found := make(map[string][]byte, len(keys))
 	missing := keys
 	previousCaches := make([]Cache, 0, len(t))
@@ -30,14 +34,18 @@ func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, []
 			passBufs [][]byte
 		)
 
-		passKeys, passBufs, missing, err = c.FetchChunkData(ctx, missing)
+		passKeys, passBufs, missing, err = c.Fetch(ctx, missing)
 		if err != nil {
 			return nil, nil, nil, err
 		}
 
 		for i, key := range passKeys {
 			found[key] = passBufs[i]
-			tiered(previousCaches).StoreChunk(ctx, key, passBufs[i])
+			tiered(previousCaches).Store(ctx, key, passBufs[i])
+		}
+
+		if len(missing) == 0 {
+			break
 		}
 
 		previousCaches = append(previousCaches, c)
diff --git a/cache/tiered_test.go b/cache/tiered_test.go
index 32cc3b6bc4672..0657cb3d12816 100644
--- a/cache/tiered_test.go
+++ b/cache/tiered_test.go
@@ -23,13 +23,13 @@ func TestTiered(t *testing.T) {
 	level1, level2 := newMockCache(), newMockCache()
 	cache := cache.NewTiered([]cache.Cache{level1, level2})
 
-	err := level1.StoreChunk(context.Background(), "key1", []byte("hello"))
+	err := level1.Store(context.Background(), "key1", []byte("hello"))
 	require.NoError(t, err)
 
-	err = level2.StoreChunk(context.Background(), "key2", []byte("world"))
+	err = level2.Store(context.Background(), "key2", []byte("world"))
 	require.NoError(t, err)
 
-	keys, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{"key1", "key2", "key3"})
+	keys, bufs, missing, err := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"})
 	require.NoError(t, err)
 	require.Equal(t, []string{"key1", "key2"}, keys)
 	require.Equal(t, [][]byte{[]byte("hello"), []byte("world")}, bufs)
diff --git a/cassandra/storage_client.go b/cassandra/storage_client.go
index 54d40be2f397f..88d443db8bf55 100644
--- a/cassandra/storage_client.go
+++ b/cassandra/storage_client.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/gocql/gocql"
 	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
 
 	"github.com/weaveworks/cortex/pkg/chunk"
 )
@@ -121,6 +122,18 @@ type storageClient struct {
 	session   *gocql.Session
 }
 
+// Opts returns the chunk.StorageOpts for the config.
+func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) {
+	client, err := NewStorageClient(cfg, schemaCfg)
+	if err != nil {
+		return nil, err
+	}
+	return []chunk.StorageOpt{{
+		From:   model.Time(0),
+		Client: client,
+	}}, err
+}
+
 // NewStorageClient returns a new StorageClient.
 func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
 	session, err := cfg.session()
diff --git a/chunk_store_utils.go b/chunk_store_utils.go
index 4edfa15c1af52..14f39aed504b6 100644
--- a/chunk_store_utils.go
+++ b/chunk_store_utils.go
@@ -143,7 +143,7 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string
 	defer log.Span.Finish()
 
 	// Now fetch the actual chunk data from Memcache / S3
-	cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys)
+	cacheHits, cacheBufs, _, err := c.cache.Fetch(ctx, keys)
 	if err != nil {
 		level.Warn(log).Log("msg", "error fetching from cache", "err", err)
 	}
@@ -177,7 +177,7 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {
 		if err != nil {
 			return err
 		}
-		if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil {
+		if err := c.cache.Store(ctx, chunks[i].ExternalKey(), encoded); err != nil {
 			return err
 		}
 	}
diff --git a/gcp/storage_client.go b/gcp/storage_client.go
index 18b779c2e9f19..9135236acc752 100644
--- a/gcp/storage_client.go
+++ b/gcp/storage_client.go
@@ -10,6 +10,7 @@ import (
 	"cloud.google.com/go/bigtable"
 	ot "github.com/opentracing/opentracing-go"
 	otlog "github.com/opentracing/opentracing-go/log"
+	"github.com/prometheus/common/model"
 
 	"github.com/pkg/errors"
 	"github.com/weaveworks/cortex/pkg/chunk"
@@ -37,6 +38,30 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.instance, "bigtable.instance", "", "Bigtable instance ID.")
 }
 
+// Opts returns the chunk.StorageOpts for the config.
+func Opts(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) {
+	client, err := NewStorageClientV1(ctx, cfg, schemaCfg)
+	if err != nil {
+		return nil, err
+	}
+
+	opts := []chunk.StorageOpt{}
+	opts = append(opts, chunk.StorageOpt{From: model.Time(0), Client: client})
+	if schemaCfg.BigtableColumnKeyFrom.IsSet() {
+		client, err = NewStorageClientColumnKey(ctx, cfg, schemaCfg)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating storage client")
+		}
+
+		opts = append(opts, chunk.StorageOpt{
+			From:   schemaCfg.BigtableColumnKeyFrom.Time,
+			Client: client,
+		})
+	}
+
+	return opts, nil
+}
+
 // NewStorageClient returns a new StorageClient.
 func NewStorageClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
 	if cfg.ColumnKey {
diff --git a/inmemory_storage_client.go b/inmemory_storage_client.go
index c95f7e5a4ea62..0add546b2a4b0 100644
--- a/inmemory_storage_client.go
+++ b/inmemory_storage_client.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 
 	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/common/model"
 
 	"github.com/weaveworks/cortex/pkg/util"
 )
@@ -28,6 +29,16 @@ type mockItem struct {
 	value []byte
 }
 
+// Opts returns the chunk.StorageOpts for the config.
+func Opts() ([]StorageOpt, error) {
+	client := NewMockStorage()
+
+	return []StorageOpt{{
+		From:   model.Time(0),
+		Client: client,
+	}}, nil
+}
+
 // NewMockStorage creates a new MockStorage.
 func NewMockStorage() *MockStorage {
 	return &MockStorage{
diff --git a/storage/caching_fixtures.go b/storage/caching_fixtures.go
index 35b6bb0c4690b..36e46bfaf0cf8 100644
--- a/storage/caching_fixtures.go
+++ b/storage/caching_fixtures.go
@@ -3,6 +3,7 @@ package storage
 import (
 	"time"
 
+	"github.com/weaveworks/cortex/pkg/chunk/cache"
 	"github.com/weaveworks/cortex/pkg/chunk/gcp"
 
 	"github.com/weaveworks/cortex/pkg/chunk"
@@ -16,7 +17,7 @@ type fixture struct {
 func (f fixture) Name() string { return "caching-store" }
 func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) {
 	storageClient, tableClient, schemaConfig, err := f.fixture.Clients()
-	client := newCachingStorageClient(storageClient, 500, 5*time.Minute)
+	client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", 500, 5*time.Minute), 5*time.Minute)
 	return client, tableClient, schemaConfig, err
 }
 func (f fixture) Teardown() error { return f.fixture.Teardown() }
diff --git a/storage/caching_storage_client.go b/storage/caching_storage_client.go
index 5b58078b2af41..00e9ed922e4d1 100644
--- a/storage/caching_storage_client.go
+++ b/storage/caching_storage_client.go
@@ -3,35 +3,115 @@ package storage
 import (
 	"bytes"
 	"context"
+	"encoding/hex"
+	"hash/fnv"
 	"strings"
 	"time"
 
+	proto "github.com/golang/protobuf/proto"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	"github.com/weaveworks/cortex/pkg/chunk"
 	"github.com/weaveworks/cortex/pkg/chunk/cache"
 )
 
+var (
+	cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "querier_index_cache_corruptions_total",
+		Help: "The number of cache corruptions for the index cache.",
+	})
+	cacheHits = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "querier_index_cache_hits_total",
+		Help: "The number of cache hits for the index cache.",
+	})
+	cacheGets = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "querier_index_cache_gets_total",
+		Help: "The number of gets for the index cache.",
+	})
+	cachePuts = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "querier_index_cache_puts_total",
+		Help: "The number of puts for the index cache.",
+	})
+	cacheEncodeErrs = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "querier_index_cache_encode_errors_total",
+		Help: "The number of errors for the index cache while encoding the body.",
+	})
+)
+
+// IndexCache describes the cache for the index.
+type IndexCache interface {
+	Store(ctx context.Context, key string, val ReadBatch)
+	Fetch(ctx context.Context, key string) (val ReadBatch, ok bool, err error)
+	Stop() error
+}
+
+type indexCache struct {
+	cache.Cache
+}
+
+func (c *indexCache) Store(ctx context.Context, key string, val ReadBatch) {
+	cachePuts.Inc()
+	out, err := proto.Marshal(&val)
+	if err != nil {
+		cacheEncodeErrs.Inc()
+		return
+	}
+
+	// We're doing the hashing to handle unicode and key length properly.
+	// Memcache fails for unicode keys and keys longer than 250 bytes.
+	c.Cache.Store(ctx, hashKey(key), out)
+}
+
+func (c *indexCache) Fetch(ctx context.Context, key string) (ReadBatch, bool, error) {
+	cacheGets.Inc()
+
+	found, valBytes, _, err := c.Cache.Fetch(ctx, []string{hashKey(key)})
+	if len(found) != 1 || err != nil {
+		return ReadBatch{}, false, err
+	}
+
+	var rb ReadBatch
+	if err := proto.Unmarshal(valBytes[0], &rb); err != nil {
+		return rb, false, err
+	}
+
+	// Make sure the hash(key) is not a collision by looking at the key in the value.
+ if key == rb.Key && time.Now().Before(time.Unix(0, rb.Expiry)) { + cacheHits.Inc() + return rb, true, nil + } + + return ReadBatch{}, false, nil +} + type cachingStorageClient struct { chunk.StorageClient - cache *cache.FifoCache + cache IndexCache validity time.Duration } -func newCachingStorageClient(client chunk.StorageClient, size int, validity time.Duration) chunk.StorageClient { - if size == 0 { +func newCachingStorageClient(client chunk.StorageClient, cache cache.Cache, validity time.Duration) chunk.StorageClient { + if cache == nil { return client } return &cachingStorageClient{ StorageClient: client, - cache: cache.NewFifoCache("index", size, validity), + cache: &indexCache{cache}, + validity: validity, } } func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - value, ok := s.cache.Get(ctx, queryKey(query)) - if ok { - batches := value.([]chunk.ReadBatch) - filteredBatch := filterBatchByQuery(query, batches) + value, ok, err := s.cache.Fetch(ctx, queryKey(query)) + if err != nil { + cacheCorruptErrs.Inc() + } + + if ok && err == nil { + filteredBatch, _ := filterBatchByQuery(query, []chunk.ReadBatch{value}) callback(filteredBatch) return nil @@ -43,29 +123,30 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.Index HashValue: query.HashValue, } // Just reads the entire row and caches it. - err := s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) + expiryTime := time.Now().Add(s.validity) + err = s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) if err != nil { return err } - filteredBatch := filterBatchByQuery(query, batches) + filteredBatch, totalBatches := filterBatchByQuery(query, batches) callback(filteredBatch) - s.cache.Put(ctx, queryKey(query), batches) + totalBatches.Key = queryKey(query) + totalBatches.Expiry = expiryTime.UnixNano() + s.cache.Store(ctx, totalBatches.Key, totalBatches) return nil } -type readBatch []cell +// Len implements chunk.ReadBatch. +func (b ReadBatch) Len() int { return len(b.Entries) } -func (b readBatch) Len() int { return len(b) } -func (b readBatch) RangeValue(i int) []byte { return b[i].column } -func (b readBatch) Value(i int) []byte { return b[i].value } +// RangeValue implements chunk.ReadBatch. +func (b ReadBatch) RangeValue(i int) []byte { return b.Entries[i].Column } -type cell struct { - column []byte - value []byte -} +// Value implements chunk.ReadBatch. +func (b ReadBatch) Value(i int) []byte { return b.Entries[i].Value } func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool { return func(result chunk.ReadBatch) bool { @@ -79,7 +160,7 @@ func queryKey(q chunk.IndexQuery) string { return q.TableName + sep + q.HashValue } -func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch { +func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) (filteredBatch, totalBatch ReadBatch) { filter := func([]byte, []byte) bool { return true } if len(query.RangeValuePrefix) != 0 { @@ -100,14 +181,25 @@ func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readB } } - finalBatch := make(readBatch, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. + filteredBatch.Entries = make([]*Entry, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. 
+ totalBatch.Entries = make([]*Entry, 0, len(batches)) for _, batch := range batches { for i := 0; i < batch.Len(); i++ { + totalBatch.Entries = append(totalBatch.Entries, &Entry{Column: batch.RangeValue(i), Value: batch.Value(i)}) + if filter(batch.RangeValue(i), batch.Value(i)) { - finalBatch = append(finalBatch, cell{column: batch.RangeValue(i), value: batch.Value(i)}) + filteredBatch.Entries = append(filteredBatch.Entries, &Entry{Column: batch.RangeValue(i), Value: batch.Value(i)}) } } } - return finalBatch + return +} + +func hashKey(key string) string { + hasher := fnv.New64a() + hasher.Write([]byte(key)) // This'll never error. + + // Hex because memcache errors for the bytes produced by the hash. + return hex.EncodeToString(hasher.Sum(nil)) } diff --git a/storage/caching_storage_client.pb.go b/storage/caching_storage_client.pb.go new file mode 100644 index 0000000000000..47c09c47d918e --- /dev/null +++ b/storage/caching_storage_client.pb.go @@ -0,0 +1,714 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: caching_storage_client.proto + +/* + Package storage is a generated protocol buffer package. + + It is generated from these files: + caching_storage_client.proto + + It has these top-level messages: + Entry + ReadBatch +*/ +package storage + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import bytes "bytes" + +import strings "strings" +import reflect "reflect" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Entry struct { + Column []byte `protobuf:"bytes,1,opt,name=Column,json=column,proto3" json:"Column,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=Value,json=value,proto3" json:"Value,omitempty"` +} + +func (m *Entry) Reset() { *m = Entry{} } +func (*Entry) ProtoMessage() {} +func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptorCachingStorageClient, []int{0} } + +func (m *Entry) GetColumn() []byte { + if m != nil { + return m.Column + } + return nil +} + +func (m *Entry) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +type ReadBatch struct { + Entries []*Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"` + Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + // The time at which the key expires. 
+ Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` +} + +func (m *ReadBatch) Reset() { *m = ReadBatch{} } +func (*ReadBatch) ProtoMessage() {} +func (*ReadBatch) Descriptor() ([]byte, []int) { return fileDescriptorCachingStorageClient, []int{1} } + +func (m *ReadBatch) GetEntries() []*Entry { + if m != nil { + return m.Entries + } + return nil +} + +func (m *ReadBatch) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *ReadBatch) GetExpiry() int64 { + if m != nil { + return m.Expiry + } + return 0 +} + +func init() { + proto.RegisterType((*Entry)(nil), "storage.Entry") + proto.RegisterType((*ReadBatch)(nil), "storage.ReadBatch") +} +func (this *Entry) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Entry) + if !ok { + that2, ok := that.(Entry) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !bytes.Equal(this.Column, that1.Column) { + return false + } + if !bytes.Equal(this.Value, that1.Value) { + return false + } + return true +} +func (this *ReadBatch) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ReadBatch) + if !ok { + that2, ok := that.(ReadBatch) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Entries) != len(that1.Entries) { + return false + } + for i := range this.Entries { + if !this.Entries[i].Equal(that1.Entries[i]) { + return false + } + } + if this.Key != that1.Key { + return false + } + if this.Expiry != that1.Expiry { + return false + } + return true +} +func (this *Entry) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&storage.Entry{") + s = append(s, "Column: "+fmt.Sprintf("%#v", this.Column)+",\n") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ReadBatch) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&storage.ReadBatch{") + if this.Entries != nil { + s = append(s, "Entries: "+fmt.Sprintf("%#v", this.Entries)+",\n") + } + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + s = append(s, "Expiry: "+fmt.Sprintf("%#v", this.Expiry)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringCachingStorageClient(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Entry) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Entry) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Column) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Column))) + i += copy(dAtA[i:], m.Column) + } + if len(m.Value) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + return i, nil +} + +func (m *ReadBatch) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != 
nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadBatch) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Entries) > 0 { + for _, msg := range m.Entries { + dAtA[i] = 0xa + i++ + i = encodeVarintCachingStorageClient(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Key) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if m.Expiry != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintCachingStorageClient(dAtA, i, uint64(m.Expiry)) + } + return i, nil +} + +func encodeVarintCachingStorageClient(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Entry) Size() (n int) { + var l int + _ = l + l = len(m.Column) + if l > 0 { + n += 1 + l + sovCachingStorageClient(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovCachingStorageClient(uint64(l)) + } + return n +} + +func (m *ReadBatch) Size() (n int) { + var l int + _ = l + if len(m.Entries) > 0 { + for _, e := range m.Entries { + l = e.Size() + n += 1 + l + sovCachingStorageClient(uint64(l)) + } + } + l = len(m.Key) + if l > 0 { + n += 1 + l + sovCachingStorageClient(uint64(l)) + } + if m.Expiry != 0 { + n += 1 + sovCachingStorageClient(uint64(m.Expiry)) + } + return n +} + +func sovCachingStorageClient(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozCachingStorageClient(x uint64) (n int) { + return sovCachingStorageClient(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Entry) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Entry{`, + `Column:` + fmt.Sprintf("%v", this.Column) + `,`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `}`, + }, "") + return s +} +func (this *ReadBatch) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ReadBatch{`, + `Entries:` + strings.Replace(fmt.Sprintf("%v", this.Entries), "Entry", "Entry", 1) + `,`, + `Key:` + fmt.Sprintf("%v", this.Key) + `,`, + `Expiry:` + fmt.Sprintf("%v", this.Expiry) + `,`, + `}`, + }, "") + return s +} +func valueToStringCachingStorageClient(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Entry) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Entry: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Column", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 
0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCachingStorageClient + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Column = append(m.Column[:0], dAtA[iNdEx:postIndex]...) + if m.Column == nil { + m.Column = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCachingStorageClient + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) + if m.Value == nil { + m.Value = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCachingStorageClient(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCachingStorageClient + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadBatch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadBatch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadBatch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCachingStorageClient + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Entries = append(m.Entries, &Entry{}) + if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCachingStorageClient + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType) + } + m.Expiry = 0 + for shift := uint(0); ; shift += 7 { + if shift 
>= 64 { + return ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Expiry |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipCachingStorageClient(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCachingStorageClient + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipCachingStorageClient(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthCachingStorageClient + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCachingStorageClient + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipCachingStorageClient(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthCachingStorageClient = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowCachingStorageClient = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("caching_storage_client.proto", fileDescriptorCachingStorageClient) } + +var fileDescriptorCachingStorageClient = []byte{ + // 229 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x4e, 0x4c, 0xce, + 0xc8, 0xcc, 0x4b, 0x8f, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0x4c, 0x4f, 0x8d, 0x4f, 0xce, 0xc9, 0x4c, + 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x87, 0x8a, 0x2a, 0x99, 0x72, 0xb1, + 0xba, 0xe6, 0x95, 0x14, 0x55, 0x0a, 0x89, 0x71, 0xb1, 0x39, 0xe7, 0xe7, 0x94, 0xe6, 0xe6, 0x49, + 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0xb1, 0x25, 0x83, 0x79, 0x42, 0x22, 0x5c, 0xac, 0x61, 0x89, + 0x39, 0xa5, 0xa9, 0x12, 0x4c, 0x60, 0x61, 0xd6, 0x32, 0x10, 0x47, 0x29, 0x9e, 0x8b, 0x33, 0x28, + 0x35, 0x31, 0xc5, 0x29, 0xb1, 0x24, 0x39, 0x43, 0x48, 0x83, 0x8b, 0x3d, 0x35, 0xaf, 0xa4, 0x28, + 0x33, 0xb5, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 
0x88, 0x4f, 0x0f, 0x6a, 0xbc, 0x1e, 0xd8, + 0xec, 0x20, 0x98, 0xb4, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd8, 0x28, 0xce, 0x20, 0x10, + 0x13, 0x64, 0x6d, 0x6a, 0x45, 0x41, 0x66, 0x51, 0xa5, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x73, 0x10, + 0x94, 0xe7, 0xa4, 0x73, 0xe1, 0xa1, 0x1c, 0xc3, 0x8d, 0x87, 0x72, 0x0c, 0x1f, 0x1e, 0xca, 0x31, + 0x36, 0x3c, 0x92, 0x63, 0x5c, 0xf1, 0x48, 0x8e, 0xf1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, + 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x7c, 0xf1, 0x48, 0x8e, 0xe1, 0xc3, 0x23, 0x39, 0xc6, 0x09, 0x8f, + 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0xbe, 0x32, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x00, 0x1b, 0x46, + 0xe1, 0xf5, 0x00, 0x00, 0x00, +} diff --git a/storage/caching_storage_client.proto b/storage/caching_storage_client.proto new file mode 100644 index 0000000000000..f10dbe3443719 --- /dev/null +++ b/storage/caching_storage_client.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package storage; + +message Entry { + bytes Column = 1; + bytes Value = 2; +} + +message ReadBatch { + repeated Entry entries = 1; + string key = 2; + + // The time at which the key expires. + int64 expiry = 3; +} \ No newline at end of file diff --git a/storage/factory.go b/storage/factory.go index 43f341d49a6b8..8cf7c34facc40 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -9,9 +9,9 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/aws" + "github.com/weaveworks/cortex/pkg/chunk/cache" "github.com/weaveworks/cortex/pkg/chunk/cassandra" "github.com/weaveworks/cortex/pkg/chunk/gcp" "github.com/weaveworks/cortex/pkg/util" @@ -26,6 +26,7 @@ type Config struct { IndexCacheSize int IndexCacheValidity time.Duration + memcacheClient cache.MemcachedClientConfig } // RegisterFlags adds the flags required to configure this flag set. @@ -37,36 +38,47 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.") f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.") + cfg.memcacheClient.RegisterFlagsWithPrefix("index", f) } // Opts makes the storage clients based on the configuration. 
func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) { - opts := []chunk.StorageOpt{} - client, err := newStorageClient(cfg, schemaCfg) + var caches []cache.Cache + if cfg.IndexCacheSize > 0 { + fifocache := cache.MetricsInstrument("fifo-index", cache.NewFifoCache("index", cfg.IndexCacheSize, cfg.IndexCacheValidity)) + caches = append(caches, fifocache) + } + + if cfg.memcacheClient.Host != "" { + client := cache.NewMemcachedClient(cfg.memcacheClient) + memcache := cache.MetricsInstrument("memcache-index", cache.NewMemcached(cache.MemcachedConfig{ + Expiration: cfg.IndexCacheValidity, + }, client)) + caches = append(caches, cache.NewBackground(cache.BackgroundConfig{ + WriteBackGoroutines: 10, + WriteBackBuffer: 100, + }, memcache)) + } + + opts, err := newStorageOpts(cfg, schemaCfg) if err != nil { return nil, errors.Wrap(err, "error creating storage client") } - opts = append(opts, chunk.StorageOpt{From: model.Time(0), Client: client}) - if cfg.StorageClient == "gcp" && schemaCfg.BigtableColumnKeyFrom.IsSet() { - client, err = gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) - if err != nil { - return nil, errors.Wrap(err, "error creating storage client") + if len(caches) > 0 { + tieredCache := cache.Instrument("tiered-index", cache.NewTiered(caches)) + for i := range opts { + opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.IndexCacheValidity) } - - opts = append(opts, chunk.StorageOpt{ - From: schemaCfg.BigtableColumnKeyFrom.Time, - Client: newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity), - }) } return opts, nil } -func newStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.StorageClient, err error) { +func newStorageOpts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) { switch cfg.StorageClient { case "inmemory": - client, err = chunk.NewMockStorage(), nil + return chunk.Opts() case "aws": if cfg.AWSStorageConfig.DynamoDB.URL == nil { return nil, fmt.Errorf("Must set -dynamodb.url in aws mode") @@ -75,17 +87,14 @@ func newStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.St if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - client, err = aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg) + return aws.Opts(cfg.AWSStorageConfig, schemaCfg) case "gcp": - client, err = gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) + return gcp.Opts(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": - client, err = cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) + return cassandra.Opts(cfg.CassandraStorageConfig, schemaCfg) default: return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient) } - - client = newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity) - return } // NewTableClient makes a new table client based on the configuration.
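
For illustration, here is a minimal sketch (not part of the patch) of how the renamed cache API composes after this change. It uses only constructors that appear in the diff (NewFifoCache, MetricsInstrument, NewTiered, Instrument, NewBackground) and the renamed Store/Fetch methods; the surrounding main function and the chosen sizes are hypothetical.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/weaveworks/cortex/pkg/chunk/cache"
)

func main() {
	// Layer the caches the same way storage/factory.go now does:
	// FIFO cache -> metrics-only instrumentation -> tiered ->
	// traced instrumentation -> background write-behind.
	fifo := cache.MetricsInstrument("fifo-index", cache.NewFifoCache("index", 500, 5*time.Minute))
	tiered := cache.Instrument("tiered-index", cache.NewTiered([]cache.Cache{fifo}))
	c := cache.NewBackground(cache.BackgroundConfig{
		WriteBackGoroutines: 10,
		WriteBackBuffer:     100,
	}, tiered)
	defer c.Stop()

	// Store enqueues the write; the background layer flushes it
	// asynchronously, so an immediate Fetch may still miss.
	if err := c.Store(context.Background(), "key1", []byte("hello")); err != nil {
		fmt.Println("store failed:", err)
	}

	// Fetch reports found keys, their buffers, and the misses.
	found, bufs, missing, err := c.Fetch(context.Background(), []string{"key1", "key2"})
	fmt.Println(found, bufs, missing, err)
}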