Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Reuse operation name constants for consistency #2991

Merged
merged 2 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/runutil"
)

const (
OpIter = "iter"
OpGet = "get"
OpGetRange = "get_range"
OpExists = "exists"
OpUpload = "upload"
OpDelete = "delete"
OpAttributes = "attributes"
)

// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand Down Expand Up @@ -218,16 +229,6 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src,
return nil
}

const (
iterOp = "iter"
getOp = "get"
getRangeOp = "get_range"
existsOp = "exists"
uploadOp = "upload"
deleteOp = "delete"
attributesOp = "attributes"
)

// IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment thanos_objstore_bucket_operation_failures_total metric.
type IsOpFailureExpectedFunc func(error) bool

Expand Down Expand Up @@ -263,13 +264,13 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
}, []string{"bucket"}),
}
for _, op := range []string{
iterOp,
getOp,
getRangeOp,
existsOp,
uploadOp,
deleteOp,
attributesOp,
OpIter,
OpGet,
OpGetRange,
OpExists,
OpUpload,
OpDelete,
OpAttributes,
} {
bkt.ops.WithLabelValues(op)
bkt.opsFailures.WithLabelValues(op)
Expand Down Expand Up @@ -306,7 +307,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error {
const op = iterOp
const op = OpIter
b.ops.WithLabelValues(op).Inc()

err := b.bkt.Iter(ctx, dir, f)
Expand All @@ -317,7 +318,7 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string)
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = attributesOp
const op = OpAttributes
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -333,7 +334,7 @@ func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttri
}

func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
const op = getOp
const op = OpGet
b.ops.WithLabelValues(op).Inc()

rc, err := b.bkt.Get(ctx, name)
Expand All @@ -353,7 +354,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
}

func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
const op = getRangeOp
const op = OpGetRange
b.ops.WithLabelValues(op).Inc()

rc, err := b.bkt.GetRange(ctx, name, off, length)
Expand All @@ -373,7 +374,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
}

func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
const op = existsOp
const op = OpExists
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -389,7 +390,7 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
const op = uploadOp
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -405,7 +406,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
}

func (b *metricBucket) Delete(ctx context.Context, name string) error {
const op = deleteOp
const op = OpDelete
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand Down
57 changes: 29 additions & 28 deletions pkg/objstore/objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

promtest "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/testutil"
)

Expand All @@ -18,21 +19,21 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))

AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime)
Expand All @@ -41,23 +42,23 @@ func TestMetricBucket_Close(t *testing.T) {
// Clear bucket, but don't clear metrics to ensure we use same.
bkt.bkt = NewInMemBucket()
AcceptanceTest(t, bkt)
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
// Not expected not found error here.
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes)))
// Not expected not found errors, this should increment failure metric on get for not found as well, so +2.
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload)
Expand Down
30 changes: 12 additions & 18 deletions pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ import (
const (
originCache = "cache"
originBucket = "bucket"

opGet = "get"
opGetRange = "getrange"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually mismatching what we have already for bucket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh! I think we should mention this change in the CHANGELOG

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any mixin / alert we could change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing, I could find.

opIter = "iter"
opExists = "exists"
opAttributes = "attributes"
)

var errObjNotFound = errors.Errorf("object not found")
Expand Down Expand Up @@ -97,7 +91,7 @@ func NewCachingBucket(b objstore.Bucket, cfg *CachingBucketConfig, logger log.Lo
cb.operationRequests.WithLabelValues(op, n)
cb.operationHits.WithLabelValues(op, n)

if op == opGetRange {
if op == objstore.OpGetRange {
cb.requestedGetRangeBytes.WithLabelValues(n)
cb.fetchedGetRangeBytes.WithLabelValues(originCache, n)
cb.fetchedGetRangeBytes.WithLabelValues(originBucket, n)
Expand Down Expand Up @@ -135,14 +129,14 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er
return cb.Bucket.Iter(ctx, dir, f)
}

cb.operationRequests.WithLabelValues(opIter, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc()

key := cachingKeyIter(dir)
data := cfg.cache.Fetch(ctx, []string{key})
if data[key] != nil {
list, err := cfg.codec.Decode(data[key])
if err == nil {
cb.operationHits.WithLabelValues(opIter, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpIter, cfgName).Inc()
for _, n := range list {
if err := f(n); err != nil {
return err
Expand Down Expand Up @@ -180,15 +174,15 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
return cb.Bucket.Exists(ctx, name)
}

cb.operationRequests.WithLabelValues(opExists, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpExists, cfgName).Inc()

key := cachingKeyExists(name)
hits := cfg.cache.Fetch(ctx, []string{key})

if ex := hits[key]; ex != nil {
exists, err := strconv.ParseBool(string(ex))
if err == nil {
cb.operationHits.WithLabelValues(opExists, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpExists, cfgName).Inc()
return exists, nil
}
level.Warn(cb.logger).Log("msg", "unexpected cached 'exists' value", "key", key, "val", string(ex))
Expand Down Expand Up @@ -222,21 +216,21 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return cb.Bucket.Get(ctx, name)
}

cb.operationRequests.WithLabelValues(opGet, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpGet, cfgName).Inc()

contentKey := cachingKeyContent(name)
existsKey := cachingKeyExists(name)

hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey})
if hits[contentKey] != nil {
cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
return ioutil.NopCloser(bytes.NewReader(hits[contentKey])), nil
}

// If we know that file doesn't exist, we can return that. Useful for deletion marks.
if ex := hits[existsKey]; ex != nil {
if exists, err := strconv.ParseBool(string(ex)); err == nil && !exists {
cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
return nil, errObjNotFound
}
}
Expand Down Expand Up @@ -294,14 +288,14 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
key := cachingKeyAttributes(name)

cb.operationRequests.WithLabelValues(opAttributes, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpAttributes, cfgName).Inc()

hits := cache.Fetch(ctx, []string{key})
if raw, ok := hits[key]; ok {
var attrs objstore.ObjectAttributes
err := json.Unmarshal(raw, &attrs)
if err == nil {
cb.operationHits.WithLabelValues(opAttributes, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpAttributes, cfgName).Inc()
return attrs, nil
}

Expand All @@ -323,7 +317,7 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgN
}

func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpGetRange, cfgName).Inc()
cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length))

attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.attributesTTL)
Expand Down Expand Up @@ -376,7 +370,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
totalCachedBytes += int64(len(b))
}
cb.fetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(totalCachedBytes))
cb.operationHits.WithLabelValues(opGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys)))
cb.operationHits.WithLabelValues(objstore.OpGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys)))

if len(hits) < len(keys) {
if hits == nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/store/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/objstore"
)

// Codec for encoding and decoding results of Iter call.
Expand Down Expand Up @@ -145,19 +146,19 @@ func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.C
func (cfg *CachingBucketConfig) allConfigNames() map[string][]string {
result := map[string][]string{}
for n := range cfg.get {
result[opGet] = append(result[opGet], n)
result[objstore.OpGet] = append(result[objstore.OpGet], n)
}
for n := range cfg.iter {
result[opIter] = append(result[opIter], n)
result[objstore.OpIter] = append(result[objstore.OpIter], n)
}
for n := range cfg.exists {
result[opExists] = append(result[opExists], n)
result[objstore.OpExists] = append(result[objstore.OpExists], n)
}
for n := range cfg.getRange {
result[opGetRange] = append(result[opGetRange], n)
result[objstore.OpGetRange] = append(result[objstore.OpGetRange], n)
}
for n := range cfg.attributes {
result[opAttributes] = append(result[opAttributes], n)
result[objstore.OpAttributes] = append(result[objstore.OpAttributes], n)
}
return result
}
Expand Down
Loading