Skip to content

Commit 7f8d194

Browse files
authored
update thanos and add block_ids_fetcher to bucketindex (#5681)
* update thanos and add bucket-index-ids-fetcher to compactor Signed-off-by: Wen Xu <wenxuamz@amazon.com> * make mod-check Signed-off-by: Wen Xu <wenxuamz@amazon.com> * update docs Signed-off-by: Wen Xu <wenxuamz@amazon.com> * add unit test for bucketindex block ids fetcher Signed-off-by: Wen Xu <wenxuamz@amazon.com> * group imports Signed-off-by: Wen Xu <wenxuamz@amazon.com> * initialize the baseBlockIDsFetcher in the constructor Signed-off-by: Wen Xu <wenxuamz@amazon.com> * set the bucketindex block ids fetcher as default if bucketindex is enabled Signed-off-by: Wen Xu <wenxuamz@amazon.com> * remove TODO Signed-off-by: Wen Xu <wenxuamz@amazon.com> --------- Signed-off-by: Wen Xu <wenxuamz@amazon.com>
1 parent 9dbd731 commit 7f8d194

File tree

600 files changed

+11978
-6626
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

600 files changed

+11978
-6626
lines changed

go.mod

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ require (
5151
github.com/sony/gobreaker v0.5.0
5252
github.com/spf13/afero v1.9.5
5353
github.com/stretchr/testify v1.8.4
54-
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
55-
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
56-
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e
54+
github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb
55+
github.com/thanos-io/promql-engine v0.0.0-20231127105941-257543af55e8
56+
github.com/thanos-io/thanos v0.32.5-0.20231127170340-8ffb9da1383e
5757
github.com/uber/jaeger-client-go v2.30.0+incompatible
5858
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
5959
go.etcd.io/etcd/api/v3 v3.5.10
@@ -67,10 +67,10 @@ require (
6767
go.opentelemetry.io/otel/sdk v1.19.0
6868
go.opentelemetry.io/otel/trace v1.19.0
6969
go.uber.org/atomic v1.11.0
70-
golang.org/x/net v0.17.0
71-
golang.org/x/sync v0.4.0
72-
golang.org/x/time v0.3.0
73-
google.golang.org/grpc v1.58.3
70+
golang.org/x/net v0.18.0
71+
golang.org/x/sync v0.5.0
72+
golang.org/x/time v0.4.0
73+
google.golang.org/grpc v1.59.0
7474
gopkg.in/yaml.v2 v2.4.0
7575
gopkg.in/yaml.v3 v3.0.1
7676
sigs.k8s.io/yaml v1.3.0
@@ -84,11 +84,11 @@ require (
8484
)
8585

8686
require (
87-
cloud.google.com/go v0.110.8 // indirect
88-
cloud.google.com/go/compute v1.23.0 // indirect
87+
cloud.google.com/go v0.110.10 // indirect
88+
cloud.google.com/go/compute v1.23.3 // indirect
8989
cloud.google.com/go/compute/metadata v0.2.3 // indirect
90-
cloud.google.com/go/iam v1.1.2 // indirect
91-
cloud.google.com/go/storage v1.30.1 // indirect
90+
cloud.google.com/go/iam v1.1.5 // indirect
91+
cloud.google.com/go/storage v1.35.1 // indirect
9292
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0 // indirect
9393
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 // indirect
9494
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
@@ -143,8 +143,8 @@ require (
143143
github.com/google/btree v1.0.1 // indirect
144144
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
145145
github.com/google/s2a-go v0.1.7 // indirect
146-
github.com/google/uuid v1.3.1 // indirect
147-
github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
146+
github.com/google/uuid v1.4.0 // indirect
147+
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
148148
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
149149
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7 // indirect
150150
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
@@ -216,20 +216,20 @@ require (
216216
go.uber.org/zap v1.21.0 // indirect
217217
go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
218218
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
219-
golang.org/x/crypto v0.14.0 // indirect
219+
golang.org/x/crypto v0.15.0 // indirect
220220
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
221221
golang.org/x/mod v0.13.0 // indirect
222-
golang.org/x/oauth2 v0.13.0 // indirect
223-
golang.org/x/sys v0.13.0 // indirect
224-
golang.org/x/text v0.13.0 // indirect
222+
golang.org/x/oauth2 v0.14.0 // indirect
223+
golang.org/x/sys v0.14.0 // indirect
224+
golang.org/x/text v0.14.0 // indirect
225225
golang.org/x/tools v0.14.0 // indirect
226-
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
226+
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
227227
gonum.org/v1/gonum v0.12.0 // indirect
228-
google.golang.org/api v0.147.0 // indirect
229-
google.golang.org/appengine v1.6.7 // indirect
230-
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
231-
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a // indirect
232-
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
228+
google.golang.org/api v0.150.0 // indirect
229+
google.golang.org/appengine v1.6.8 // indirect
230+
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
231+
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
232+
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
233233
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
234234
gopkg.in/ini.v1 v1.67.0 // indirect
235235
gopkg.in/telebot.v3 v3.1.3 // indirect

go.sum

Lines changed: 46 additions & 43 deletions
Large diffs are not rendered by default.

pkg/compactor/compactor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,10 +790,18 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
790790
// out of order chunks or index file too big.
791791
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)
792792

793+
var blockIDsFetcher block.BlockIDsFetcher
794+
if c.storageCfg.BucketStore.BucketIndex.Enabled {
795+
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
796+
} else {
797+
blockIDsFetcher = block.NewBaseBlockIDsFetcher(ulogger, bucket)
798+
}
799+
793800
fetcher, err := block.NewMetaFetcher(
794801
ulogger,
795802
c.compactorCfg.MetaSyncConcurrency,
796803
bucket,
804+
blockIDsFetcher,
797805
c.metaSyncDirForUser(userID),
798806
reg,
799807
// List of filters to apply (order matters).

pkg/querier/blocks_finder_bucket_scan.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,10 +384,12 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
384384
filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin))
385385
}
386386

387+
blockIdsFetcher := block.NewBaseBlockIDsFetcher(userLogger, userBucket)
387388
f, err := block.NewMetaFetcher(
388389
userLogger,
389390
d.cfg.MetasConcurrency,
390391
userBucket,
392+
blockIdsFetcher,
391393
// The fetcher stores cached metas in the "meta-syncer/" sub directory.
392394
filepath.Join(d.cfg.CacheDir, userID),
393395
userReg,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package bucketindex
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/go-kit/log/level"
8+
"github.com/oklog/ulid"
9+
"github.com/pkg/errors"
10+
"github.com/thanos-io/objstore"
11+
"github.com/thanos-io/thanos/pkg/block"
12+
13+
"github.com/cortexproject/cortex/pkg/storage/bucket"
14+
)
15+
16+
type BlockIDsFetcher struct {
17+
logger log.Logger
18+
bkt objstore.Bucket
19+
userID string
20+
cfgProvider bucket.TenantConfigProvider
21+
baseBlockIDsFetcher block.BlockIDsFetcher
22+
}
23+
24+
func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
25+
userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
26+
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, userBkt)
27+
return &BlockIDsFetcher{
28+
logger: logger,
29+
bkt: bkt,
30+
userID: userID,
31+
cfgProvider: cfgProvider,
32+
baseBlockIDsFetcher: baseBlockIDsFetcher,
33+
}
34+
}
35+
36+
func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
37+
// Fetch the bucket index.
38+
idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
39+
if errors.Is(err, ErrIndexNotFound) {
40+
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
41+
// and their bucket index has not been created yet.
42+
// Fallback to BaseBlockIDsFetcher.
43+
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
44+
}
45+
if errors.Is(err, ErrIndexCorrupted) {
46+
// In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle
47+
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
48+
// Fallback to BaseBlockIDsFetcher.
49+
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
50+
}
51+
52+
if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
53+
// stop the job and return the error
54+
// this error should be used to return Access Denied to the caller
55+
level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err)
56+
return nil, err
57+
}
58+
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
// Sent the active block ids
64+
for _, b := range idx.Blocks {
65+
select {
66+
case <-ctx.Done():
67+
return nil, ctx.Err()
68+
case ch <- b.ID:
69+
}
70+
}
71+
return nil, nil
72+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package bucketindex
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"path"
8+
"sync"
9+
"testing"
10+
"time"
11+
12+
"github.com/go-kit/log"
13+
"github.com/oklog/ulid"
14+
"github.com/stretchr/testify/require"
15+
"github.com/thanos-io/thanos/pkg/block/metadata"
16+
17+
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
18+
"github.com/cortexproject/cortex/pkg/util/concurrency"
19+
)
20+
21+
func TestBlockIDsFetcher_Fetch(t *testing.T) {
22+
t.Parallel()
23+
const userID = "user-1"
24+
25+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
26+
ctx := context.Background()
27+
now := time.Now()
28+
logs := &concurrency.SyncBuffer{}
29+
logger := log.NewLogfmtLogger(logs)
30+
31+
// Create a bucket index.
32+
block1 := &Block{ID: ulid.MustNew(1, nil)}
33+
block2 := &Block{ID: ulid.MustNew(2, nil)}
34+
block3 := &Block{ID: ulid.MustNew(3, nil)}
35+
mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
36+
mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
37+
38+
require.NoError(t, WriteIndex(ctx, bkt, userID, nil, &Index{
39+
Version: IndexVersion1,
40+
Blocks: Blocks{block1, block2, block3},
41+
BlockDeletionMarks: BlockDeletionMarks{mark1, mark2},
42+
UpdatedAt: now.Unix(),
43+
}))
44+
45+
blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
46+
ch := make(chan ulid.ULID)
47+
var wg sync.WaitGroup
48+
var blockIds []ulid.ULID
49+
wg.Add(1)
50+
go func() {
51+
defer wg.Done()
52+
for id := range ch {
53+
blockIds = append(blockIds, id)
54+
}
55+
}()
56+
blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
57+
close(ch)
58+
wg.Wait()
59+
require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
60+
}
61+
62+
func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) {
63+
t.Parallel()
64+
const userID = "user-1"
65+
66+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
67+
ctx := context.Background()
68+
now := time.Now()
69+
logs := &concurrency.SyncBuffer{}
70+
logger := log.NewLogfmtLogger(logs)
71+
72+
//prepare tenant bucket
73+
var meta1, meta2, meta3 metadata.Meta
74+
block1 := &Block{ID: ulid.MustNew(1, nil)}
75+
meta1.Version = 1
76+
meta1.ULID = block1.ID
77+
block2 := &Block{ID: ulid.MustNew(2, nil)}
78+
meta2.Version = 1
79+
meta2.ULID = block2.ID
80+
block3 := &Block{ID: ulid.MustNew(3, nil)}
81+
meta3.Version = 1
82+
meta3.ULID = block3.ID
83+
metas := []metadata.Meta{meta1, meta2, meta3}
84+
mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
85+
mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
86+
marks := []*BlockDeletionMark{mark1, mark2}
87+
var buf bytes.Buffer
88+
for _, meta := range metas {
89+
require.NoError(t, json.NewEncoder(&buf).Encode(&meta))
90+
require.NoError(t, bkt.Upload(ctx, path.Join(userID, meta.ULID.String(), metadata.MetaFilename), &buf))
91+
}
92+
for _, mark := range marks {
93+
require.NoError(t, json.NewEncoder(&buf).Encode(mark))
94+
require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
95+
}
96+
blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
97+
ch := make(chan ulid.ULID)
98+
var wg sync.WaitGroup
99+
var blockIds []ulid.ULID
100+
wg.Add(1)
101+
go func() {
102+
defer wg.Done()
103+
for id := range ch {
104+
blockIds = append(blockIds, id)
105+
}
106+
}()
107+
blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
108+
close(ch)
109+
wg.Wait()
110+
require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
111+
}

pkg/storegateway/bucket_stores.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,10 +552,12 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
552552
fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt)
553553

554554
var err error
555+
blockIdsFetcher := block.NewBaseBlockIDsFetcher(userLogger, fetcherBkt)
555556
fetcher, err = block.NewMetaFetcher(
556557
userLogger,
557558
u.cfg.BucketStore.MetaSyncConcurrency,
558559
fetcherBkt,
560+
blockIdsFetcher,
559561
u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
560562
fetcherReg,
561563
filters,

vendor/cloud.google.com/go/compute/internal/version.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/cloud.google.com/go/iam/CHANGES.md

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/cloud.google.com/go/iam/apiv1/iampb/iam_policy.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)