Skip to content

Filter empty labels from sharding key #5717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
* [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703
* [BUGFIX] Distributor: Do not use labels with empty values for sharding. #5717


## 1.16.0 2023-11-20

Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,10 @@ func shardByUser(userID string) uint32 {
// shardByAllLabels returns the sharding hash for a series. The hash is seeded
// with shardByUser(userID) and then extended with the name and value of every
// label whose value is non-empty.
//
// Labels with an empty value are skipped so that a series produces the same
// hash whether or not such labels are present in the input.
func shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 {
	h := shardByUser(userID)
	for _, label := range labels {
		// An empty value must not contribute to the sharding key.
		if len(label.Value) == 0 {
			continue
		}
		h = ingester_client.HashAdd32(h, label.Name)
		h = ingester_client.HashAdd32(h, label.Value)
	}
	return h
}
Expand Down
89 changes: 88 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,7 @@ type prepConfig struct {
replicationFactor int
enableTracker bool
errFail error
tokens [][]uint32
}

func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
Expand All @@ -2417,14 +2418,20 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
ingesterDescs := map[string]ring.InstanceDesc{}
ingestersByAddr := map[string]*mockIngester{}
for i := range ingesters {
var tokens []uint32
if len(cfg.tokens) > i {
tokens = cfg.tokens[i]
} else {
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
}
addr := fmt.Sprintf("%d", i)
ingesterDescs[addr] = ring.InstanceDesc{
Addr: addr,
Zone: "",
State: ring.ACTIVE,
Timestamp: time.Now().Unix(),
RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(),
Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)},
Tokens: tokens,
}
ingestersByAddr[addr] = ingesters[i]
}
Expand Down Expand Up @@ -3303,6 +3310,86 @@ func TestDistributor_Push_Relabel(t *testing.T) {
}
}

func TestDistributor_Push_EmptyLabel(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "pushEmptyLabel")
type testcase struct {
name string
inputSeries []labels.Labels
expectedSeries labels.Labels
}

cases := []testcase{
{
name: "with empty label",
inputSeries: []labels.Labels{
{ //Token 1106054332 without filtering
{Name: "__name__", Value: "foo"},
{Name: "empty", Value: ""},
},
{ //Token 3827924124 without filtering
{Name: "__name__", Value: "foo"},
{Name: "changHash", Value: ""},
},
},
expectedSeries: labels.Labels{
//Token 1797290973
{Name: "__name__", Value: "foo"},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)

token := [][]uint32{
{1},
{2},
{3},
{1106054333},
{5},
{6},
{7},
{8},
{9},
{3827924125},
}

ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 10,
happyIngesters: 10,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
replicationFactor: 1,
shuffleShardSize: 10,
tokens: token,
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
ingesterWithSeries := 0
for i := range ingesters {
timeseries := ingesters[i].series()
if len(timeseries) > 0 {
ingesterWithSeries++
}
}
assert.Equal(t, 1, ingesterWithSeries)
})
}
}

func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) {
t.Parallel()
metricRelabelConfigs := []*relabel.Config{
Expand Down