Commit 70ee874

Merge pull request cortexproject#272 from weaveworks/short-cut-push
Short cut distributor sample pushes.
2 parents: 2195675 + 98dbdad

File tree

4 files changed: +279 -76 lines changed


chunk/chunk_store.go

Lines changed: 1 addition & 16 deletions
@@ -282,7 +282,7 @@ outer:
 }
 
 func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
-	metricName, matchers, err := extractMetricNameFromMatchers(matchers)
+	metricName, matchers, err := util.ExtractMetricNameFromMatchers(matchers)
 	if err != nil {
 		return nil, err
 	}
@@ -473,18 +473,3 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [
 	}
 	return chunks, nil
 }
-
-func extractMetricNameFromMatchers(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) {
-	for i, matcher := range matchers {
-		if matcher.Name != model.MetricNameLabel {
-			continue
-		}
-		if matcher.Type != metric.Equal {
-			return "", nil, fmt.Errorf("must have equality matcher for MetricNameLabel")
-		}
-		metricName := matcher.Value
-		matchers = matchers[:i+copy(matchers[i:], matchers[i+1:])]
-		return metricName, matchers, nil
-	}
-	return "", nil, fmt.Errorf("no matcher for MetricNameLabel")
-}
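The deleted helper evidently moved to the shared util package as an exported function, since both this file and distributor.go now call util.ExtractMetricNameFromMatchers. A sketch of what that exported version presumably looks like, reconstructed from the removed body; the import paths are assumptions:

package util

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/metric"
)

// ExtractMetricNameFromMatchers returns the metric name from an equality
// matcher on MetricNameLabel, plus the remaining matchers with that one
// removed, or an error if no such matcher exists.
func ExtractMetricNameFromMatchers(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) {
	for i, matcher := range matchers {
		if matcher.Name != model.MetricNameLabel {
			continue
		}
		if matcher.Type != metric.Equal {
			return "", nil, fmt.Errorf("must have equality matcher for MetricNameLabel")
		}
		metricName := matcher.Value
		// Delete element i in place while preserving order.
		matchers = matchers[:i+copy(matchers[i:], matchers[i+1:])]
		return metricName, matchers, nil
	}
	return "", nil, fmt.Errorf("no matcher for MetricNameLabel")
}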

distributor/distributor.go

Lines changed: 93 additions & 60 deletions
@@ -87,6 +87,9 @@ type Config struct {
 	ClientCleanupPeriod time.Duration
 	IngestionRateLimit  float64
 	IngestionBurstSize  int
+
+	// for testing
+	ingesterClientFactory func(string) cortex.IngesterClient
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -218,22 +221,27 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
 		return client, nil
 	}
 
-	conn, err := grpc.Dial(
-		ingester.Addr,
-		grpc.WithTimeout(d.cfg.RemoteTimeout),
-		grpc.WithInsecure(),
-		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
-			otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
-			middleware.ClientUserHeaderInterceptor,
-		)),
-	)
-	if err != nil {
-		return nil, err
-	}
-
-	client = ingesterClient{
-		IngesterClient: cortex.NewIngesterClient(conn),
-		conn:           conn,
+	if d.cfg.ingesterClientFactory != nil {
+		client = ingesterClient{
+			IngesterClient: d.cfg.ingesterClientFactory(ingester.Addr),
+		}
+	} else {
+		conn, err := grpc.Dial(
+			ingester.Addr,
+			grpc.WithTimeout(d.cfg.RemoteTimeout),
+			grpc.WithInsecure(),
+			grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+				otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
+				middleware.ClientUserHeaderInterceptor,
+			)),
+		)
+		if err != nil {
+			return nil, err
+		}
+		client = ingesterClient{
+			IngesterClient: cortex.NewIngesterClient(conn),
+			conn:           conn,
+		}
 	}
 	d.clients[ingester.Addr] = client
 	return client, nil
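The new ingesterClientFactory field (per its comment, "for testing") lets a test swap in a fake client instead of dialing gRPC. A hypothetical sketch of that wiring; mockIngester and testConfig are illustrative names, and only the Config field itself comes from the diff:

// mockIngester satisfies cortex.IngesterClient by embedding the interface;
// any method the test does not override would panic if called.
type mockIngester struct {
	cortex.IngesterClient
}

// testConfig returns a Config whose client factory hands back the mock,
// so getClientFor never reaches grpc.Dial.
func testConfig() Config {
	return Config{
		ingesterClientFactory: func(addr string) cortex.IngesterClient {
			return mockIngester{}
		},
	}
}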
@@ -252,9 +260,18 @@ func tokenFor(userID string, name model.LabelValue) uint32 {
 }
 
 type sampleTracker struct {
-	sample     *model.Sample
-	minSuccess int
-	succeeded  int32
+	sample      *model.Sample
+	minSuccess  int
+	maxFailures int
+	succeeded   int32
+	failed      int32
+}
+
+type pushTracker struct {
+	samplesPending int32
+	samplesFailed  int32
+	done           chan struct{}
+	err            chan error
 }
 
 // Push implements cortex.IngesterServer
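As an aside (not part of the diff), the quorum arithmetic these fields encode is minSuccess = n/2 + 1 and maxFailures = n - minSuccess for a sample replicated to n ingesters. A tiny runnable illustration:

package main

import "fmt"

// Quorum arithmetic from the diff: minSuccess = n/2 + 1,
// maxFailures = n - minSuccess, for n replicas of a sample.
func main() {
	for _, n := range []int{1, 2, 3, 5} {
		minSuccess := n/2 + 1
		fmt.Printf("replicas=%d minSuccess=%d maxFailures=%d\n", n, minSuccess, n-minSuccess)
	}
}

So with a replication factor of 3, for example, a push tolerates one failed ingester and still succeeds.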
@@ -267,6 +284,10 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
 	samples := util.FromWriteRequest(req)
 	d.receivedSamples.Add(float64(len(samples)))
 
+	if len(samples) == 0 {
+		return &cortex.WriteResponse{}, nil
+	}
+
 	limiter := d.getOrCreateIngestLimiter(userID)
 	if !limiter.AllowN(time.Now(), len(samples)) {
 		return nil, errIngestionRateLimitExceeded
@@ -285,11 +306,13 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
 	sampleTrackers := make([]sampleTracker, len(samples), len(samples))
 	samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
 	for i := range samples {
+		// We need a response from a quorum of ingesters, which is n/2 + 1.
+		minSuccess := (len(ingesters[i]) / 2) + 1
+
 		sampleTrackers[i] = sampleTracker{
-			sample: samples[i],
-			// We need a response from a quorum of ingesters, which is n/2 + 1.
-			minSuccess: (len(ingesters[i]) / 2) + 1,
-			succeeded:  0,
+			sample:      samples[i],
+			minSuccess:  minSuccess,
+			maxFailures: len(ingesters[i]) - minSuccess,
 		}
 
 		// Skip those that have not heartbeated in a while. NB these are still
@@ -315,26 +338,22 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
 		}
 	}
 
-	errs := make(chan error)
-	for hostname, samples := range samplesByIngester {
-		go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
-			errs <- d.sendSamples(ctx, ingester, samples)
-		}(hostname, samples)
+	pushTracker := pushTracker{
+		samplesPending: int32(len(samples)),
+		done:           make(chan struct{}),
+		err:            make(chan error),
 	}
-	var lastErr error
-	for i := 0; i < len(samplesByIngester); i++ {
-		if err := <-errs; err != nil {
-			lastErr = err
-			continue
-		}
+	for ingester, samples := range samplesByIngester {
+		go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
+			d.sendSamples(ctx, ingester, samples, &pushTracker)
+		}(ingester, samples)
 	}
-	for i := range sampleTrackers {
-		if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) {
-			return nil, fmt.Errorf("need %d successful writes, only got %d, last error was: %v",
-				sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr)
-		}
+	select {
+	case err := <-pushTracker.err:
+		return nil, err
+	case <-pushTracker.done:
+		return &cortex.WriteResponse{}, nil
 	}
-	return &cortex.WriteResponse{}, nil
 }
 
 func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
@@ -350,7 +369,38 @@ func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
 	return limiter
 }
 
-func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
+func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker, pushTracker *pushTracker) {
+	err := d.sendSamplesErr(ctx, ingester, sampleTrackers)
+
+	// If we succeed, decrement each sample's pending count by one. If we reach
+	// the required number of successful puts on this sample, then decrement the
+	// number of pending samples by one. If we successfully push all samples to
+	// min success ingesters, wake up the waiting rpc so it can return early.
+	// Similarly, track the number of errors, and if it exceeds maxFailures
+	// shortcut the waiting rpc.
+	//
+	// The use of atomic increments here guarantees only a single sendSamples
+	// goroutine will write to either channel.
+	for i := range sampleTrackers {
+		if err != nil {
+			if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) {
+				continue
+			}
+			if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
+				pushTracker.err <- err
+			}
+		} else {
+			if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) {
+				continue
+			}
+			if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
+				pushTracker.done <- struct{}{}
+			}
+		}
+	}
+}
+
+func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
 	client, err := d.getClientFor(ingester)
 	if err != nil {
 		return err
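To see the short-cut behaviour in isolation, here is a minimal, self-contained sketch (not from the PR; tracker, errc, and the simulated outcomes are illustrative) of the same atomic-counter-plus-channel pattern for one sample replicated to three ingesters:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// tracker mirrors sampleTracker for a single sample.
type tracker struct {
	minSuccess  int32
	maxFailures int32
	succeeded   int32
	failed      int32
}

func main() {
	const replicas = 3
	minSuccess := int32(replicas/2 + 1)
	t := tracker{minSuccess: minSuccess, maxFailures: replicas - minSuccess}

	// Mirrors pushTracker: pending samples, failed count, and the two signal channels.
	samplesPending := int32(1)
	samplesFailed := int32(0)
	done := make(chan struct{})
	errc := make(chan error)

	// Simulated per-replica outcomes: one ingester fails, quorum still holds.
	for _, res := range []error{nil, errors.New("ingester down"), nil} {
		go func(err error) {
			if err != nil {
				if atomic.AddInt32(&t.failed, 1) <= t.maxFailures {
					return // tolerable failure; keep waiting
				}
				if atomic.AddInt32(&samplesFailed, 1) == 1 {
					errc <- err // first goroutine to see overall failure reports it
				}
			} else {
				if atomic.AddInt32(&t.succeeded, 1) != t.minSuccess {
					return // quorum not yet reached (or already passed)
				}
				if atomic.AddInt32(&samplesPending, -1) == 0 {
					done <- struct{}{} // all samples hit quorum: wake the RPC early
				}
			}
		}(res)
	}

	select {
	case err := <-errc:
		fmt.Println("push failed early:", err)
	case <-done:
		fmt.Println("quorum reached, push returned early")
	}
}

Because each counter crosses its threshold exactly once, at most one goroutine ever writes to each channel, which is why the unbuffered channels here (and in the diff) cannot deadlock the select.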
@@ -366,25 +416,8 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
 	d.ingesterAppends.WithLabelValues(ingester.Addr).Inc()
 	if err != nil {
 		d.ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
-		return err
-	}
-
-	for i := range sampleTrackers {
-		atomic.AddInt32(&sampleTrackers[i].succeeded, 1)
-	}
-	return nil
-}
-
-func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelValue, error) {
-	for _, m := range matchers {
-		if m.Name == model.MetricNameLabel {
-			if m.Type != metric.Equal {
-				return "", fmt.Errorf("non-equality matchers are not supported on the metric name")
-			}
-			return m.Value, nil
-		}
 	}
-	return "", fmt.Errorf("no metric name matcher found")
+	return err
 }
 
 // Query implements Querier.
@@ -393,7 +426,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
 	err := instrument.TimeRequestHistogram(ctx, "Distributor.Query", d.queryDuration, func(ctx context.Context) error {
 		fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
 
-		metricName, err := metricNameFromLabelMatchers(matchers...)
+		metricName, _, err := util.ExtractMetricNameFromMatchers(matchers)
 		if err != nil {
 			return err
 		}
