Skip to content

Commit

Permalink
add logic to dedup and compare exemplars
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed May 13, 2021
1 parent eec7d41 commit 3d508c3
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 50 deletions.
62 changes: 45 additions & 17 deletions pkg/exemplars/exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,43 +79,71 @@ func (rr *GRPCClient) Exemplars(ctx context.Context, req *exemplarspb.ExemplarsR
return nil, nil, errors.Wrap(err, "proxy Exemplars")
}

resp.data = dedupExemplarsSeriesLabels(resp.data, rr.replicaLabels)
resp.data = dedupExemplarsResponse(resp.data, rr.replicaLabels)
return resp.data, resp.warnings, nil
}

func dedupExemplarsSeriesLabels(exemplarsData []*exemplarspb.ExemplarData, replicaLabels map[string]struct{}) []*exemplarspb.ExemplarData {
func dedupExemplarsResponse(exemplarsData []*exemplarspb.ExemplarData, replicaLabels map[string]struct{}) []*exemplarspb.ExemplarData {
if len(exemplarsData) == 0 {
return exemplarsData
}

// Sort each exemplar's label names such that they are comparable.
for _, d := range exemplarsData {
sort.Slice(d.SeriesLabels.Labels, func(i, j int) bool {
return d.SeriesLabels.Labels[i].Name < d.SeriesLabels.Labels[j].Name
// Deduplicate series labels.
hashToExemplar := make(map[uint64]*exemplarspb.ExemplarData)
for _, e := range exemplarsData {
if len(e.Exemplars) == 0 {
continue
}
e.SeriesLabels.Labels = removeReplicaLabels(e.SeriesLabels.Labels, replicaLabels)
h := labelpb.ZLabelsToPromLabels(e.SeriesLabels.Labels).Hash()
if ref, ok := hashToExemplar[h]; ok {
ref.Exemplars = append(ref.Exemplars, e.Exemplars...)
} else {
hashToExemplar[h] = e
}
}

res := make([]*exemplarspb.ExemplarData, 0, len(hashToExemplar))
for _, e := range hashToExemplar {
// Dedup exemplars with the same series labels.
e.Exemplars = dedupExemplars(e.Exemplars)
res = append(res, e)
}

// Sort by series labels.
sort.Slice(res, func(i, j int) bool {
return res[i].Compare(res[j]) < 0
})
return res
}

func dedupExemplars(exemplars []*exemplarspb.Exemplar) []*exemplarspb.Exemplar {
for _, e := range exemplars {
sort.Slice(e.Labels.Labels, func(i, j int) bool {
return e.Labels.Labels[i].Compare(e.Labels.Labels[j]) < 0
})
}

// Sort exemplars data such that they appear next to each other.
sort.Slice(exemplarsData, func(i, j int) bool {
return exemplarsData[i].Compare(exemplarsData[j]) < 0
sort.Slice(exemplars, func(i, j int) bool {
return exemplars[i].Compare(exemplars[j]) < 0
})

i := 0
exemplarsData[i].SeriesLabels.Labels = removeReplicaLabels(exemplarsData[i].SeriesLabels.Labels, replicaLabels)
for j := 1; j < len(exemplarsData); j++ {
exemplarsData[j].SeriesLabels.Labels = removeReplicaLabels(exemplarsData[j].SeriesLabels.Labels, replicaLabels)
if exemplarsData[i].Compare(exemplarsData[j]) != 0 {
// Effectively retain exemplarsData[j] in the resulting slice.
for j := 1; j < len(exemplars); j++ {
if exemplars[i].Compare(exemplars[j]) != 0 {
// Effectively retain exemplars[j] in the resulting slice.
i++
exemplarsData[i] = exemplarsData[j]
continue
exemplars[i] = exemplars[j]
}
}

return exemplarsData[:i+1]
return exemplars[:i+1]
}

func removeReplicaLabels(labels []labelpb.ZLabel, replicaLabels map[string]struct{}) []labelpb.ZLabel {
if len(replicaLabels) == 0 {
return labels
}
newLabels := make([]labelpb.ZLabel, 0, len(labels))
for _, l := range labels {
if _, ok := replicaLabels[l.Name]; !ok {
Expand Down
100 changes: 94 additions & 6 deletions pkg/exemplars/exemplars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

func TestDedupExemplarsSeriesLabels(t *testing.T) {
func TestDedupExemplarsResponse(t *testing.T) {
for _, tc := range []struct {
name string
exemplars, want []*exemplarspb.ExemplarData
Expand Down Expand Up @@ -43,6 +43,56 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
}},
},
},
want: []*exemplarspb.ExemplarData{},
},
{
name: "multiple series",
replicaLabels: []string{"replica"},
exemplars: []*exemplarspb.ExemplarData{
{
SeriesLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "__name__", Value: "test_exemplar_metric_total"},
{Name: "instance", Value: "localhost:8090"},
{Name: "job", Value: "prometheus"},
{Name: "service", Value: "bar"},
{Name: "replica", Value: "0"},
}},
Exemplars: []*exemplarspb.Exemplar{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
}},
Value: 19,
Ts: 1600096955479,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
}},
Value: 19,
Ts: 1600096955479,
},
},
},
{
SeriesLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "__name__", Value: "test_exemplar_metric_total"},
{Name: "instance", Value: "localhost:8090"},
{Name: "job", Value: "prometheus"},
{Name: "service", Value: "bar"},
{Name: "replica", Value: "1"},
}},
Exemplars: []*exemplarspb.Exemplar{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
}},
Value: 19,
Ts: 1600096955479,
},
},
},
},
want: []*exemplarspb.ExemplarData{
{
SeriesLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
Expand All @@ -51,11 +101,20 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
{Name: "job", Value: "prometheus"},
{Name: "service", Value: "bar"},
}},
Exemplars: []*exemplarspb.Exemplar{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
}},
Value: 19,
Ts: 1600096955479,
},
},
},
},
},
{
name: "multiple series",
name: "multiple series with multiple exemplars data",
replicaLabels: []string{"replica"},
exemplars: []*exemplarspb.ExemplarData{
{
Expand All @@ -76,10 +135,10 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
{Name: "traceID", Value: "foo"},
}},
Value: 19,
Ts: 1600096955479,
Ts: 1600096955470,
},
},
},
Expand All @@ -92,13 +151,28 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
{Name: "replica", Value: "1"},
}},
Exemplars: []*exemplarspb.Exemplar{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "bar"},
}},
Value: 19,
Ts: 1600096955579,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
}},
Value: 19,
Ts: 1600096955479,
},
// Same ts but different labels, cannot dedup.
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "test"},
}},
Value: 19,
Ts: 1600096955479,
},
},
},
},
Expand All @@ -111,6 +185,13 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
{Name: "service", Value: "bar"},
}},
Exemplars: []*exemplarspb.Exemplar{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "foo"},
}},
Value: 19,
Ts: 1600096955470,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
Expand All @@ -120,11 +201,18 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "EpTxMJ40fUus7aGY"},
{Name: "traceID", Value: "test"},
}},
Value: 19,
Ts: 1600096955479,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
{Name: "traceID", Value: "bar"},
}},
Value: 19,
Ts: 1600096955579,
},
},
},
},
Expand All @@ -135,7 +223,7 @@ func TestDedupExemplarsSeriesLabels(t *testing.T) {
for _, lbl := range tc.replicaLabels {
replicaLabels[lbl] = struct{}{}
}
testutil.Equals(t, tc.want, dedupExemplarsSeriesLabels(tc.exemplars, replicaLabels))
testutil.Equals(t, tc.want, dedupExemplarsResponse(tc.exemplars, replicaLabels))
})
}
}
22 changes: 7 additions & 15 deletions pkg/exemplars/exemplarspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewWarningExemplarsResponse(warning error) *ExemplarsResponse {
}
}

