-
Notifications
You must be signed in to change notification settings - Fork 820
exemplar querying #4181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
exemplar querying #4181
Changes from all commits
cc51f3f
6254ef2
9e440b0
f6abe52
3588940
aa26af2
ed3b913
9fd50af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,3 +55,4 @@ scrape_configs: | |
|
||
remote_write: | ||
- url: http://distributor:8001/api/v1/push | ||
send_exemplars: true |
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |||||||||||||||
"context" | ||||||||||||||||
"fmt" | ||||||||||||||||
"io" | ||||||||||||||||
"sort" | ||||||||||||||||
"time" | ||||||||||||||||
|
||||||||||||||||
"github.com/opentracing/opentracing-go" | ||||||||||||||||
|
@@ -53,6 +54,33 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . | |||||||||||||||
return matrix, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error) { | ||||||||||||||||
var result *ingester_client.ExemplarQueryResponse | ||||||||||||||||
err := instrument.CollectedRequest(ctx, "Distributor.QueryExemplars", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're reusing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any thoughts here? we haven't discussed the rest of the metrics recently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally think it's fine starting tracking exemplar queries metrics along with "samples query" metrics to begin with (we're doing the same in ingester at query time). We may reconsider it as a future improvement, but I wouldn't block on this (I'm pretty sure we'll refine the exemplars implementation while learning more running it in production, as it happens for every feature we build). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a followup to this it may be worth updating the help text for queryDuration. Currently it's:
It may be worth changing it to
|
||||||||||||||||
req, err := ingester_client.ToExemplarQueryRequest(from, to, matchers...) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// We ask for all ingesters without passing matchers because exemplar queries take in an array of array of label matchers. | ||||||||||||||||
replicationSet, err := d.GetIngestersForQuery(ctx, nil) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
result, err = d.queryIngestersExemplars(ctx, replicationSet, req) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if s := opentracing.SpanFromContext(ctx); s != nil { | ||||||||||||||||
s.LogKV("series", len(result.Timeseries)) | ||||||||||||||||
} | ||||||||||||||||
return nil | ||||||||||||||||
}) | ||||||||||||||||
return result, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks. | ||||||||||||||||
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) { | ||||||||||||||||
var result *ingester_client.QueryStreamResponse | ||||||||||||||||
|
@@ -105,7 +133,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab | |||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// If "shard by all labels" is disabled, we can get ingesters by metricName if exists. | ||||||||||||||||
if !d.cfg.ShardByAllLabels { | ||||||||||||||||
if !d.cfg.ShardByAllLabels && len(matchers) > 0 { | ||||||||||||||||
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) | ||||||||||||||||
|
||||||||||||||||
if ok && metricNameMatcher.Type == labels.MatchEqual { | ||||||||||||||||
|
@@ -184,6 +212,82 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re | |||||||||||||||
return result, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// mergeExemplarSets merges and dedupes two sets of already sorted exemplar pairs. | ||||||||||||||||
// Both a and b should be lists of exemplars from the same series. | ||||||||||||||||
// Defined here instead of pkg/util to avoid a import cycle. | ||||||||||||||||
cstyan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar { | ||||||||||||||||
result := make([]cortexpb.Exemplar, 0, len(a)+len(b)) | ||||||||||||||||
i, j := 0, 0 | ||||||||||||||||
for i < len(a) && j < len(b) { | ||||||||||||||||
if a[i].TimestampMs < b[j].TimestampMs { | ||||||||||||||||
result = append(result, a[i]) | ||||||||||||||||
i++ | ||||||||||||||||
} else if a[i].TimestampMs > b[j].TimestampMs { | ||||||||||||||||
result = append(result, b[j]) | ||||||||||||||||
j++ | ||||||||||||||||
} else { | ||||||||||||||||
result = append(result, a[i]) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure, but should this compare There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that any data that comes into cortex is replicated to the right ingester replica set, and the write is successful when we're replicated to a quorum (2 out of 3 in our case). So my assumption is that while not every ingester in the replica set would have every exemplar, there shouldn't be any ingester that has an exemplar at a certain timestamp that doesn't exist in another ingester. That is, because we're not scraping and (potentially) setting the timestamps ourselves within cortex, merging by timestamp should be enough. Anything I'm missing here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Think about this edge case:
When you read back, you will have two exemplars with TS=1 and values 1 and 2. The current implementation picks a random exemplar (between the two) if the timestamp is the same, without checking if value is equal as well. I personally think it's fine, but I wanted to outline the edge case above so you can take an informed decision. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does Cortex handle this situation for samples? I think exemplars are handled the same way here since this is essentially a copy of The only way this edge case could really happen that I know of is bad relabel configs client side, resulting in series from different Prometheus instances to result in the same series within Cortex. In Prometheus' exemplar storage we rejects duplicate exemplars (description of what is considered a duplicate), and do the same in Cortex since we're just using Prometheus' exemplar storage currently. Does this make sense, or am I missing something? cc @mdisibio There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Thinking loudly:
So in all cases, I believe samples deduplication is equal to your Given the time it took this analysis, what do you think adding a test case to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks again for the detailed explanation. |
||||||||||||||||
i++ | ||||||||||||||||
j++ | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
// Add the rest of a or b. One of them is empty now. | ||||||||||||||||
result = append(result, a[i:]...) | ||||||||||||||||
result = append(result, b[j:]...) | ||||||||||||||||
return result | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// queryIngestersExemplars queries the ingesters for exemplars. | ||||||||||||||||
func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) { | ||||||||||||||||
// Fetch exemplars from multiple ingesters in parallel, using the replicationSet | ||||||||||||||||
// to deal with consistency. | ||||||||||||||||
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { | ||||||||||||||||
client, err := d.ingesterPool.GetClientFor(ing.Addr) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req) | ||||||||||||||||
d.ingesterQueries.WithLabelValues(ing.Addr).Inc() | ||||||||||||||||
if err != nil { | ||||||||||||||||
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return resp, nil | ||||||||||||||||
}) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Merge results from replication set. | ||||||||||||||||
var keys []string | ||||||||||||||||
exemplarResults := make(map[string]cortexpb.TimeSeries) | ||||||||||||||||
for _, result := range results { | ||||||||||||||||
r := result.(*ingester_client.ExemplarQueryResponse) | ||||||||||||||||
for _, ts := range r.Timeseries { | ||||||||||||||||
lbls := cortexpb.FromLabelAdaptersToLabels(ts.Labels).String() | ||||||||||||||||
pracucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
e, ok := exemplarResults[lbls] | ||||||||||||||||
if !ok { | ||||||||||||||||
exemplarResults[lbls] = ts | ||||||||||||||||
keys = append(keys, lbls) | ||||||||||||||||
} | ||||||||||||||||
// Merge in any missing values from another ingesters exemplars for this series. | ||||||||||||||||
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars) | ||||||||||||||||
Comment on lines
+274
to
+276
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: I may be missing something, but if I'm not mistaken, there is no need to call this function if there isn't an existing result, correct?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah probably not, I don't think it's functionally any different. I guess there's the potential for slightly more memory being allocated than is really necessary without the else. Is this what you were thinking? |
||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Query results from each ingester were sorted, but are not necessarily still sorted after merging. | ||||||||||||||||
sort.Strings(keys) | ||||||||||||||||
|
||||||||||||||||
result := make([]cortexpb.TimeSeries, len(exemplarResults)) | ||||||||||||||||
for i, k := range keys { | ||||||||||||||||
result[i] = exemplarResults[k] | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// queryIngesterStream queries the ingesters using the new streaming API. | ||||||||||||||||
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { | ||||||||||||||||
var ( | ||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,10 @@ package distributor | |
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/pkg/timestamp" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/cortexproject/cortex/pkg/cortexpb" | ||
|
@@ -106,3 +109,52 @@ func TestMergeSamplesIntoFirstNilB(t *testing.T) { | |
|
||
require.Equal(t, b, a) | ||
} | ||
|
||
func TestMergeExemplarSets(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clean test! 👍 |
||
now := timestamp.FromTime(time.Now()) | ||
exemplar1 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-1")), TimestampMs: now, Value: 1} | ||
exemplar2 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-2")), TimestampMs: now + 1, Value: 2} | ||
exemplar3 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-3")), TimestampMs: now + 4, Value: 3} | ||
exemplar4 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now + 8, Value: 7} | ||
exemplar5 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now, Value: 7} | ||
|
||
for _, c := range []struct { | ||
exemplarsA []cortexpb.Exemplar | ||
exemplarsB []cortexpb.Exemplar | ||
expected []cortexpb.Exemplar | ||
}{ | ||
{ | ||
exemplarsA: []cortexpb.Exemplar{}, | ||
exemplarsB: []cortexpb.Exemplar{}, | ||
expected: []cortexpb.Exemplar{}, | ||
}, | ||
{ | ||
exemplarsA: []cortexpb.Exemplar{exemplar1}, | ||
exemplarsB: []cortexpb.Exemplar{}, | ||
expected: []cortexpb.Exemplar{exemplar1}, | ||
}, | ||
{ | ||
exemplarsA: []cortexpb.Exemplar{}, | ||
exemplarsB: []cortexpb.Exemplar{exemplar1}, | ||
expected: []cortexpb.Exemplar{exemplar1}, | ||
}, | ||
{ | ||
exemplarsA: []cortexpb.Exemplar{exemplar1}, | ||
exemplarsB: []cortexpb.Exemplar{exemplar1}, | ||
expected: []cortexpb.Exemplar{exemplar1}, | ||
}, | ||
{ | ||
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}, | ||
exemplarsB: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4}, | ||
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}, | ||
}, | ||
{ // Ensure that when there are exemplars with duplicate timestamps, the first one wins. | ||
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}, | ||
exemplarsB: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4}, | ||
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}, | ||
}, | ||
} { | ||
e := mergeExemplarSets(c.exemplarsA, c.exemplarsB) | ||
require.Equal(t, c.expected, e) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.