Skip to content

Commit a58ef5f

Browse files
committed
Short put distributor sample pushes; when enough samples succeed (or fail), return the rpc - don't wait for all of them.
1 parent 2195675 commit a58ef5f

File tree

3 files changed

+85
-60
lines changed

3 files changed

+85
-60
lines changed

chunk/chunk_store.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ outer:
282282
}
283283

284284
func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
285-
metricName, matchers, err := extractMetricNameFromMatchers(matchers)
285+
metricName, matchers, err := util.ExtractMetricNameFromMatchers(matchers)
286286
if err != nil {
287287
return nil, err
288288
}
@@ -473,18 +473,3 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [
473473
}
474474
return chunks, nil
475475
}
476-
477-
func extractMetricNameFromMatchers(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) {
478-
for i, matcher := range matchers {
479-
if matcher.Name != model.MetricNameLabel {
480-
continue
481-
}
482-
if matcher.Type != metric.Equal {
483-
return "", nil, fmt.Errorf("must have equality matcher for MetricNameLabel")
484-
}
485-
metricName := matcher.Value
486-
matchers = matchers[:i+copy(matchers[i:], matchers[i+1:])]
487-
return metricName, matchers, nil
488-
}
489-
return "", nil, fmt.Errorf("no matcher for MetricNameLabel")
490-
}

distributor/distributor.go

Lines changed: 68 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,18 @@ func tokenFor(userID string, name model.LabelValue) uint32 {
252252
}
253253

254254
type sampleTracker struct {
255-
sample *model.Sample
256-
minSuccess int
257-
succeeded int32
255+
sample *model.Sample
256+
minSuccess int
257+
maxFailures int
258+
succeeded int32
259+
failed int32
260+
}
261+
262+
type pushTracker struct {
263+
samplesPending int32
264+
samplesFailed int32
265+
done chan struct{}
266+
err chan error
258267
}
259268

260269
// Push implements cortex.IngesterServer
@@ -285,11 +294,15 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
285294
sampleTrackers := make([]sampleTracker, len(samples), len(samples))
286295
samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
287296
for i := range samples {
297+
// We need a response from a quorum of ingesters, which is n/2 + 1.
298+
minSuccess := (len(ingesters[i]) / 2) + 1
299+
288300
sampleTrackers[i] = sampleTracker{
289-
sample: samples[i],
290-
// We need a response from a quorum of ingesters, which is n/2 + 1.
291-
minSuccess: (len(ingesters[i]) / 2) + 1,
292-
succeeded: 0,
301+
sample: samples[i],
302+
minSuccess: minSuccess,
303+
maxFailures: len(ingesters[i]) - minSuccess,
304+
succeeded: 0,
305+
failed: 0,
293306
}
294307

295308
// Skip those that have not heartbeated in a while. NB these are still
@@ -315,26 +328,23 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
315328
}
316329
}
317330

318-
errs := make(chan error)
319-
for hostname, samples := range samplesByIngester {
320-
go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
321-
errs <- d.sendSamples(ctx, ingester, samples)
322-
}(hostname, samples)
331+
pushTracker := pushTracker{
332+
samplesPending: int32(len(samples)),
333+
samplesFailed: 0,
334+
done: make(chan struct{}),
335+
err: make(chan error),
323336
}
324-
var lastErr error
325-
for i := 0; i < len(samplesByIngester); i++ {
326-
if err := <-errs; err != nil {
327-
lastErr = err
328-
continue
329-
}
337+
for ingester, samples := range samplesByIngester {
338+
go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
339+
d.sendSamples(ctx, ingester, samples, &pushTracker)
340+
}(ingester, samples)
330341
}
331-
for i := range sampleTrackers {
332-
if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) {
333-
return nil, fmt.Errorf("need %d successful writes, only got %d, last error was: %v",
334-
sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr)
335-
}
342+
select {
343+
case err := <-pushTracker.err:
344+
return nil, err
345+
case <-pushTracker.done:
346+
return &cortex.WriteResponse{}, nil
336347
}
337-
return &cortex.WriteResponse{}, nil
338348
}
339349

340350
func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
@@ -350,7 +360,38 @@ func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
350360
return limiter
351361
}
352362

353-
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
363+
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker, pushTracker *pushTracker) {
364+
err := d.sendSamplesErr(ctx, ingester, sampleTrackers)
365+
366+
// If we suceed, decrement each sample's pending count by one. If we reach
367+
// the requred number of successful puts on this sample, then decrement the
368+
// number of pending samples by one. If we successfully push all samples to
369+
// min success ingesters, wake up the waiting rpc so it can return early.
370+
// Similarly, track the number of errors, and if it exceeds maxFailures
371+
// shortcut the waiting rpc.
372+
//
373+
// The use of atomic increments here guarantees only a single sendSamples
374+
// goroutine will write to either channel.
375+
for i := range sampleTrackers {
376+
if err != nil {
377+
if atomic.AddInt32(&sampleTrackers[i].failed, 1) > int32(sampleTrackers[i].maxFailures) {
378+
continue
379+
}
380+
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
381+
pushTracker.err <- err
382+
}
383+
} else {
384+
if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) {
385+
continue
386+
}
387+
if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
388+
pushTracker.done <- struct{}{}
389+
}
390+
}
391+
}
392+
}
393+
394+
func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
354395
client, err := d.getClientFor(ingester)
355396
if err != nil {
356397
return err
@@ -366,25 +407,8 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
366407
d.ingesterAppends.WithLabelValues(ingester.Addr).Inc()
367408
if err != nil {
368409
d.ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
369-
return err
370-
}
371-
372-
for i := range sampleTrackers {
373-
atomic.AddInt32(&sampleTrackers[i].succeeded, 1)
374-
}
375-
return nil
376-
}
377-
378-
func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelValue, error) {
379-
for _, m := range matchers {
380-
if m.Name == model.MetricNameLabel {
381-
if m.Type != metric.Equal {
382-
return "", fmt.Errorf("non-equality matchers are not supported on the metric name")
383-
}
384-
return m.Value, nil
385-
}
386410
}
387-
return "", fmt.Errorf("no metric name matcher found")
411+
return err
388412
}
389413

390414
// Query implements Querier.
@@ -393,7 +417,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
393417
err := instrument.TimeRequestHistogram(ctx, "Distributor.Query", d.queryDuration, func(ctx context.Context) error {
394418
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
395419

396-
metricName, err := metricNameFromLabelMatchers(matchers...)
420+
metricName, _, err := util.ExtractMetricNameFromMatchers(matchers)
397421
if err != nil {
398422
return err
399423
}

util/matchers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,19 @@ func ExtractMetricNameFromMetric(m model.Metric) (model.LabelValue, error) {
2828
}
2929
return "", fmt.Errorf("no MetricNameLabel for chunk")
3030
}
31+
32+
// ExtractMetricNameFromMatchers extracts the metric name from a set of matchers
33+
func ExtractMetricNameFromMatchers(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) {
34+
for i, matcher := range matchers {
35+
if matcher.Name != model.MetricNameLabel {
36+
continue
37+
}
38+
if matcher.Type != metric.Equal {
39+
return "", nil, fmt.Errorf("must have equality matcher for MetricNameLabel")
40+
}
41+
metricName := matcher.Value
42+
matchers = matchers[:i+copy(matchers[i:], matchers[i+1:])]
43+
return metricName, matchers, nil
44+
}
45+
return "", nil, fmt.Errorf("no matcher for MetricNameLabel")
46+
}

0 commit comments

Comments
 (0)