Skip to content

Add concurrency to the mergeQueryable #4065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069
* [ENHANCEMENT] Store-gateway: retry synching blocks if a per-tenant sync fails. #3975 #4088
* [ENHANCEMENT] Add metric `cortex_tcp_connections` exposing the current number of accepted TCP connections. #4099
* [ENHANCEMENT] Querier: Allow federated queries to run concurrently. #4065
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
Expand Down
140 changes: 114 additions & 26 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -13,12 +14,14 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

const (
	// defaultTenantLabel is the label injected into every federated series /
	// label result to identify which tenant it came from.
	defaultTenantLabel = "__tenant_id__"
	// retainExistingPrefix is prepended to an original label name when the
	// injected tenant label would otherwise overwrite it.
	retainExistingPrefix = "original_"
	// originalDefaultTenantLabel holds a pre-existing __tenant_id__ value
	// after it has been displaced by the injected tenant label.
	originalDefaultTenantLabel = retainExistingPrefix + defaultTenantLabel
	// maxConcurrency bounds how many per-tenant queries run in parallel.
	maxConcurrency = 16
)

// NewQueryable returns a queryable that iterates through all the tenant IDs
Expand Down Expand Up @@ -65,6 +68,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
}

return &mergeQuerier{
ctx: ctx,
queriers: queriers,
tenantIDs: tenantIDs,
}, nil
Expand All @@ -77,6 +81,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
// overwritten by the tenant ID and the previous value is exposed through a new
// label prefixed with "original_". This behaviour is not implemented recursively
type mergeQuerier struct {
	// ctx is used to drive cancellation of the concurrent per-tenant calls.
	// NOTE(review): storing a context in a struct is normally discouraged in
	// Go; presumably done here because storage.Querier methods take no ctx
	// parameter in this Prometheus version — confirm.
	ctx context.Context
	// queriers and tenantIDs are parallel slices: queriers[i] serves
	// tenantIDs[i].
	queriers  []storage.Querier
	tenantIDs []string
}
Expand All @@ -97,7 +102,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
name = defaultTenantLabel
}

return m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelValues(name, matchers...)
})
}
Expand All @@ -106,7 +111,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
// queriers. It also adds the defaultTenantLabel and if present in the original
// results the originalDefaultTenantLabel
func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
labelNames, warnings, err := m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelNames()
})
if err != nil {
Expand Down Expand Up @@ -137,27 +142,64 @@ func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
return labelNames, warnings, nil
}

type stringSliceFunc func(storage.Querier) ([]string, storage.Warnings, error)
// stringSliceFunc fetches a string slice (e.g. label names or label values)
// from a single tenant's querier; the context allows cancellation when run
// concurrently.
type stringSliceFunc func(context.Context, storage.Querier) ([]string, storage.Warnings, error)

// stringSliceFuncJob bundles the input (querier, tenantID) and the output
// (result, warnings) of one per-tenant stringSliceFunc call, so the calls can
// run concurrently and be collected afterwards in a stable order.
type stringSliceFuncJob struct {
	querier  storage.Querier
	tenantID string
	result   []string
	warnings storage.Warnings
}

