Skip to content

Commit cb457ef

Browse files
committed
Delete invalid visit markers even when partition group not completed
Signed-off-by: Anna Tran <trananna@amazon.com>
1 parent 7123f6b commit cb457ef

4 files changed

Lines changed: 74 additions & 42 deletions

File tree

pkg/compactor/blocks_cleaner.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -815,13 +815,22 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
815815
}
816816
}
817817

818-
if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
819-
// Remove partition visit markers
818+
if extraInfo.status.CanDelete {
819+
// Remove all partition visit markers for completed partitions
820820
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
821-
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err)
821+
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete all partition visit markers for partitioned group", "err", err)
822822
} else {
823823
level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group")
824824
}
825+
} else {
826+
// Remove all invalid visit markers
827+
for _, v := range extraInfo.status.VisitMarkersToDelete {
828+
if err := userBucket.Delete(ctx, v.GetVisitMarkerFilePath()); err != nil {
829+
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete invalid visit marker", "partition_visit_marker_file", v.String(), "err", err)
830+
} else {
831+
level.Info(partitionedGroupLogger).Log("msg", "deleted invalid visit marker", "partition_visit_marker_file", v.String())
832+
}
833+
}
825834
}
826835
}
827836
}

pkg/compactor/blocks_cleaner_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
999999
partitionedGroupFileExists, err = userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
10001000
require.NoError(t, err)
10011001
require.False(t, partitionedGroupFileExists)
1002+
1003+
partitionedGroupFileExists, err = userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath())
1004+
require.NoError(t, err)
1005+
require.False(t, partitionedGroupFileExists)
10021006
}
10031007

10041008
func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {

pkg/compactor/partitioned_group_info.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,23 @@ type PartitionedGroupStatus struct {
4141
PartitionedGroupID uint32
4242
CanDelete bool
4343
IsCompleted bool
44-
DeleteVisitMarker bool
4544
PendingPartitions int
4645
InProgressPartitions int
4746
PendingOrFailedPartitions []Partition
47+
VisitMarkersToDelete []VisitMarker
4848
}
4949

5050
func (s PartitionedGroupStatus) String() string {
5151
var partitions []string
5252
for _, p := range s.PendingOrFailedPartitions {
5353
partitions = append(partitions, fmt.Sprintf("%d", p.PartitionID))
5454
}
55-
return fmt.Sprintf(`{"partitioned_group_id": %d, "can_delete": %t, "is_complete": %t, "delete_visit_marker": %t, "pending_partitions": %d, "in_progress_partitions": %d, "pending_or_failed_partitions": [%s]}`,
56-
s.PartitionedGroupID, s.CanDelete, s.IsCompleted, s.DeleteVisitMarker, s.PendingPartitions, s.InProgressPartitions, strings.Join(partitions, ","))
55+
var visitMarkers []string
56+
for _, v := range s.VisitMarkersToDelete {
57+
visitMarkers = append(visitMarkers, v.GetVisitMarkerFilePath())
58+
}
59+
return fmt.Sprintf(`{"partitioned_group_id": %d, "can_delete": %t, "is_complete": %t, "pending_partitions": %d, "in_progress_partitions": %d, "pending_or_failed_partitions": [%s], "visit_markers_to_delete": [%s]}`,
60+
s.PartitionedGroupID, s.CanDelete, s.IsCompleted, s.PendingPartitions, s.InProgressPartitions, strings.Join(partitions, ","), strings.Join(visitMarkers, ","))
5761
}
5862

5963
type PartitionedGroupInfo struct {
@@ -125,10 +129,10 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
125129
PartitionedGroupID: p.PartitionedGroupID,
126130
CanDelete: false,
127131
IsCompleted: false,
128-
DeleteVisitMarker: false,
129132
PendingPartitions: 0,
130133
InProgressPartitions: 0,
131134
PendingOrFailedPartitions: []Partition{},
135+
VisitMarkersToDelete: []VisitMarker{},
132136
}
133137
allPartitionCompleted := true
134138
hasInProgressPartitions := false
@@ -153,7 +157,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
153157
allPartitionCompleted = false
154158
status.PendingOrFailedPartitions = append(status.PendingOrFailedPartitions, partition)
155159
} else if visitMarker.VisitTime < p.CreationTime {
156-
status.DeleteVisitMarker = true
160+
status.VisitMarkersToDelete = append(status.VisitMarkersToDelete, visitMarker)
157161
allPartitionCompleted = false
158162
} else if (visitMarker.GetStatus() == Pending || visitMarker.GetStatus() == InProgress) && !visitMarker.IsExpired(partitionVisitMarkerTimeout) {
159163
status.InProgressPartitions++
@@ -174,7 +178,6 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
174178

175179
if allPartitionCompleted {
176180
status.CanDelete = true
177-
status.DeleteVisitMarker = true
178181
return status
179182
}
180183

@@ -187,19 +190,16 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
187190
if !p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) {
188191
level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID)
189192
status.CanDelete = true
190-
status.DeleteVisitMarker = true
191193
return status
192194
}
193195
if p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) {
194196
level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID)
195197
status.CanDelete = true
196-
status.DeleteVisitMarker = true
197198
return status
198199
}
199200
if p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) {
200201
level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID)
201202
status.CanDelete = true
202-
status.DeleteVisitMarker = true
203203
return status
204204
}
205205
checkedBlocks[blockID] = struct{}{}