// Compare orders two exemplar data entries by their series labels alone; the
// attached exemplars are not taken into account. The result is whatever
// labels.Compare returns for the two label sets.
func (s1 *ExemplarData) Compare(s2 *ExemplarData) int {
	lhs := s1.SeriesLabels.PromLabels()
	rhs := s2.SeriesLabels.PromLabels()
	return labels.Compare(lhs, rhs)
}
Expand All @@ -80,26 +81,17 @@ func (s *ExemplarData) SetSeriesLabels(ls labels.Labels) {
s.SeriesLabels = result
}

// SetLabels replaces the exemplar's labels with ls. An empty ls resets the
// labels to the zero-value label set.
func (e *Exemplar) SetLabels(ls labels.Labels) {
	if len(ls) == 0 {
		e.Labels = labelpb.ZLabelSet{}
		return
	}
	e.Labels = labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(ls)}
}

// Compare is used for sorting and comparing exemplars. Start from timestamp,
// then labels, finally values.
//
// It returns a negative number when e1 sorts before e2, a positive number when
// it sorts after, and 0 when both exemplars are considered equal. Earlier
// timestamps sort first. NaN values sort before every other value and are
// equal to each other.
func (e1 *Exemplar) Compare(e2 *Exemplar) int {
	if e1.Ts < e2.Ts {
		return -1
	}
	if e1.Ts > e2.Ts {
		return 1
	}

	if d := labels.Compare(e1.Labels.PromLabels(), e2.Labels.PromLabels()); d != 0 {
		return d
	}

	// big.NewFloat panics on NaN, so NaN values are ordered explicitly before
	// falling back to the big.Float comparison. v != v is true only for NaN.
	nan1, nan2 := e1.Value != e1.Value, e2.Value != e2.Value
	switch {
	case nan1 && nan2:
		return 0
	case nan1:
		return -1
	case nan2:
		return 1
	}
	return big.NewFloat(e1.Value).Cmp(big.NewFloat(e2.Value))
}
15 changes: 3 additions & 12 deletions pkg/exemplars/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,13 @@ func (p *Prometheus) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Ex
}

// Prometheus does not add external labels, so we need to add them ourselves.
enrichExemplarsWithExtLabels(exemplars, p.extLabels())

extLset := p.extLabels()
for _, e := range exemplars {
// Make sure the returned series labels are sorted.
e.SetSeriesLabels(labelpb.ExtendSortedLabels(e.SeriesLabels.PromLabels(), extLset))
if err := s.Send(&exemplarspb.ExemplarsResponse{Result: &exemplarspb.ExemplarsResponse_Data{Data: e}}); err != nil {
return err
}
}
return nil
}

// enrichExemplarsWithExtLabels extends, in place, the series labels of every
// exemplar data entry and the labels of each of its exemplars with extLset.
func enrichExemplarsWithExtLabels(exemplars []*exemplarspb.ExemplarData, extLset labels.Labels) {
	for idx := range exemplars {
		data := exemplars[idx]

		seriesLset := labelpb.ExtendSortedLabels(data.SeriesLabels.PromLabels(), extLset)
		data.SetSeriesLabels(seriesLset)

		// Exemplars are held by pointer, so updating each one through the
		// pointer is enough; no write-back into the slice is required.
		for _, ex := range data.Exemplars {
			exLset := labelpb.ExtendSortedLabels(ex.Labels.PromLabels(), extLset)
			ex.SetLabels(exLset)
		}
	}
}

0 comments on commit 3d508c3

Please sign in to comment.