// mergeDistinctStringSlice is aggregating results from stringSliceFunc calls
// on a querier. It removes duplicates and sorts the result. It doesn't require
// the output of the stringSliceFunc to be sorted, as results of LabelValues
// are not sorted.
//
// TODO: Consider running stringSliceFunc calls concurrently
// per querier in parallel. It removes duplicates and sorts the result. It
// doesn't require the output of the stringSliceFunc to be sorted, as results
// of LabelValues are not sorted.
func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) {
var jobs = make([]interface{}, len(m.tenantIDs))

for pos := range m.tenantIDs {
jobs[pos] = &stringSliceFuncJob{
querier: m.queriers[pos],
tenantID: m.tenantIDs[pos],
}
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

var err error
job.result, job.warnings, err = f(ctx, job.querier)
if err != nil {
return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID)
}

return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
if err != nil {
return nil, nil, err
}

// aggregate warnings and deduplicate string results
var warnings storage.Warnings
resultMap := make(map[string]struct{})
for pos, tenantID := range m.tenantIDs {
result, resultWarnings, err := f(m.queriers[pos])
if err != nil {
return nil, nil, err
for _, jobIntf := range jobs {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return nil, nil, fmt.Errorf("unexpected type %T", jobIntf)
}
for _, e := range result {

for _, e := range job.result {
resultMap[e] = struct{}{}
}
for _, w := range resultWarnings {
warnings = append(warnings, fmt.Errorf("error querying tenant id %s: %w", tenantID, w))

for _, w := range job.warnings {
warnings = append(warnings, errors.Wrapf(w, "warning querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID))
}
}

Expand All @@ -173,33 +215,60 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st
func (m *mergeQuerier) Close() error {
errs := tsdb_errors.NewMulti()
for pos, tenantID := range m.tenantIDs {
errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for tenant id %s", tenantID))
errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for %s %s", rewriteLabelName(defaultTenantLabel), tenantID))
}
return errs.Err()
}

// selectJob describes one per-tenant Select call; pos is the slot in the
// shared result slice this job writes to, so concurrent jobs never collide.
type selectJob struct {
	pos      int
	querier  storage.Querier
	tenantID string
}

// Select returns a set of series that matches the given label matchers. If the
// tenantLabelName is matched on it only considers those queriers matching. The
// forwarded labelSelector is not containing those that operate on
// tenantLabelName.
func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
matchedTenants, filteredMatchers := filterValuesByMatchers(defaultTenantLabel, m.tenantIDs, matchers...)
var seriesSets = make([]storage.SeriesSet, 0, len(matchedTenants))
for pos, tenantID := range m.tenantIDs {
if _, matched := matchedTenants[tenantID]; !matched {
var jobs = make([]interface{}, len(matchedTenants))
var seriesSets = make([]storage.SeriesSet, len(matchedTenants))
var jobPos int
for tenantPos := range m.tenantIDs {
if _, matched := matchedTenants[m.tenantIDs[tenantPos]]; !matched {
continue
}
seriesSets = append(seriesSets, &addLabelsSeriesSet{
// TODO: Consider running Select calls concurrently
upstream: m.queriers[pos].Select(sortSeries, hints, filteredMatchers...),
jobs[jobPos] = &selectJob{
pos: jobPos,
querier: m.queriers[tenantPos],
tenantID: m.tenantIDs[tenantPos],
}
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*selectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}
seriesSets[job.pos] = &addLabelsSeriesSet{
upstream: job.querier.Select(sortSeries, hints, filteredMatchers...),
labels: labels.Labels{
{
Name: defaultTenantLabel,
Value: tenantID,
Value: job.tenantID,
},
},
})
}
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}

return storage.NewMergeSeriesSet(seriesSets, storage.ChainedSeriesMerge)
}

Expand Down Expand Up @@ -266,13 +335,32 @@ func (m *addLabelsSeriesSet) At() storage.Series {
// The error that iteration as failed with.
// When an error occurs, set cannot continue to iterate.
func (m *addLabelsSeriesSet) Err() error {
return m.upstream.Err()
return errors.Wrapf(m.upstream.Err(), "error querying %s", labelsToString(m.labels))
}

// A collection of warnings for the whole set.
// Warnings could be return even iteration has not failed with error.
func (m *addLabelsSeriesSet) Warnings() storage.Warnings {
return m.upstream.Warnings()
upstream := m.upstream.Warnings()
warnings := make(storage.Warnings, len(upstream))
for pos := range upstream {
warnings[pos] = errors.Wrapf(upstream[pos], "warning querying %s", labelsToString(m.labels))
}
return warnings
}

// rewriteLabelName strips leading and trailing underscores from a label name
// (e.g. "__tenant_id__" -> "tenant_id") so error and warning messages read
// more naturally.
func rewriteLabelName(s string) string {
	// strings.Trim removes the cutset from both ends in one call, replacing
	// the previous TrimLeft+TrimRight pair.
	return strings.Trim(s, "_")
}

// labelsToString renders a label set for error output as comma-separated
// "name value" pairs, with the underscores around each label name removed
// for readability.
func labelsToString(labels labels.Labels) string {
	parts := make([]string, 0, len(labels))
	for _, l := range labels {
		parts = append(parts, rewriteLabelName(l.Name)+" "+l.Value)
	}
	return strings.Join(parts, ", ")
}

type addLabelsSeries struct {
Expand Down
Loading