Skip to content

Commit 8490085

Browse files
committed
Implementing Bucket index sync status
Signed-off-by: Alan Protasio <alanprot@gmail.com>
1 parent 7b51a48 commit 8490085

14 files changed

+422
-85
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
206206
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
207207
return err
208208
}
209+
210+
// Delete the bucket sync status
211+
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
212+
return err
213+
}
209214
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
210215

211216
var deletedBlocks, failed int
@@ -321,15 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
321326
}
322327
}
323328

329+
// Reading bucket index sync stats
330+
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)
331+
332+
if err != nil {
333+
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
334+
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
335+
}
336+
337+
idxs.Status = bucketindex.Ok
338+
idxs.SyncTime = time.Now().Unix()
339+
324340
// Read the bucket index.
325341
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
342+
343+
defer func() {
344+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
345+
}()
346+
326347
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
327348
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
328349
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
329350
// Give up cleaning if we get access denied
330-
level.Warn(userLogger).Log("msg", err.Error())
351+
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
352+
idxs.Status = bucketindex.CustomerManagedKeyError
353+
// Making the tenant non queryable until 2x the cleanup interval to give time to compactors and storegateways
354+
// to reload the bucket index in case the key access is re-granted
355+
idxs.NonQueryableUntil = time.Now().Add(2 * c.cfg.CleanupInterval).Unix()
356+
idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError
357+
358+
// Update the bucket index update time
359+
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
331360
return nil
332361
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
362+
idxs.Status = bucketindex.GenericError
333363
return err
334364
}
335365

@@ -348,6 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
348378
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
349379
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
350380
if err != nil {
381+
idxs.Status = bucketindex.GenericError
351382
return err
352383
}
353384

@@ -398,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
398429
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
399430
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
400431
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
401-
402432
return nil
403433
}
404434

pkg/compactor/blocks_cleaner_test.go

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
5757
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
5858
const userID = "user-1"
5959

60-
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
61-
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
60+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
61+
bkt = bucketindex.BucketWithGlobalMarkers(bkt)
6262

6363
// Create blocks.
6464
ctx := context.Background()
6565
deletionDelay := 12 * time.Hour
66-
bucketClient = &cortex_testutil.MockBucketFailure{
67-
Bucket: bucketClient,
66+
mbucket := &cortex_testutil.MockBucketFailure{
67+
Bucket: bkt,
6868
GetFailures: map[string]error{
6969
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
7070
},
@@ -77,12 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
7777
}
7878

7979
logger := log.NewNopLogger()
80-
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
80+
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
8181
cfgProvider := newMockConfigProvider()
8282

83-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
84-
err := cleaner.cleanUser(ctx, userID, true)
83+
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
84+
85+
// Clean User with no error
86+
cleaner.bucketClient = bkt
87+
err := cleaner.cleanUser(ctx, userID, false)
8588
require.NoError(t, err)
89+
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
90+
require.NoError(t, err)
91+
require.Equal(t, bucketindex.Ok, s.Status)
92+
require.Equal(t, int64(0), s.NonQueryableUntil)
93+
94+
// Clean with cmk error
95+
cleaner.bucketClient = mbucket
96+
err = cleaner.cleanUser(ctx, userID, false)
97+
require.NoError(t, err)
98+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
99+
require.NoError(t, err)
100+
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
101+
require.Less(t, int64(0), s.NonQueryableUntil)
102+
103+
// Re grant access to the key
104+
cleaner.bucketClient = bkt
105+
err = cleaner.cleanUser(ctx, userID, false)
106+
require.NoError(t, err)
107+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
108+
require.NoError(t, err)
109+
require.Equal(t, bucketindex.Ok, s.Status)
110+
require.Less(t, int64(0), s.NonQueryableUntil)
86111
}
87112

88113
func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -232,6 +257,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
232257
require.NoError(t, err)
233258
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
234259
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
260+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
261+
require.NoError(t, err)
262+
require.Equal(t, bucketindex.Ok, s.Status)
235263
}
236264

237265
assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -385,6 +413,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
385413
require.NoError(t, err)
386414
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
387415
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
416+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
417+
require.NoError(t, err)
418+
require.Equal(t, bucketindex.Ok, s.Status)
388419
}
389420

390421
func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {

pkg/compactor/compactor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,15 @@ func (c *Compactor) compactUsers(ctx context.Context) {
675675
continue
676676
}
677677

678+
// Skipping compaction if the bucket index failed to sync due CMK errors.
679+
if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
680+
if idxs.Status == bucketindex.CustomerManagedKeyError {
681+
c.compactionRunSkippedTenants.Inc()
682+
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
683+
continue
684+
}
685+
}
686+
678687
ownedUsers[userID] = struct{}{}
679688

