Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation: Push replica labels at the end after Recv #5742

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 26 additions & 34 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ type querier struct {
logger log.Logger
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
replicaLabels []string
replicaLabelSet map[string]struct{}
storeDebugMatchers [][]*labels.Matcher
proxy storepb.StoreServer
deduplicate bool
Expand Down Expand Up @@ -134,9 +135,11 @@ func newQuerier(
selectGate: selectGate,
selectTimeout: selectTimeout,

mint: mint,
maxt: maxt,
replicaLabels: rl,
mint: mint,
maxt: maxt,

replicaLabels: replicaLabels,
replicaLabelSet: rl,
storeDebugMatchers: storeDebugMatchers,
proxy: proxy,
deduplicate: deduplicate,
Expand All @@ -157,13 +160,18 @@ type seriesServer struct {
storepb.Store_SeriesServer
ctx context.Context

seriesSet []storepb.Series
warnings []string
sortRequired bool
seriesSet []storepb.Series
warnings []string
}

func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
if r.GetWarning() != "" {
s.warnings = append(s.warnings, r.GetWarning())
if r.GetWarning() == store.ErrUnsortedSeriesSetDetected.Error() {
s.sortRequired = true
} else {
s.warnings = append(s.warnings, r.GetWarning())
}
return nil
}

Expand Down Expand Up @@ -297,13 +305,18 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
queryHints = storeHintsFromPromHints(hints)
}

var replicaLabels []string
if q.deduplicate {
replicaLabels = q.replicaLabels
}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: queryHints,
ReplicaLabels: replicaLabels,
ShardInfo: q.shardInfo,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Expand Down Expand Up @@ -345,8 +358,9 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
sortDedupLabels(resp.seriesSet, q.replicaLabels)
if resp.sortRequired {
sortSeries(resp.seriesSet)
}
set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
Expand All @@ -357,33 +371,11 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), nil
return dedup.NewSeriesSet(set, q.replicaLabelSet, hints.Func, q.enableQueryPushdown), nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
// labels are coming right after each other.
func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
for _, s := range set {
// Move the replica labels to the very end.
sort.Slice(s.Labels, func(i, j int) bool {
if _, ok := replicaLabels[s.Labels[i].Name]; ok {
return false
}
if _, ok := replicaLabels[s.Labels[j].Name]; ok {
return true
}
// Ensure that dedup marker goes just right before the replica labels.
if s.Labels[i].Name == dedup.PushdownMarker.Name {
return false
}
if s.Labels[j].Name == dedup.PushdownMarker.Name {
return true
}
return s.Labels[i].Name < s.Labels[j].Name
})
}
// With the re-ordered label sets, re-sorting all series aligns the same series
// from different replicas sequentially.
// sortSeries re-sorts the series set. Used in case one server sends unsorted data.
func sortSeries(set []storepb.Series) {
sort.Slice(set, func(i, j int) bool {
return labels.Compare(labelpb.ZLabelsToPromLabels(set[i].Labels), labelpb.ZLabelsToPromLabels(set[j].Labels)) < 0
})
Expand Down
Loading