pkg/compactor/partitioned_group_info_test.go

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
177177
{
178178
name: "test one partition is not visited and contains block marked for deletion",
179179
expectedResult: PartitionedGroupStatus{
180-
CanDelete: true,
181-
IsCompleted: false,
182-
DeleteVisitMarker: true,
180+
CanDelete: true,
181+
IsCompleted: false,
182+
VisitMarkersToDelete: []VisitMarker{},
183183
PendingOrFailedPartitions: []Partition{
184184
{
185185
PartitionID: 1,
@@ -230,9 +230,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
230230
{
231231
name: "test one partition is pending and contains block marked for deletion",
232232
expectedResult: PartitionedGroupStatus{
233-
CanDelete: true,
234-
IsCompleted: false,
235-
DeleteVisitMarker: true,
233+
CanDelete: true,
234+
IsCompleted: false,
235+
VisitMarkersToDelete: []VisitMarker{},
236236
PendingOrFailedPartitions: []Partition{
237237
{
238238
PartitionID: 1,
@@ -292,7 +292,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
292292
expectedResult: PartitionedGroupStatus{
293293
CanDelete: false,
294294
IsCompleted: false,
295-
DeleteVisitMarker: false,
295+
VisitMarkersToDelete: []VisitMarker{},
296296
PendingOrFailedPartitions: []Partition{},
297297
},
298298
partitionedGroupInfo: PartitionedGroupInfo{
@@ -342,9 +342,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
342342
{
343343
name: "test one partition is pending expired",
344344
expectedResult: PartitionedGroupStatus{
345-
CanDelete: false,
346-
IsCompleted: false,
347-
DeleteVisitMarker: false,
345+
CanDelete: false,
346+
IsCompleted: false,
347+
VisitMarkersToDelete: []VisitMarker{},
348348
PendingOrFailedPartitions: []Partition{
349349
{
350350
PartitionID: 0,
@@ -400,9 +400,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
400400
{
401401
name: "test one partition is complete with one block deleted and one partition is not visited with no blocks deleted",
402402
expectedResult: PartitionedGroupStatus{
403-
CanDelete: false,
404-
IsCompleted: false,
405-
DeleteVisitMarker: false,
403+
CanDelete: false,
404+
IsCompleted: false,
405+
VisitMarkersToDelete: []VisitMarker{},
406406
PendingOrFailedPartitions: []Partition{
407407
{
408408
PartitionID: 1,
@@ -453,9 +453,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
453453
{
454454
name: "test one partition is complete and one partition is failed with no blocks deleted",
455455
expectedResult: PartitionedGroupStatus{
456-
CanDelete: false,
457-
IsCompleted: false,
458-
DeleteVisitMarker: false,
456+
CanDelete: false,
457+
IsCompleted: false,
458+
VisitMarkersToDelete: []VisitMarker{},
459459
PendingOrFailedPartitions: []Partition{
460460
{
461461
PartitionID: 1,
@@ -511,9 +511,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
511511
{
512512
name: "test one partition is complete and one partition is failed one block deleted",
513513
expectedResult: PartitionedGroupStatus{
514-
CanDelete: true,
515-
IsCompleted: false,
516-
DeleteVisitMarker: true,
514+
CanDelete: true,
515+
IsCompleted: false,
516+
VisitMarkersToDelete: []VisitMarker{},
517517
PendingOrFailedPartitions: []Partition{
518518
{
519519
PartitionID: 1,
@@ -573,7 +573,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
573573
expectedResult: PartitionedGroupStatus{
574574
CanDelete: true,
575575
IsCompleted: true,
576-
DeleteVisitMarker: true,
576+
VisitMarkersToDelete: []VisitMarker{},
577577
PendingOrFailedPartitions: []Partition{},
578578
},
579579
partitionedGroupInfo: PartitionedGroupInfo{
@@ -623,9 +623,24 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
623623
{
624624
name: "test partitioned group created after visit marker",
625625
expectedResult: PartitionedGroupStatus{
626-
CanDelete: false,
627-
IsCompleted: false,
628-
DeleteVisitMarker: true,
626+
CanDelete: false,
627+
IsCompleted: false,
628+
VisitMarkersToDelete: []VisitMarker{
629+
&partitionVisitMarker{
630+
PartitionedGroupID: partitionedGroupID,
631+
PartitionID: 0,
632+
Status: Completed,
633+
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
634+
Version: PartitionVisitMarkerVersion1,
635+
},
636+
&partitionVisitMarker{
637+
PartitionedGroupID: partitionedGroupID,
638+
PartitionID: 1,
639+
Status: Completed,
640+
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
641+
Version: PartitionVisitMarkerVersion1,
642+
},
643+
},
629644
PendingOrFailedPartitions: []Partition{},
630645
},
631646
partitionedGroupInfo: PartitionedGroupInfo{
@@ -675,7 +690,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
675690
expectedResult: PartitionedGroupStatus{
676691
CanDelete: false,
677692
IsCompleted: false,
678-
DeleteVisitMarker: false,
693+
VisitMarkersToDelete: []VisitMarker{},
679694
PendingOrFailedPartitions: []Partition{},
680695
},
681696
partitionedGroupInfo: PartitionedGroupInfo{
@@ -725,9 +740,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
725740
{
726741
name: "test one partition is not visited and contains block with no compact mark",
727742
expectedResult: PartitionedGroupStatus{
728-
CanDelete: true,
729-
IsCompleted: false,
730-
DeleteVisitMarker: true,
743+
CanDelete: true,
744+
IsCompleted: false,
745+
VisitMarkersToDelete: []VisitMarker{},
731746
PendingOrFailedPartitions: []Partition{
732747
{
733748
PartitionID: 1,
@@ -778,9 +793,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
778793
{
779794
name: "test one partition is expired and contains block with no compact mark",
780795
expectedResult: PartitionedGroupStatus{
781-
CanDelete: true,
782-
IsCompleted: false,
783-
DeleteVisitMarker: true,
796+
CanDelete: true,
797+
IsCompleted: false,
798+
VisitMarkersToDelete: []VisitMarker{},
784799
PendingOrFailedPartitions: []Partition{
785800
{
786801
PartitionID: 1,
@@ -877,6 +892,10 @@ func TestGetPartitionedGroupStatus(t *testing.T) {
877892
for _, partition := range result.PendingOrFailedPartitions {
878893
require.Contains(t, tcase.expectedResult.PendingOrFailedPartitions, partition)
879894
}
895+
require.Equal(t, len(tcase.expectedResult.VisitMarkersToDelete), len(result.VisitMarkersToDelete))
896+
for _, visitMarker := range result.VisitMarkersToDelete {
897+
require.Contains(t, tcase.expectedResult.VisitMarkersToDelete, visitMarker)
898+
}
880899
})
881900
}
882901
}

0 commit comments

Comments
 (0)