Skip to content

Commit 41c934d

Browse files
committed
Simplify the query function a little
1 parent 4a43961 commit 41c934d

File tree

1 file changed

+56
-50
lines changed

1 file changed

+56
-50
lines changed

distributor/distributor.go

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -429,71 +429,77 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
429429
return err
430430
}
431431

432-
ingesters, err := d.ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read)
432+
req, err := util.ToQueryRequest(from, to, matchers)
433433
if err != nil {
434434
return err
435435
}
436436

437-
// We need a response from a quorum of ingesters, which is n/2 + 1.
438-
minSuccess := (len(ingesters) / 2) + 1
439-
maxErrs := len(ingesters) - minSuccess
440-
if len(ingesters) < minSuccess {
441-
return fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
442-
}
443-
444-
req, err := util.ToQueryRequest(from, to, matchers)
437+
ingesters, err := d.ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read)
445438
if err != nil {
446439
return err
447440
}
448441

449-
// Fetch samples from multiple ingesters
450-
var numErrs int32
451-
errReceived := make(chan error)
452-
results := make(chan model.Matrix, len(ingesters))
453-
454-
for _, ing := range ingesters {
455-
go func(ing *ring.IngesterDesc) {
456-
result, err := d.queryIngester(ctx, ing, req)
457-
if err != nil {
458-
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
459-
errReceived <- err
460-
}
461-
} else {
462-
results <- result
442+
result, err = d.queryIngesters(ctx, ingesters, req)
443+
return err
444+
})
445+
return result, err
446+
}
447+
448+
// Query implements Querier.
449+
func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, req *cortex.QueryRequest) (model.Matrix, error) {
450+
// We need a response from a quorum of ingesters, which is n/2 + 1.
451+
minSuccess := (len(ingesters) / 2) + 1
452+
maxErrs := len(ingesters) - minSuccess
453+
if len(ingesters) < minSuccess {
454+
return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
455+
}
456+
457+
// Fetch samples from multiple ingesters
458+
var numErrs int32
459+
errReceived := make(chan error)
460+
results := make(chan model.Matrix, len(ingesters))
461+
462+
for _, ing := range ingesters {
463+
go func(ing *ring.IngesterDesc) {
464+
result, err := d.queryIngester(ctx, ing, req)
465+
if err != nil {
466+
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
467+
errReceived <- err
463468
}
464-
}(ing)
465-
}
469+
} else {
470+
results <- result
471+
}
472+
}(ing)
473+
}
474+
475+
// Only wait for minSuccess ingesters (or an error), and accumulate the samples
476+
// by fingerprint, merging them into any existing samples.
477+
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
478+
for i := 0; i < minSuccess; i++ {
479+
select {
480+
case err := <-errReceived:
481+
return nil, err
466482

467-
// Only wait for minSuccess ingesters (or an error), and accumulate the samples
468-
// by fingerprint, merging them into any existing samples.
469-
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
470-
for i := 0; i < minSuccess; i++ {
471-
select {
472-
case err := <-errReceived:
473-
return err
474-
475-
case result := <-results:
476-
for _, ss := range result {
477-
fp := ss.Metric.Fingerprint()
478-
if mss, ok := fpToSampleStream[fp]; !ok {
479-
fpToSampleStream[fp] = &model.SampleStream{
480-
Metric: ss.Metric,
481-
Values: ss.Values,
482-
}
483-
} else {
484-
mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values)
483+
case result := <-results:
484+
for _, ss := range result {
485+
fp := ss.Metric.Fingerprint()
486+
mss, ok := fpToSampleStream[fp]
487+
if !ok {
488+
mss = &model.SampleStream{
489+
Metric: ss.Metric,
485490
}
491+
fpToSampleStream[fp] = mss
486492
}
493+
mss.Values = util.MergeSamples(mss.Values, ss.Values)
487494
}
488495
}
496+
}
489497

490-
result = make(model.Matrix, 0, len(fpToSampleStream))
491-
for _, ss := range fpToSampleStream {
492-
result = append(result, ss)
493-
}
494-
return nil
495-
})
496-
return result, err
498+
result := model.Matrix{}
499+
for _, ss := range fpToSampleStream {
500+
result = append(result, ss)
501+
}
502+
return result, nil
497503
}
498504

499505
func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *cortex.QueryRequest) (model.Matrix, error) {

0 commit comments

Comments
 (0)