Skip to content

Commit 8d0f0df

Browse files
yeya24, anna-tran, and danielblando
authored
Cherry pick PR 7082 and 7086 to 1.20 release (#7089)
* Fix metric name validation to use correct validation scheme method (#7087)

  Signed-off-by: Anna Tran <trananna@amazon.com>

* bump to 1.20.0-rc.1

  Signed-off-by: yeya24 <benye@amazon.com>

* Fix visit marker race condition (#7082)

* update changelog

  Signed-off-by: yeya24 <benye@amazon.com>

---------

Signed-off-by: Anna Tran <trananna@amazon.com>
Signed-off-by: yeya24 <benye@amazon.com>
Co-authored-by: Anna Tran <trananna@amazon.com>
Co-authored-by: Daniel Blando <daniel@blando.com.br>
1 parent 6a165b9 commit 8d0f0df

File tree

8 files changed

+239
-9
lines changed

8 files changed

+239
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@
109109
* [BUGFIX] Distributor: Fix `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026
110110
* [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062
111111
* [BUGFIX] Scheduler: Avoid all queriers reserved for prioritized requests. #7057
112+
* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
113+
* [BUGFIX] Compactor: Avoid a race condition that allows a grouper to not compact all partitions. #7082
112114

113115
## 1.19.1 2025-09-20
114116

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.20.0-rc.0
1+
1.20.0-rc.1

pkg/compactor/compactor_paritioning_test.go

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,24 @@ func TestPartitionCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingE
12991299

13001300
// Get all of the unique group hashes so that they can be used to ensure all groups were compacted
13011301
groupHashes[groupHash]++
1302-
bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", "", nil)
1302+
1303+
// Create mock partitioned group info for the new validation check
1304+
partitionedGroupInfo := PartitionedGroupInfo{
1305+
PartitionedGroupID: groupHash,
1306+
PartitionCount: 1,
1307+
Partitions: []Partition{
1308+
{
1309+
PartitionID: 0,
1310+
Blocks: []ulid.ULID{ulid.MustParse(blockID)},
1311+
},
1312+
},
1313+
RangeStart: blockTimes["startTime"],
1314+
RangeEnd: blockTimes["endTime"],
1315+
CreationTime: time.Now().Unix(),
1316+
Version: PartitionedGroupInfoVersion1,
1317+
}
1318+
partitionedGroupInfoContent, _ := json.Marshal(partitionedGroupInfo)
1319+
bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", string(partitionedGroupInfoContent), nil)
13031320
bucketClient.MockUpload(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", nil)
13041321
}
13051322

@@ -1826,3 +1843,157 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom
18261843

18271844
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
18281845
}
1846+
1847+
func TestPartitionCompactionRaceCondition(t *testing.T) {
1848+
t.Run("planner_detects_missing_partition_group", func(t *testing.T) {
1849+
setup := newRaceConditionTestSetup(12345)
1850+
1851+
// Create a planner that will try to process blocks but find missing partition group
1852+
planner := setup.createPlanner()
1853+
cortexMetaExtensions := setup.createCortexMetaExtensions(time.Now().Unix())
1854+
metasByMinTime := setup.createTestMetadata()
1855+
1856+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1857+
1858+
require.Error(t, err, "Planner should fail when partition group is missing")
1859+
require.Nil(t, result, "Should not return any result when partition group is missing")
1860+
require.ErrorIs(t, err, plannerCompletedPartitionError, "Error should be completed partition error when partition group is missing")
1861+
})
1862+
1863+
t.Run("planner_detects_creation_time_mismatch", func(t *testing.T) {
1864+
setup := newRaceConditionTestSetup(54321)
1865+
originalCreationTime := time.Now().Unix()
1866+
1867+
// Create initial partition group
1868+
partitionedGroupInfo := setup.createPartitionedGroupInfo(originalCreationTime)
1869+
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
1870+
require.NoError(t, err)
1871+
1872+
// Simulate cleaner deleting partition group
1873+
partitionGroupFile := GetPartitionedGroupFile(setup.partitionedGroupID)
1874+
err = setup.bucket.Delete(setup.ctx, partitionGroupFile)
1875+
require.NoError(t, err)
1876+
1877+
// Create new partition group with same ID but different creation time
1878+
newCreationTime := time.Now().Unix() + 200
1879+
newPartitionedGroupInfo := setup.createPartitionedGroupInfo(newCreationTime)
1880+
_, err = UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *newPartitionedGroupInfo)
1881+
require.NoError(t, err)
1882+
1883+
// Test planner creation time validation
1884+
planner := setup.createPlanner()
1885+
cortexMetaExtensions := setup.createCortexMetaExtensions(originalCreationTime) // OLD creation time
1886+
metasByMinTime := setup.createTestMetadata()
1887+
1888+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1889+
1890+
require.Error(t, err, "Planner should detect creation time mismatch")
1891+
require.ErrorIs(t, err, plannerCompletedPartitionError, "Should abort with completed partition error")
1892+
require.Nil(t, result, "Should not return any result when aborting")
1893+
})
1894+
1895+
t.Run("normal_operation_with_matching_creation_time", func(t *testing.T) {
1896+
setup := newRaceConditionTestSetup(99999)
1897+
creationTime := time.Now().Unix()
1898+
1899+
// Create partition group
1900+
partitionedGroupInfo := setup.createPartitionedGroupInfo(creationTime)
1901+
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
1902+
require.NoError(t, err)
1903+
1904+
// Create planner and test with matching creation time
1905+
planner := setup.createPlanner()
1906+
cortexMetaExtensions := setup.createCortexMetaExtensions(creationTime) // MATCHING creation time
1907+
metasByMinTime := setup.createTestMetadata()
1908+
1909+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1910+
1911+
require.NoError(t, err, "Should not fail when creation times match")
1912+
require.NotNil(t, result, "Should return result when creation times match")
1913+
})
1914+
}
1915+
1916+
// raceConditionTestSetup provides common setup for race condition tests
1917+
type raceConditionTestSetup struct {
1918+
ctx context.Context
1919+
logger log.Logger
1920+
bucket objstore.InstrumentedBucket
1921+
userID string
1922+
partitionedGroupID uint32
1923+
partitionID int
1924+
partitionCount int
1925+
ranges []int64
1926+
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
1927+
}
1928+
1929+
func newRaceConditionTestSetup(partitionedGroupID uint32) *raceConditionTestSetup {
1930+
return &raceConditionTestSetup{
1931+
ctx: context.Background(),
1932+
logger: log.NewNopLogger(),
1933+
bucket: objstore.WithNoopInstr(objstore.NewInMemBucket()),
1934+
userID: "test-user",
1935+
partitionedGroupID: partitionedGroupID,
1936+
partitionID: 0,
1937+
partitionCount: 2,
1938+
ranges: []int64{2 * 60 * 60 * 1000}, // 2 hours in milliseconds
1939+
noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return nil },
1940+
}
1941+
}
1942+
1943+
func (s *raceConditionTestSetup) createPartitionedGroupInfo(creationTime int64) *PartitionedGroupInfo {
1944+
return &PartitionedGroupInfo{
1945+
PartitionedGroupID: s.partitionedGroupID,
1946+
PartitionCount: s.partitionCount,
1947+
Partitions: []Partition{
1948+
{PartitionID: 0, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
1949+
{PartitionID: 1, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
1950+
},
1951+
RangeStart: 0,
1952+
RangeEnd: 2 * 60 * 60 * 1000,
1953+
CreationTime: creationTime,
1954+
Version: PartitionedGroupInfoVersion1,
1955+
}
1956+
}
1957+
1958+
func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner {
1959+
// Use the same metrics pattern as other tests
1960+
registerer := prometheus.NewPedanticRegistry()
1961+
metrics := newCompactorMetrics(registerer)
1962+
1963+
return NewPartitionCompactionPlanner(
1964+
s.ctx,
1965+
s.bucket,
1966+
s.logger,
1967+
s.ranges,
1968+
s.noCompBlocksFunc,
1969+
"test-compactor",
1970+
s.userID,
1971+
time.Second,
1972+
10*time.Minute,
1973+
time.Minute,
1974+
metrics,
1975+
)
1976+
}
1977+
1978+
func (s *raceConditionTestSetup) createCortexMetaExtensions(creationTime int64) *cortex_tsdb.CortexMetaExtensions {
1979+
return &cortex_tsdb.CortexMetaExtensions{
1980+
PartitionInfo: &cortex_tsdb.PartitionInfo{
1981+
PartitionedGroupID: s.partitionedGroupID,
1982+
PartitionCount: s.partitionCount,
1983+
PartitionID: s.partitionID,
1984+
PartitionedGroupCreationTime: creationTime,
1985+
},
1986+
}
1987+
}
1988+
1989+
func (s *raceConditionTestSetup) createTestMetadata() []*metadata.Meta {
1990+
return []*metadata.Meta{
1991+
{
1992+
BlockMeta: tsdb.BlockMeta{
1993+
ULID: ulid.MustNew(ulid.Now(), nil),
1994+
MinTime: 0,
1995+
MaxTime: 2 * 60 * 60 * 1000,
1996+
},
1997+
},
1998+
}
1999+
}

pkg/compactor/partition_compaction_grouper.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,19 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
639639
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition is visited")
640640
continue
641641
}
642+
643+
// Validate that the partition group still exists before creating a visit marker
644+
// This prevents the race condition where the cleaner deletes the partition group
645+
// between the visit marker check and the visit marker creation
646+
if _, err := ReadPartitionedGroupInfo(g.ctx, g.bkt, g.logger, partitionedGroupID); err != nil {
647+
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
648+
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition group was deleted by cleaner", "partitioned_group_id", partitionedGroupID)
649+
} else {
650+
level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition group info", "err", err, "partitioned_group_id", partitionedGroupID)
651+
}
652+
continue
653+
}
654+
642655
partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)
643656

644657
level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())

pkg/compactor/partition_compaction_planner.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,34 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
109109
}
110110
}
111111

112+
// Double-check that the partition group still exists and is the same one we started with
113+
// to prevent race condition with cleaner. If the cleaner deleted the partition group
114+
// after we created the visit marker in the grouper, we should abort the compaction
115+
// to avoid orphaned visit markers.
116+
currentPartitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID)
117+
if err != nil {
118+
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
119+
level.Warn(p.logger).Log("msg", "partition group was deleted by cleaner, aborting compaction", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
120+
return nil, plannerCompletedPartitionError
121+
} else {
122+
level.Warn(p.logger).Log("msg", "unable to read partition group info during planning", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
123+
return nil, fmt.Errorf("unable to read partition group info for partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
124+
}
125+
}
126+
127+
// Verify that this is the same partition group that the grouper created the visit marker for
128+
// by comparing creation times. If they don't match, it means the cleaner deleted the old
129+
// partition group and a new one was created with the same ID.
130+
expectedCreationTime := partitionInfo.PartitionedGroupCreationTime
131+
if currentPartitionedGroupInfo.CreationTime != expectedCreationTime {
132+
level.Warn(p.logger).Log("msg", "partition group creation time mismatch, cleaner deleted old group and new one was created, aborting compaction",
133+
"partitioned_group_id", partitionedGroupID,
134+
"partition_id", partitionID,
135+
"expected_creation_time", expectedCreationTime,
136+
"current_creation_time", currentPartitionedGroupInfo.CreationTime)
137+
return nil, plannerCompletedPartitionError
138+
}
139+
112140
// Ensure all blocks fits within the largest range. This is a double check
113141
// to ensure there's no bug in the previous blocks grouping, given this Plan()
114142
// is just a pass-through.

pkg/compactor/partition_compaction_planner_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,24 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) {
289289
Version: PartitionVisitMarkerVersion1,
290290
}
291291
visitMarkerFileContent, _ := json.Marshal(visitMarker)
292+
// Mock partition group info for race condition fix
293+
partitionedGroupInfo := PartitionedGroupInfo{
294+
PartitionedGroupID: partitionedGroupID,
295+
PartitionCount: 1,
296+
Partitions: []Partition{
297+
{PartitionID: partitionID, Blocks: []ulid.ULID{}},
298+
},
299+
RangeStart: 0,
300+
RangeEnd: 2 * time.Hour.Milliseconds(),
301+
CreationTime: time.Now().Unix(),
302+
Version: PartitionedGroupInfoVersion1,
303+
}
304+
partitionedGroupContent, _ := json.Marshal(partitionedGroupInfo)
305+
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID)
306+
292307
bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil)
308+
bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil)
293309
bkt.MockUpload(mock.Anything, nil)
294-
bkt.MockGet(mock.Anything, "", nil)
295310

296311
registerer := prometheus.NewPedanticRegistry()
297312

@@ -316,9 +331,10 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) {
316331
)
317332
actual, err := p.Plan(context.Background(), testData.blocks, nil, &cortextsdb.CortexMetaExtensions{
318333
PartitionInfo: &cortextsdb.PartitionInfo{
319-
PartitionCount: 1,
320-
PartitionID: partitionID,
321-
PartitionedGroupID: partitionedGroupID,
334+
PartitionCount: 1,
335+
PartitionID: partitionID,
336+
PartitionedGroupID: partitionedGroupID,
337+
PartitionedGroupCreationTime: partitionedGroupInfo.CreationTime,
322338
},
323339
})
324340

pkg/util/validation/validate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str
285285
return newNoMetricNameError()
286286
}
287287

288-
if !nameValidationScheme.IsValidLabelName(unsafeMetricName) {
288+
if !nameValidationScheme.IsValidMetricName(unsafeMetricName) {
289289
validateMetrics.DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc()
290290
return newInvalidMetricNameError(unsafeMetricName)
291291
}

pkg/util/validation/validate_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestValidateLabels(t *testing.T) {
134134
}, "foo "),
135135
},
136136
{
137-
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid"},
137+
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid:name"},
138138
false,
139139
nil,
140140
},
@@ -201,7 +201,7 @@ func TestValidateLabels(t *testing.T) {
201201
# HELP cortex_label_size_bytes The combined size in bytes of all labels and label values for a time series.
202202
# TYPE cortex_label_size_bytes histogram
203203
cortex_label_size_bytes_bucket{user="testUser",le="+Inf"} 3
204-
cortex_label_size_bytes_sum{user="testUser"} 148
204+
cortex_label_size_bytes_sum{user="testUser"} 153
205205
cortex_label_size_bytes_count{user="testUser"} 3
206206
`), "cortex_label_size_bytes"))
207207

0 commit comments

Comments
 (0)