Skip to content

Query Partial Data #6526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2e86c7b
Create partial_data
justinjung04 Jan 20, 2025
ca7b8f3
Fix lazyquery so that warning message is returned
justinjung04 Jan 20, 2025
c71eb5c
Add QueryPartialData limit
justinjung04 Jan 20, 2025
53b41a7
Fix broken mock
justinjung04 Jan 20, 2025
455b25f
Make response with warnings to be not cached
justinjung04 Jan 20, 2025
e2ef335
Updated streamingSelect in distributor_queryable
justinjung04 Jan 21, 2025
96f923c
Update query.go
justinjung04 Jan 21, 2025
1f77f22
Update replication_set
justinjung04 Jan 21, 2025
0f244b8
Lint
justinjung04 Jan 21, 2025
8f581e7
Lint again
justinjung04 Jan 21, 2025
aedab0b
Generated doc
justinjung04 Jan 21, 2025
7790c7d
Changelog
justinjung04 Jan 21, 2025
de7f78a
Update config description
justinjung04 Jan 21, 2025
ea34c0e
Do not remove warnings from seriesSet
justinjung04 Jan 23, 2025
258c488
Avoid cache only if the warning message contains partial data error
justinjung04 Jan 23, 2025
8d50cea
Remove context usage for partial data
justinjung04 Jan 23, 2025
c167f4a
Refactor how partial data info is passed + apply to series and label …
justinjung04 Jan 23, 2025
b13ae90
Lint + fix tests
justinjung04 Jan 23, 2025
c191b77
Fix build
justinjung04 Jan 23, 2025
9c437b3
Create separate config for ruler partial data
justinjung04 Jan 23, 2025
cb24add
Genereta doc
justinjung04 Jan 23, 2025
b7a7ffc
Add more tests
justinjung04 Jan 24, 2025
df5fd8e
Change error
justinjung04 Jan 24, 2025
28ccc38
Fix test
justinjung04 Jan 24, 2025
ba49805
Update changelog
justinjung04 Jan 24, 2025
d626716
Update changelog
justinjung04 Jan 24, 2025
a92e069
Nit
justinjung04 Jan 24, 2025
f2cf4c1
Nit
justinjung04 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master / unreleased

* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526

## 1.19.0 in progress

* [CHANGE] Deprecate `-blocks-storage.tsdb.wal-compression-enabled` flag (use `blocks-storage.tsdb.wal-compression-type` instead). #6529
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3545,6 +3545,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <float> | default = 0]

# Enable to allow queries to be evaluated with data from a single zone, if other
# zones are not available.
[query_partial_data: <boolean> | default = false]

# Maximum number of outstanding requests per tenant per request queue (either
# query frontend or query scheduler); requests beyond this error with HTTP 429.
# CLI flag: -frontend.max-outstanding-requests-per-tenant
Expand Down Expand Up @@ -3605,6 +3609,10 @@ query_rejection:
# external labels for alerting rules
[ruler_external_labels: <map of string (labelName) to string (labelValue)> | default = []]

# Enable to allow rules to be evaluated with data from a single zone, if other
# zones are not available.
[rules_partial_data: <boolean> | default = false]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down
4 changes: 2 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger)
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.Overrides.QueryPartialData)

// Use distributor as default MetadataQuerier
t.MetadataQuerier = t.Distributor
Expand Down Expand Up @@ -623,7 +623,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.Overrides.RulesPartialData)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
Expand Down
32 changes: 16 additions & 16 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,8 @@ func getErrorStatus(err error) string {
}

// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, partialDataEnabled bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1228,9 +1228,9 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
}

// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1241,9 +1241,9 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
}

// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1307,9 +1307,9 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1333,9 +1333,9 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1346,9 +1346,9 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
}

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1375,9 +1375,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
}, matchers...)
}

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1453,7 +1453,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error
replicationSet.MaxErrors = 0

req := &ingester_client.UserStatsRequest{}
resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.UserStats(ctx, req)
})
if err != nil {
Expand Down
Loading
Loading