Skip to content

Commit d8b1794

Browse files
committed
Implementing Bucket index sync status
Signed-off-by: Alan Protasio <alanprot@gmail.com>
1 parent 7b51a48 commit d8b1794

14 files changed

+414
-85
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 32 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
206206
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
207207
return err
208208
}
209+
210+
// Delete the bucket sync status
211+
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
212+
return err
213+
}
209214
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
210215

211216
var deletedBlocks, failed int
@@ -321,15 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
321326
}
322327
}
323328

329+
// Reading bucket index sync stats
330+
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)
331+
332+
if err != nil {
333+
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
334+
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
335+
}
336+
337+
idxs.Status = bucketindex.Ok
338+
idxs.SyncTime = time.Now().Unix()
339+
324340
// Read the bucket index.
325341
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
342+
343+
defer func() {
344+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
345+
}()
346+
326347
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
327348
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
328349
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
329350
// Give up cleaning if we get access denied
330-
level.Warn(userLogger).Log("msg", err.Error())
351+
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
352+
idxs.Status = bucketindex.CustomerManagedKeyError
353+
// Making the tenant non queryable until 2x the cleanup interval to give time to compactors and storegateways
354+
// to reload the bucket index in case the key access is re-granted
355+
idxs.NonQueryableUntil = time.Now().Add(2 * c.cfg.CleanupInterval).Unix()
356+
idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError
357+
358+
// Update the bucket index update time
359+
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
331360
return nil
332361
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
362+
idxs.Status = bucketindex.GenericError
333363
return err
334364
}
335365

@@ -348,6 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
348378
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
349379
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
350380
if err != nil {
381+
idxs.Status = bucketindex.GenericError
351382
return err
352383
}
353384

@@ -398,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
398429
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
399430
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
400431
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
401-
402432
return nil
403433
}
404434

pkg/compactor/blocks_cleaner_test.go

Lines changed: 38 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
5757
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
5858
const userID = "user-1"
5959

60-
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
61-
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
60+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
61+
bkt = bucketindex.BucketWithGlobalMarkers(bkt)
6262

6363
// Create blocks.
6464
ctx := context.Background()
6565
deletionDelay := 12 * time.Hour
66-
bucketClient = &cortex_testutil.MockBucketFailure{
67-
Bucket: bucketClient,
66+
mbucket := &cortex_testutil.MockBucketFailure{
67+
Bucket: bkt,
6868
GetFailures: map[string]error{
6969
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
7070
},
@@ -77,12 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
7777
}
7878

7979
logger := log.NewNopLogger()
80-
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
80+
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
8181
cfgProvider := newMockConfigProvider()
8282

83-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
84-
err := cleaner.cleanUser(ctx, userID, true)
83+
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
84+
85+
// Clean User with no error
86+
cleaner.bucketClient = bkt
87+
err := cleaner.cleanUser(ctx, userID, false)
8588
require.NoError(t, err)
89+
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
90+
require.NoError(t, err)
91+
require.Equal(t, bucketindex.Ok, s.Status)
92+
require.Equal(t, int64(0), s.NonQueryableUntil)
93+
94+
// Clean with cmk error
95+
cleaner.bucketClient = mbucket
96+
err = cleaner.cleanUser(ctx, userID, false)
97+
require.NoError(t, err)
98+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
99+
require.NoError(t, err)
100+
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
101+
require.Less(t, int64(0), s.NonQueryableUntil)
102+
103+
// Re grant access to the key
104+
cleaner.bucketClient = bkt
105+
err = cleaner.cleanUser(ctx, userID, false)
106+
require.NoError(t, err)
107+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
108+
require.NoError(t, err)
109+
require.Equal(t, bucketindex.Ok, s.Status)
110+
require.Less(t, int64(0), s.NonQueryableUntil)
86111
}
87112

88113
func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -232,6 +257,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
232257
require.NoError(t, err)
233258
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
234259
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
260+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
261+
require.NoError(t, err)
262+
require.Equal(t, bucketindex.Ok, s.Status)
235263
}
236264

237265
assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -385,6 +413,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
385413
require.NoError(t, err)
386414
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
387415
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
416+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
417+
require.NoError(t, err)
418+
require.Equal(t, bucketindex.Ok, s.Status)
388419
}
389420

390421
func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {

pkg/compactor/compactor.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -675,6 +675,14 @@ func (c *Compactor) compactUsers(ctx context.Context) {
675675
continue
676676
}
677677

678+
if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
679+
if idxs.Status == bucketindex.CustomerManagedKeyError {
680+
c.compactionRunSkippedTenants.Inc()
681+
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
682+
continue
683+
}
684+
}
685+
678686
ownedUsers[userID] = struct{}{}
679687

