@@ -20,7 +20,6 @@ import (
20
20
"github.com/prometheus/common/log"
21
21
"github.com/prometheus/common/model"
22
22
"github.com/prometheus/prometheus/storage/metric"
23
- "github.com/prometheus/prometheus/storage/remote"
24
23
25
24
"github.com/weaveworks/common/instrument"
26
25
"github.com/weaveworks/common/middleware"
38
37
"The current number of ingester clients." ,
39
38
nil , nil ,
40
39
)
40
+ labelNameBytes = []byte (model .MetricNameLabel )
41
41
)
42
42
43
43
// Distributor is a storage.SampleAppender and a cortex.Querier which
@@ -242,20 +242,25 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
242
242
return client , nil
243
243
}
244
244
245
- func tokenForMetric (userID string , metric model.Metric ) uint32 {
246
- name := metric [model .MetricNameLabel ]
247
- return tokenFor (userID , name )
245
+ func tokenForLabels (userID string , labels []cortex.LabelPair ) (uint32 , error ) {
246
+ for _ , label := range labels {
247
+ if label .Name .Equal (labelNameBytes ) {
248
+ return tokenFor (userID , label .Value ), nil
249
+ }
250
+ }
251
+ return 0 , fmt .Errorf ("No metric name label" )
248
252
}
249
253
250
- func tokenFor (userID string , name model. LabelValue ) uint32 {
254
+ func tokenFor (userID string , name [] byte ) uint32 {
251
255
h := fnv .New32 ()
252
256
h .Write ([]byte (userID ))
253
- h .Write ([] byte ( name ) )
257
+ h .Write (name )
254
258
return h .Sum32 ()
255
259
}
256
260
257
261
type sampleTracker struct {
258
- sample * model.Sample
262
+ labels []cortex.LabelPair
263
+ sample cortex.Sample
259
264
minSuccess int
260
265
maxFailures int
261
266
succeeded int32
@@ -270,13 +275,30 @@ type pushTracker struct {
270
275
}
271
276
272
277
// Push implements cortex.IngesterServer
273
- func (d * Distributor ) Push (ctx context.Context , req * remote .WriteRequest ) (* cortex.WriteResponse , error ) {
278
+ func (d * Distributor ) Push (ctx context.Context , req * cortex .WriteRequest ) (* cortex.WriteResponse , error ) {
274
279
userID , err := user .GetID (ctx )
275
280
if err != nil {
276
281
return nil , err
277
282
}
278
283
279
- samples := util .FromWriteRequest (req )
284
+ // First we flatten out the request into a list of samples.
285
+ // We use the heuristic of 1 sample per TS to size the array.
286
+ // We also work out the hash value at the same time.
287
+ samples := make ([]sampleTracker , 0 , len (req .Timeseries ))
288
+ keys := make ([]uint32 , 0 , len (req .Timeseries ))
289
+ for _ , ts := range req .Timeseries {
290
+ key , err := tokenForLabels (userID , ts .Labels )
291
+ if err != nil {
292
+ return nil , err
293
+ }
294
+ for _ , s := range ts .Samples {
295
+ keys = append (keys , key )
296
+ samples = append (samples , sampleTracker {
297
+ labels : ts .Labels ,
298
+ sample : s ,
299
+ })
300
+ }
301
+ }
280
302
d .receivedSamples .Add (float64 (len (samples )))
281
303
282
304
if len (samples ) == 0 {
@@ -288,27 +310,24 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
288
310
return nil , errIngestionRateLimitExceeded
289
311
}
290
312
291
- keys := make ([]uint32 , len (samples ), len (samples ))
292
- for i , sample := range samples {
293
- keys [i ] = tokenForMetric (userID , sample .Metric )
294
- }
295
-
296
- ingesters , err := d .ring .BatchGet (keys , d .cfg .ReplicationFactor , ring .Write )
297
- if err != nil {
313
+ var ingesters [][]* ring.IngesterDesc
314
+ if err := instrument .TimeRequestHistogram (ctx , "Distributor.Push[ring-lookup]" , nil , func (ctx context.Context ) error {
315
+ var err error
316
+ ingesters , err = d .ring .BatchGet (keys , d .cfg .ReplicationFactor , ring .Write )
317
+ if err != nil {
318
+ return err
319
+ }
320
+ return nil
321
+ }); err != nil {
298
322
return nil , err
299
323
}
300
324
301
- sampleTrackers := make ([]sampleTracker , len (samples ), len (samples ))
302
325
samplesByIngester := map [* ring.IngesterDesc ][]* sampleTracker {}
303
326
for i := range samples {
304
327
// We need a response from a quorum of ingesters, which is n/2 + 1.
305
328
minSuccess := (len (ingesters [i ]) / 2 ) + 1
306
-
307
- sampleTrackers [i ] = sampleTracker {
308
- sample : samples [i ],
309
- minSuccess : minSuccess ,
310
- maxFailures : len (ingesters [i ]) - minSuccess ,
311
- }
329
+ samples [i ].minSuccess = minSuccess
330
+ samples [i ].maxFailures = len (ingesters [i ]) - minSuccess
312
331
313
332
// Skip those that have not heartbeated in a while. NB these are still
314
333
// included in the calculation of minSuccess, so if too many failed ingesters
@@ -322,14 +341,14 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
322
341
323
342
// This is just a shortcut - if there are not minSuccess available ingesters,
324
343
// after filtering out dead ones, don't even bother trying.
325
- if len (liveIngesters ) < sampleTrackers [ i ]. minSuccess {
344
+ if len (liveIngesters ) < minSuccess {
326
345
return nil , fmt .Errorf ("wanted at least %d live ingesters to process write, had %d" ,
327
- sampleTrackers [ i ]. minSuccess , len (liveIngesters ))
346
+ minSuccess , len (liveIngesters ))
328
347
}
329
348
330
349
for _ , liveIngester := range liveIngesters {
331
350
sampleForIngester := samplesByIngester [liveIngester ]
332
- samplesByIngester [liveIngester ] = append (sampleForIngester , & sampleTrackers [i ])
351
+ samplesByIngester [liveIngester ] = append (sampleForIngester , & samples [i ])
333
352
}
334
353
}
335
354
@@ -395,17 +414,24 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
395
414
}
396
415
}
397
416
398
- func (d * Distributor ) sendSamplesErr (ctx context.Context , ingester * ring.IngesterDesc , sampleTrackers []* sampleTracker ) error {
417
+ func (d * Distributor ) sendSamplesErr (ctx context.Context , ingester * ring.IngesterDesc , samples []* sampleTracker ) error {
399
418
client , err := d .getClientFor (ingester )
400
419
if err != nil {
401
420
return err
402
421
}
403
- samples := make ([] * model. Sample , len ( sampleTrackers ), len ( sampleTrackers ))
404
- for i := range sampleTrackers {
405
- samples [ i ] = sampleTrackers [ i ]. sample
422
+
423
+ req := & cortex. WriteRequest {
424
+ Timeseries : make ([]cortex. TimeSeries , 0 , len ( samples )),
406
425
}
426
+ for _ , s := range samples {
427
+ req .Timeseries = append (req .Timeseries , cortex.TimeSeries {
428
+ Labels : s .labels ,
429
+ Samples : []cortex.Sample {s .sample },
430
+ })
431
+ }
432
+
407
433
err = instrument .TimeRequestHistogram (ctx , "Distributor.sendSamples" , d .sendDuration , func (ctx context.Context ) error {
408
- _ , err := client .Push (ctx , util . ToWriteRequest ( samples ) )
434
+ _ , err := client .Push (ctx , req )
409
435
return err
410
436
})
411
437
d .ingesterAppends .WithLabelValues (ingester .Addr ).Inc ()
@@ -434,7 +460,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
434
460
return err
435
461
}
436
462
437
- ingesters , err := d .ring .Get (tokenFor (userID , metricName ), d .cfg .ReplicationFactor , ring .Read )
463
+ ingesters , err := d .ring .Get (tokenFor (userID , [] byte ( metricName ) ), d .cfg .ReplicationFactor , ring .Read )
438
464
if err != nil {
439
465
return err
440
466
}
0 commit comments