680689
if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {

pkg/compactor/compactor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
151151
}
152152
}
153153

154+
func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
155+
t.Parallel()
156+
userID := "user-1"
157+
158+
ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
159+
content, err := json.Marshal(ss)
160+
require.NoError(t, err)
161+
162+
// No user blocks stored in the bucket.
163+
bucketClient := &bucket.ClientMock{}
164+
bucketClient.MockIter("", []string{userID}, nil)
165+
bucketClient.MockIter(userID+"/", []string{}, nil)
166+
bucketClient.MockIter(userID+"/markers/", nil, nil)
167+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
168+
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
169+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
170+
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
171+
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
172+
173+
cfg := prepareConfig()
174+
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
175+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
176+
177+
// Wait until a run has completed.
178+
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
179+
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
180+
})
181+
182+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
183+
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
184+
}
185+
154186
func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
155187
t.Parallel()
156188

@@ -465,6 +497,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
465497
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
466498
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
467499
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
500+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
468501
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
469502
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
470503
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
@@ -473,6 +506,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
473506
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
474507
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
475508
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
509+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
476510

477511
c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
478512
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
@@ -520,6 +554,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
520554
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
521555
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
522556
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
557+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
523558
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
524559
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
525560
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
@@ -534,10 +569,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
534569
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
535570
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
536571
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
572+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
573+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
537574
bucketClient.MockIter("user-1/markers/", nil, nil)
538575
bucketClient.MockIter("user-2/markers/", nil, nil)
539576
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
540577
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
578+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
579+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
541580

542581
c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)
543582

@@ -652,6 +691,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
652691
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
653692
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
654693
bucketClient.MockGet("user-1/markers/01DTVP434PA9VFXSW2JKB3392D-deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
694+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
655695

656696
// This block will be deleted by cleaner.
657697
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
@@ -673,7 +713,9 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
673713
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
674714
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
675715
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
716+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
676717
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
718+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
677719

678720
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
679721

@@ -775,6 +817,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
775817
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
776818
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
777819
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
820+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
778821
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
779822
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
780823
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
@@ -795,10 +838,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
795838

796839
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
797840
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
841+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
842+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
798843
bucketClient.MockIter("user-1/markers/", nil, nil)
799844
bucketClient.MockIter("user-2/markers/", nil, nil)
800845
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
801846
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
847+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
848+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
802849

803850
c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
804851

@@ -843,13 +890,15 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
843890
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil)
844891
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
845892
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil)
893+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
846894
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
847895
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
848896
bucketClient.MockExists("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", false, nil)
849897

850898
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
851899
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
852900
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
901+
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)
853902

854903
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
855904

@@ -1006,6 +1055,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10061055
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
10071056
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
10081057
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
1058+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
10091059
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
10101060
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
10111061
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
@@ -1024,8 +1074,12 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10241074
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
10251075
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
10261076
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
1077+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
1078+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
10271079
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
10281080
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
1081+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
1082+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
10291083

10301084
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
10311085
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -1104,9 +1158,11 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
11041158
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
11051159
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
11061160
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
1161+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
11071162
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
11081163
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
11091164
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1165+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
11101166
}
11111167

11121168
// Create a shared KV Store
@@ -1212,6 +1268,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
12121268
Version: VisitMarkerVersion1,
12131269
}
12141270
visitMarkerFileContent, _ := json.Marshal(blockVisitMarker)
1271+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
12151272
bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil)
12161273
bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil)
12171274
bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", "", nil)
@@ -1230,6 +1287,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
12301287
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
12311288
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
12321289
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1290+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
12331291
}
12341292

12351293
// Create a shared KV Store

pkg/ingester/ingester.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/prometheus/prometheus/config"
1717
"github.com/prometheus/prometheus/tsdb/chunks"
1818

19+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
20+
1921
"github.com/go-kit/log"
2022
"github.com/go-kit/log/level"
2123
"github.com/gogo/status"
@@ -2289,6 +2291,14 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
22892291
}
22902292
defer userDB.casState(activeShipping, active)
22912293

2294+
if idxs, err := bucketindex.ReadSyncStatus(ctx, i.TSDBState.bucket, userID, logutil.WithContext(ctx, i.logger)); err == nil {
2295+
// Skip blocks shipping if the bucket index failed to sync due CMK errors.
2296+
if idxs.Status == bucketindex.CustomerManagedKeyError {
2297+
level.Info(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping shipping blocks due CustomerManagedKeyError", "user", userID)
2298+
return nil
2299+
}
2300+
}
2301+
22922302
uploaded, err := userDB.shipper.Sync(ctx)
22932303
if err != nil {
22942304
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)

0 commit comments

Comments
 (0)