Skip to content

Changing return code from storegateway for CMK errors #5442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
12 changes: 12 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if s.Code() == codes.ResourceExhausted {
return validation.LimitError(s.Message())
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
}
return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
}
Expand Down Expand Up @@ -816,6 +820,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
return validation.LimitError(s.Message())
}
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
}

Expand Down Expand Up @@ -907,6 +915,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
if s.Code() == codes.ResourceExhausted {
return validation.LimitError(s.Message())
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
}
return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,44 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"all store-gateways return PermissionDenied": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
expectedErr: validation.AccessDeniedError("PermissionDenied"),
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
},
mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"),
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2",
mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
},
mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"),
}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{
{t: minT, v: 2},
},
},
},
},
"multiple store-gateways has the block, but one of them fails to return on stream": {
finderResult: bucketindex.Blocks{
{ID: block1},
Expand Down
25 changes: 23 additions & 2 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"
"io"

"github.com/gogo/status"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/s3"
"google.golang.org/grpc/codes"

cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"

cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
)
Expand Down Expand Up @@ -101,12 +105,24 @@ func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) e

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
r, err := b.bucket.Get(ctx, name)

if err != nil && b.IsCustomerManagedKeyError(err) {
// Store gateway will return the status if the returned error is an `status.Error`
return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error()))
}

return r, err
}

// GetRange implements objstore.Bucket.
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bucket.GetRange(ctx, name, off, length)
r, err := b.bucket.GetRange(ctx, name, off, length)
if err != nil && b.IsCustomerManagedKeyError(err) {
return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error()))
}

return r, err
}

// Exists implements objstore.Bucket.
Expand All @@ -119,7 +135,12 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError implements objstore.Bucket.
func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool {
// unwrap error
if se, ok := err.(interface{ Err() error }); ok {
return b.bucket.IsCustomerManagedKeyError(se.Err()) || b.bucket.IsCustomerManagedKeyError(err)
}
return b.bucket.IsCustomerManagedKeyError(err)
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
}
}

func Test_shouldWrapSSeErrors(t *testing.T) {
cfgProvider := &mockTenantConfigProvider{}

bkt := &ClientMock{}

bkt.MockGet("Test", "someContent", errKeyPermissionDenied)

sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider)

_, err := sseBkt.Get(context.Background(), "Test")
require.True(t, sseBkt.IsCustomerManagedKeyError(err))
}

type mockTenantConfigProvider struct {
s3SseType string
s3KmsKeyID string
Expand Down
16 changes: 15 additions & 1 deletion pkg/storage/tsdb/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ func (m *MockBucketFailure) Delete(ctx context.Context, name string) error {
return m.Bucket.Delete(ctx, name)
}

func (m *MockBucketFailure) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
m.GetCalls.Add(1)
for prefix, err := range m.GetFailures {
if strings.HasPrefix(name, prefix) {
return nil, err
}
}
if e, ok := m.GetFailures[name]; ok {
return nil, e
}

return m.Bucket.GetRange(ctx, name, off, length)
}

func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) {
m.GetCalls.Add(1)
for prefix, err := range m.GetFailures {
Expand Down Expand Up @@ -96,5 +110,5 @@ func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFai
}

func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool {
return errors.Is(err, ErrKeyAccessDeniedError)
return errors.Is(errors.Cause(err), ErrKeyAccessDeniedError)
}
27 changes: 9 additions & 18 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,25 +299,22 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are initializing a new bkt client every Series request? Can we just use u.bucket here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do this to unwrap the error, but creating this here is just creating an object (it does not do anything).

if store == nil {
return nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
})

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return err
}

Expand All @@ -332,22 +329,19 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
if store == nil {
return &storepb.LabelNamesResponse{}, nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

resp, err := store.LabelNames(ctx, req)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return resp, err
}

Expand All @@ -362,22 +356,19 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
if store == nil {
return &storepb.LabelValuesResponse{}, nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

resp, err := store.LabelValues(ctx, req)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return resp, err
}

Expand Down
Loading