exemplar querying #4181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 8 commits · Jun 15, 2021
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -16,7 +16,7 @@
* `cortex_alertmanager_state_initial_sync_duration_seconds`
* `cortex_alertmanager_state_persist_total`
* `cortex_alertmanager_state_persist_failed_total`
* [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=<n>` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124
* [ENHANCEMENT] Blocks storage: support ingesting and querying exemplars. Enabled by setting the new CLI flag `-blocks-storage.tsdb.max-exemplars=<n>` or the config option `blocks_storage.tsdb.max_exemplars` to a positive value. #4124 #4181
* [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128

1 change: 1 addition & 0 deletions development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -40,6 +40,7 @@ blocks_storage:
ship_interval: 1m
block_ranges_period: [ 2h ]
retention_period: 3h
max_exemplars: 5000

bucket_store:
sync_dir: /tmp/cortex-tsdb-querier
1 change: 1 addition & 0 deletions development/tsdb-blocks-storage-s3/config/prometheus.yaml
@@ -55,3 +55,4 @@ scrape_configs:

remote_write:
- url: http://distributor:8001/api/v1/push
send_exemplars: true
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -22,8 +22,8 @@ services:
image: memcached:1.6

prometheus:
image: prom/prometheus:v2.16.0
command: ["--config.file=/etc/prometheus/prometheus.yaml"]
image: prom/prometheus:v2.27.1
command: ["--config.file=/etc/prometheus/prometheus.yaml", "--enable-feature=exemplar-storage"]
volumes:
- ./config:/etc/prometheus
ports:
16 changes: 16 additions & 0 deletions docs/api/_index.md
@@ -33,6 +33,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Ingesters ring status](#ingesters-ring-status) | Ingester | `GET /ingester/ring` |
| [Instant query](#instant-query) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/query` |
| [Range query](#range-query) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/query_range` |
| [Exemplar query](#exemplar-query) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/query_exemplars` |
| [Get series by label matchers](#get-series-by-label-matchers) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/series` |
| [Get label names](#get-label-names) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/labels` |
| [Get label values](#get-label-values) | Querier, Query-frontend | `GET <prometheus-http-prefix>/api/v1/label/{name}/values` |
@@ -320,6 +321,21 @@ _For more information, please check out the Prometheus [range query](https://pro

_Requires [authentication](#authentication)._

### Exemplar query

```
GET,POST <prometheus-http-prefix>/api/v1/query_exemplars

# Legacy
GET,POST <legacy-http-prefix>/api/v1/query_exemplars
```

Prometheus-compatible exemplar query endpoint.

_For more information, please check out the Prometheus [exemplar query](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-exemplars) documentation._

_Requires [authentication](#authentication)._
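
For illustration, an exemplar query for a histogram series over a one-hour window could look like the request below (the metric name and time range are example values, not part of this change; when authentication is enabled, the usual `X-Scope-OrgID` tenant header must also be set):

```
GET <prometheus-http-prefix>/api/v1/query_exemplars?query=http_request_duration_seconds_bucket&start=2021-06-15T00:00:00Z&end=2021-06-15T01:00:00Z
```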

### Get series by label matchers

```
2 changes: 2 additions & 0 deletions pkg/api/api.go
@@ -363,6 +363,7 @@ func (a *API) RegisterQueryAPI(handler http.Handler) {
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/read"), handler, true, "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query_range"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query_exemplars"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/labels"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/label/{name}/values"), handler, true, "GET")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/series"), handler, true, "GET", "POST", "DELETE")
@@ -372,6 +373,7 @@ func (a *API) RegisterQueryAPI(handler http.Handler) {
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/read"), handler, true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/query"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/query_range"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/query_exemplars"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/labels"), handler, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/label/{name}/values"), handler, true, "GET")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/series"), handler, true, "GET", "POST", "DELETE")
5 changes: 4 additions & 1 deletion pkg/api/handlers.go
@@ -151,6 +151,7 @@ func configHandler(actualCfg interface{}, defaultCfg interface{}) http.HandlerFu
func NewQuerierHandler(
cfg Config,
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine *promql.Engine,
distributor *distributor.Distributor,
tombstonesLoader *purger.TombstonesLoader,
@@ -189,7 +190,7 @@
engine,
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
nil, // No remote write support.
nil, // No exemplars support.
exemplarQueryable,
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
func() config.Config { return config.Config{} },
@@ -242,6 +243,7 @@
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
@@ -254,6 +256,7 @@
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
1 change: 1 addition & 0 deletions pkg/cortex/cortex.go
@@ -326,6 +326,7 @@ type Cortex struct {
Purger *purger.Purger
TombstonesLoader *purger.TombstonesLoader
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
QuerierEngine *promql.Engine
QueryFrontendTripperware queryrange.Tripperware

5 changes: 3 additions & 2 deletions pkg/cortex/modules.go
@@ -222,7 +222,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer, util_log.Logger)
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer, util_log.Logger)

// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)
@@ -298,6 +298,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
internalQuerierRouter := api.NewQuerierHandler(
t.Cfg.API,
t.QuerierQueryable,
t.ExemplarQueryable,
t.QuerierEngine,
t.Distributor,
t.TombstonesLoader,
@@ -659,7 +660,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides)
manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
25 changes: 25 additions & 0 deletions pkg/cortexpb/compat.go
@@ -12,6 +12,7 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"

@@ -116,6 +117,30 @@ func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
return result
}

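// FromExemplarsToExemplarProtos converts a slice of Prometheus exemplars to their Cortex protobuf representation.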
func FromExemplarsToExemplarProtos(es []exemplar.Exemplar) []Exemplar {
result := make([]Exemplar, 0, len(es))
for _, e := range es {
result = append(result, Exemplar{
Labels: FromLabelsToLabelAdapters(e.Labels),
Value: e.Value,
TimestampMs: e.Ts,
})
}
return result
}

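// FromExemplarProtosToExemplars converts Cortex protobuf exemplars back to Prometheus exemplars.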
func FromExemplarProtosToExemplars(es []Exemplar) []exemplar.Exemplar {
result := make([]exemplar.Exemplar, 0, len(es))
for _, e := range es {
result = append(result, exemplar.Exemplar{
Labels: FromLabelAdaptersToLabels(e.Labels),
Value: e.Value,
Ts: e.TimestampMs,
})
}
return result
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
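
As a quick illustration of the two converters above, here is a minimal round-trip sketch (not part of the change; the exemplar values are made up for the example):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/exemplar"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/cortexpb"
)

func main() {
	// An example exemplar as Prometheus represents it.
	e := exemplar.Exemplar{
		Labels: labels.FromStrings("traceID", "abc123"),
		Value:  1.5,
		Ts:     1623715200000, // timestamp in milliseconds
	}

	// To the Cortex protobuf form and back again.
	protos := cortexpb.FromExemplarsToExemplarProtos([]exemplar.Exemplar{e})
	back := cortexpb.FromExemplarProtosToExemplars(protos)

	// Labels, value, and timestamp survive the round trip.
	fmt.Println(back[0].Labels, back[0].Value, back[0].Ts)
}
```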
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
@@ -242,7 +242,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "distributor_query_duration_seconds",
Help: "Time spent executing expression queries.",
Help: "Time spent executing expression and exemplar queries.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"method", "status_code"})),
receivedSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
106 changes: 105 additions & 1 deletion pkg/distributor/query.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sort"
"time"

"github.com/opentracing/opentracing-go"
@@ -53,6 +54,33 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
return matrix, err
}

func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error) {
var result *ingester_client.ExemplarQueryResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryExemplars", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToExemplarQueryRequest(from, to, matchers...)
if err != nil {
return err
}

// We ask for all ingesters without passing matchers because exemplar queries take in an array of arrays of label matchers.
replicationSet, err := d.GetIngestersForQuery(ctx, nil)
if err != nil {
return err
}

result, err = d.queryIngestersExemplars(ctx, replicationSet, req)
if err != nil {
return err
}

if s := opentracing.SpanFromContext(ctx); s != nil {
s.LogKV("series", len(result.Timeseries))
}
return nil
})
return result, err
}

[Review thread on reusing d.queryDuration for exemplar queries]

Contributor: We're reusing d.queryDuration here. I think it's fine, but let's see how the other discussion about metrics evolves.

Contributor (author): Any thoughts here? We haven't discussed the rest of the metrics recently.

Contributor: I personally think it's fine to start tracking exemplar query metrics along with the "samples query" metrics (we do the same in the ingester at query time). We may reconsider it as a future improvement, but I wouldn't block on this (I'm pretty sure we'll refine the exemplars implementation while learning from running it in production, as happens with every feature we build).

Contributor: As a follow-up, it may be worth updating the help text for queryDuration from "Time spent executing expression queries." to "Time spent executing expression and exemplar queries."

// QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
@@ -105,7 +133,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
}

// If "shard by all labels" is disabled, we can get ingesters by metricName if exists.
if !d.cfg.ShardByAllLabels {
if !d.cfg.ShardByAllLabels && len(matchers) > 0 {
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)

if ok && metricNameMatcher.Type == labels.MatchEqual {
@@ -184,6 +212,82 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
return result, nil
}

// mergeExemplarSets merges and dedupes two sets of already sorted exemplar pairs.
// Both a and b should be lists of exemplars from the same series.
// Defined here instead of pkg/util to avoid an import cycle.
func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar {
result := make([]cortexpb.Exemplar, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
// Add the rest of a or b. One of them is empty now.
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}

[Review thread on the duplicate-timestamp branch of mergeExemplarSets]

Contributor: I'm not sure, but should this compare Value as well? The Prometheus dedupe logic does, so I think different exemplars with the same Ts but different Values are possible. https://github.com/prometheus/prometheus/blob/main/pkg/exemplar/exemplar.go#L49

Contributor (author): My understanding is that any data that comes into Cortex is replicated to the right ingester replica set, and the write is successful once it has replicated to a quorum (2 out of 3 in our case). So my assumption is that while not every ingester in the replica set will have every exemplar, there shouldn't be any ingester that has an exemplar at a certain timestamp that doesn't exist in another ingester. That is, because we're not scraping and (potentially) setting the timestamps ourselves within Cortex, merging by timestamp should be enough. Anything I'm missing here?

Contributor: Think about this edge case:

1. Remote write an exemplar for series X with TS=1 and V=1. The replication set is ingesters 1, 2, 3. The write succeeds on ingesters 1 and 2, but not on 3 (not ingested there).
2. Remote write an exemplar for series X with TS=1 and V=2. The replication set is ingesters 1, 2, 3. The write succeeds on ingester 3 (nothing was ingested there before) but fails on 1 and 2 (same timestamp but different value). Even though quorum has not been reached, the exemplar with V=2 is written to ingester 3 anyway (it will not be rolled back just because writing to 1 and 2 failed).

When you read back, you will have two exemplars with TS=1 and values 1 and 2. The current implementation picks a random exemplar (between the two) if the timestamp is the same, without checking whether the value is equal as well. I personally think it's fine, but I wanted to outline the edge case above so you can take an informed decision.

Contributor (author): How does Cortex handle this situation for samples? I think exemplars are handled the same way here, since this is essentially a copy of util.MergeSampleSets. The only way this edge case could really happen, as far as I know, is bad relabel configs client side, resulting in series from different Prometheus instances colliding into the same series within Cortex. Prometheus' exemplar storage rejects duplicate exemplars, and Cortex does the same, since we're currently just using Prometheus' exemplar storage. Does this make sense, or am I missing something? cc @mdisibio

Contributor: Thinking out loud:

• util.MergeSampleSets is only used when gRPC streaming between queriers and ingesters is disabled (it's enabled by default).
• When gRPC streaming is enabled and chunks transferring is disabled, we use mergeSamples(), which doesn't compare sample values.
• When both gRPC streaming and chunks transferring are enabled on the ingester, distributorQuerier.streamingSelect() returns series.NewConcreteSeriesSet(), which iterates over series stored as chunkSeries. chunkSeries.Iterator() uses the configured chunkIteratorFunc, so the behaviour depends on the configured chunk iteration function, returned by getChunksIteratorFunction(). Assuming batch iteration is used (we do use it), batch.NewChunkMergeIterator() applies, and deduplication is done by mergeStreams(), which, for equal timestamps, just picks one of the two values without checking the actual value.

So in all cases, I believe samples deduplication is equal to your mergeExemplarSets() function 👍

Given the time this analysis took, what do you think about adding a test case to mergeExemplarSets() asserting on the "same timestamp but different value" case, so we write it into the tests? :)

Contributor (author): Thanks again for the detailed explanation.

// queryIngestersExemplars queries the ingesters for exemplars.
func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) {
// Fetch exemplars from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}

resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
return nil, err
}

return resp, nil
})
if err != nil {
return nil, err
}

// Merge results from replication set.
var keys []string
exemplarResults := make(map[string]cortexpb.TimeSeries)
for _, result := range results {
r := result.(*ingester_client.ExemplarQueryResponse)
for _, ts := range r.Timeseries {
lbls := cortexpb.FromLabelAdaptersToLabels(ts.Labels).String()
e, ok := exemplarResults[lbls]
if !ok {
e = ts
keys = append(keys, lbls)
} else {
// Merge in any missing values from another ingester's exemplars for this series.
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
}
// e is a local copy of the map value, so write the (possibly merged) series back.
exemplarResults[lbls] = e
}
}

// Query results from each ingester were sorted, but are not necessarily still sorted after merging.
sort.Strings(keys)

result := make([]cortexpb.TimeSeries, len(exemplarResults))
for i, k := range keys {
result[i] = exemplarResults[k]
}

return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil
}

[Review thread on lines +274 to +276]

Contributor (suggestion): I may be missing something, but if I'm not mistaken, there is no need to call this function if there isn't an existing result, correct? The suggested change moves the merge into an else branch:

} else {
// Merge in any missing values from another ingester's exemplars for this series.
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
}

Contributor (author): Yeah, probably not; I don't think it's functionally any different. I guess there's the potential for slightly more memory being allocated than is really necessary without the else. Is this what you were thinking?

// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
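
To show how the new Distributor.QueryExemplars read path might be driven, here is a hypothetical caller (the function name, metric selector, and time range are assumptions for illustration only; in this PR the real caller is the querier, wired up through the exemplar queryable):

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/distributor"
)

// printExemplars sketches a QueryExemplars call over the last hour.
// It assumes d is an initialized distributor and ctx carries a tenant ID.
func printExemplars(ctx context.Context, d *distributor.Distributor) error {
	end := model.Now()
	start := end.Add(-1 * time.Hour)

	// QueryExemplars is variadic over matcher sets, mirroring the
	// query_exemplars API, which accepts several selectors per request.
	selector := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_request_duration_seconds_bucket"),
	}

	resp, err := d.QueryExemplars(ctx, start, end, selector)
	if err != nil {
		return err
	}
	for _, ts := range resp.Timeseries {
		fmt.Println(cortexpb.FromLabelAdaptersToLabels(ts.Labels), len(ts.Exemplars))
	}
	return nil
}
```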
52 changes: 52 additions & 0 deletions pkg/distributor/query_test.go
@@ -2,7 +2,10 @@ package distributor

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -106,3 +109,52 @@ func TestMergeSamplesIntoFirstNilB(t *testing.T) {

require.Equal(t, b, a)
}

[Review comment] Contributor: Clean test! 👍

func TestMergeExemplarSets(t *testing.T) {
now := timestamp.FromTime(time.Now())
exemplar1 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-1")), TimestampMs: now, Value: 1}
exemplar2 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-2")), TimestampMs: now + 1, Value: 2}
exemplar3 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-3")), TimestampMs: now + 4, Value: 3}
exemplar4 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now + 8, Value: 7}
exemplar5 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now, Value: 7}

for _, c := range []struct {
exemplarsA []cortexpb.Exemplar
exemplarsB []cortexpb.Exemplar
expected []cortexpb.Exemplar
}{
{
exemplarsA: []cortexpb.Exemplar{},
exemplarsB: []cortexpb.Exemplar{},
expected: []cortexpb.Exemplar{},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1},
exemplarsB: []cortexpb.Exemplar{},
expected: []cortexpb.Exemplar{exemplar1},
},
{
exemplarsA: []cortexpb.Exemplar{},
exemplarsB: []cortexpb.Exemplar{exemplar1},
expected: []cortexpb.Exemplar{exemplar1},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1},
exemplarsB: []cortexpb.Exemplar{exemplar1},
expected: []cortexpb.Exemplar{exemplar1},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
exemplarsB: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4},
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
},
{ // Ensure that when there are exemplars with duplicate timestamps, the first one wins.
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
exemplarsB: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4},
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
},
} {
e := mergeExemplarSets(c.exemplarsA, c.exemplarsB)
require.Equal(t, c.expected, e)
}
}
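
To exercise just this merge logic locally, running the single test should be enough:

```
go test ./pkg/distributor -run TestMergeExemplarSets
```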