680688
if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {

pkg/compactor/compactor_test.go

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
151151
}
152152
}
153153

154+
// TestCompactor_SkipCompactionWhenCmkError checks that the compactor skips a
// tenant whose stored bucket index sync status is CustomerManagedKeyError.
// It starts the compactor against a mocked bucket, waits for one completed
// compaction run, stops the compactor, and asserts that the
// "skipping compactUser" info line was logged for the tenant.
func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
155+
t.Parallel()
156+
userID := "user-1"
157+
158+
// Build a sync status carrying the CMK error and serialize it; the JSON is
// served below as the content of the tenant's sync status object.
ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
159+
content, err := json.Marshal(ss)
160+
require.NoError(t, err)
161+
162+
// No user blocks stored in the bucket.
163+
bucketClient := &bucket.ClientMock{}
164+
bucketClient.MockIter("", []string{userID}, nil)
165+
bucketClient.MockIter(userID+"/", []string{}, nil)
166+
bucketClient.MockIter(userID+"/markers/", nil, nil)
167+
// Reads of the sync status object return the CMK-error payload marshalled above.
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
168+
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
169+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
170+
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
171+
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
172+
173+
cfg := prepareConfig()
174+
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
175+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
176+
177+
// Wait until a run has completed.
178+
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
179+
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
180+
})
181+
182+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
183+
// The captured log output is split into lines and must contain the skip
// message exactly as written here.
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
184+
}
185+
154186
func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
155187
t.Parallel()
156188

@@ -473,6 +505,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
473505
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
474506
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
475507
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
508+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
476509

477510
c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
478511
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
@@ -534,10 +567,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
534567
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
535568
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
536569
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
570+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
571+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
537572
bucketClient.MockIter("user-1/markers/", nil, nil)
538573
bucketClient.MockIter("user-2/markers/", nil, nil)
539574
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
540575
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
576+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
577+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
541578

542579
c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)
543580

@@ -673,7 +710,9 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
673710
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
674711
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
675712
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
713+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
676714
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
715+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
677716

678717
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
679718

@@ -795,10 +834,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
795834

796835
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
797836
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
837+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
838+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
798839
bucketClient.MockIter("user-1/markers/", nil, nil)
799840
bucketClient.MockIter("user-2/markers/", nil, nil)
800841
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
801842
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
843+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
844+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
802845

803846
c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
804847

@@ -850,6 +893,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
850893
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
851894
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
852895
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
896+
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)
853897

854898
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
855899

@@ -1024,8 +1068,12 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10241068
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
10251069
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
10261070
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
1071+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
1072+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
10271073
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
10281074
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
1075+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
1076+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
10291077

10301078
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
10311079
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -1107,6 +1155,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
11071155
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
11081156
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
11091157
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1158+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
11101159
}
11111160

11121161
// Create a shared KV Store
@@ -1212,6 +1261,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
12121261
Version: VisitMarkerVersion1,
12131262
}
12141263
visitMarkerFileContent, _ := json.Marshal(blockVisitMarker)
1264+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
12151265
bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil)
12161266
bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil)
12171267
bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", "", nil)
@@ -1230,6 +1280,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
12301280
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
12311281
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
12321282
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1283+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
12331284
}
12341285

12351286
// Create a shared KV Store

pkg/ingester/ingester.go

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/prometheus/prometheus/config"
1717
"github.com/prometheus/prometheus/tsdb/chunks"
1818

19+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
20+
1921
"github.com/go-kit/log"
2022
"github.com/go-kit/log/level"
2123
"github.com/gogo/status"
@@ -2289,6 +2291,14 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
22892291
}
22902292
defer userDB.casState(activeShipping, active)
22912293

2294+
if idxs, err := bucketindex.ReadSyncStatus(ctx, i.TSDBState.bucket, userID, logutil.WithContext(ctx, i.logger)); err == nil {
2295+
// Skip blocks shipping if the bucket index failed to sync due CMK errors
2296+
if idxs.Status == bucketindex.CustomerManagedKeyError {
2297+
level.Info(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping shipping blocks due CustomerManagedKeyError", "user", userID)
2298+
return nil
2299+
}
2300+
}
2301+
22922302
uploaded, err := userDB.shipper.Sync(ctx)
22932303
if err != nil {
22942304
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)

0 commit comments

Comments
 (0)