Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull cachekey changes #92

Merged
merged 2 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func runStore(
r := route.New()

if len(cachingBucketConfigYaml) > 0 {
insBkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, insBkt, logger, reg, r)
insBkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, insBkt, logger, reg, r, conf.cachingBucketConfig.Path())
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type OperationConfig struct {
// Operation-specific configs.
type IterConfig struct {
OperationConfig
TTL time.Duration
Codec IterCodec
TTL time.Duration
Codec IterCodec
ConfigHash string
}

type ExistsConfig struct {
Expand Down Expand Up @@ -105,11 +106,12 @@ func newOperationConfig(cache Cache, matcher func(string) bool) OperationConfig
}

// CacheIter configures caching of "Iter" operation for matching directories.
func (cfg *CachingBucketConfig) CacheIter(configName string, cache Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec) {
func (cfg *CachingBucketConfig) CacheIter(configName string, cache Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec, configHash string) {
cfg.iter[configName] = &IterConfig{
OperationConfig: newOperationConfig(cache, matcher),
TTL: ttl,
Codec: codec,
ConfigHash: configHash,
}
}

Expand Down
26 changes: 20 additions & 6 deletions pkg/store/cache/cachekey/cachekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,21 @@ const (
)

type BucketCacheKey struct {
Verb VerbType
Name string
Start int64
End int64
Verb VerbType
Name string
Start int64
End int64
ObjectStorageConfigHash string
}

// String returns the string representation of BucketCacheKey.
func (ck BucketCacheKey) String() string {
if ck.Start == 0 && ck.End == 0 {
// Let's add object storage configuration hash to the iter verbs
// so that it would be possible to re-use the same cache storage.
if ck.Verb == IterVerb || ck.Verb == IterRecursiveVerb {
return string(ck.Verb) + ":" + ck.Name + ":" + ck.ObjectStorageConfigHash
}
return string(ck.Verb) + ":" + ck.Name
}

Expand Down Expand Up @@ -72,7 +78,8 @@ func ParseBucketCacheKey(key string) (BucketCacheKey, error) {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyVerb
}

if verb == SubrangeVerb {
switch verb {
case SubrangeVerb:
if len(slice) != 4 {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyFormat
}
Expand All @@ -89,7 +96,14 @@ func ParseBucketCacheKey(key string) (BucketCacheKey, error) {

ck.Start = start
ck.End = end
} else {
case IterRecursiveVerb, IterVerb:
if len(slice) == 3 {
ck.ObjectStorageConfigHash = slice[2]
}
if len(slice) > 3 {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyFormat
}
default:
if len(slice) != 2 {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyFormat
}
Expand Down
26 changes: 23 additions & 3 deletions pkg/store/cache/cachekey/cachekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ func TestParseBucketCacheKey(t *testing.T) {
expected: BucketCacheKey{},
expectedErr: ErrInvalidBucketCacheKeyFormat,
},
// Iter could have object storage hash attached to it.
{
key: "iter::asdasdsa",
expected: BucketCacheKey{
Verb: IterVerb,
Name: "",
ObjectStorageConfigHash: "asdasdsa",
},
},
// Iter recursive could have object storage hash attached to it.
{
key: "iter-recursive:foo/:asdasdsa",
expected: BucketCacheKey{
Verb: IterRecursiveVerb,
Name: "foo/",
ObjectStorageConfigHash: "asdasdsa",
},
},
// Key must always have a name.
{
key: "iter",
Expand Down Expand Up @@ -116,8 +134,10 @@ func TestParseBucketCacheKey(t *testing.T) {
}

for _, tc := range testcases {
res, err := ParseBucketCacheKey(tc.key)
testutil.Equals(t, tc.expectedErr, err)
testutil.Equals(t, tc.expected, res)
t.Run(tc.key, func(t *testing.T) {
res, err := ParseBucketCacheKey(tc.key)
testutil.Equals(t, tc.expectedErr, err)
testutil.Equals(t, tc.expected, res)
})
}
}
2 changes: 1 addition & 1 deletion pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er

cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc()

iterVerb := cachekey.BucketCacheKey{Verb: cachekey.IterVerb, Name: dir}
iterVerb := cachekey.BucketCacheKey{Verb: cachekey.IterVerb, Name: dir, ObjectStorageConfigHash: cfg.ConfigHash}
opts := objstore.ApplyIterOptions(options...)
if opts.Recursive {
iterVerb.Verb = cachekey.IterRecursiveVerb
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/cache/caching_bucket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package storecache

import (
"fmt"
"regexp"
"strings"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -72,7 +74,7 @@ func (cfg *CachingWithBackendConfig) Defaults() {
}

// NewCachingBucketFromYaml uses YAML configuration to create new caching bucket.
func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer, r *route.Router) (objstore.InstrumentedBucket, error) {
func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer, r *route.Router, configPath string) (objstore.InstrumentedBucket, error) {
level.Info(logger).Log("msg", "loading caching bucket configuration")

config := &CachingWithBackendConfig{}
Expand All @@ -82,6 +84,12 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger
return nil, errors.Wrap(err, "parsing config YAML file")
}

// Append the config path to the YAML content. This allows
// using identical config with multiple instances.
// TODO(GiedriusS): in the long-term add some kind of "name"
// identifier for each instance.
cfgHash := string(fmt.Sprintf("%d", xxhash.Sum64(append(yamlContent, []byte(configPath)...))))

backendConfig, err := yaml.Marshal(config.BackendConfig)
if err != nil {
return nil, errors.Wrap(err, "marshal content of cache backend configuration")
Expand All @@ -97,7 +105,7 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger
cfg.CacheGet("meta.jsons", nil, isMetaFile, int(config.MetafileMaxSize), config.MetafileContentTTL, config.MetafileExistsTTL, config.MetafileDoesntExistTTL)

// Cache Iter requests for root.
cfg.CacheIter("blocks-iter", nil, isBlocksRootDir, config.BlocksIterTTL, JSONIterCodec{})
cfg.CacheIter("blocks-iter", nil, isBlocksRootDir, config.BlocksIterTTL, JSONIterCodec{}, cfgHash)

switch strings.ToUpper(string(config.Type)) {
case string(MemcachedBucketCacheProvider):
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/caching_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestCachedIter(t *testing.T) {

const cfgName = "dirs"
cfg := thanoscache.NewCachingBucketConfig()
cfg.CacheIter(cfgName, cache, func(string) bool { return true }, 5*time.Minute, JSONIterCodec{})
cfg.CacheIter(cfgName, cache, func(string) bool { return true }, 5*time.Minute, JSONIterCodec{}, "")

cb, err := NewCachingBucket(inmem, cfg, nil, nil)
testutil.Ok(t, err)
Expand Down
Loading