@@ -220,133 +220,240 @@ func TestOTLPConvertToPromTS(t *testing.T) {
220
220
}
221
221
}
222
222
223
+ // for testing
224
+ type resetReader struct {
225
+ * bytes.Reader
226
+ body []byte
227
+ }
228
+
229
+ func newResetReader (body []byte ) * resetReader {
230
+ return & resetReader {
231
+ Reader : bytes .NewReader (body ),
232
+ body : body ,
233
+ }
234
+ }
235
+
236
+ func (r * resetReader ) Reset () {
237
+ r .Reader .Reset (r .body )
238
+ }
239
+
240
+ func (r * resetReader ) Close () error {
241
+ return nil
242
+ }
243
+
244
+ func getOTLPHttpRequest (otlpRequest * pmetricotlp.ExportRequest , contentType , encodingType string ) (* http.Request , error ) {
245
+ ctx := context .Background ()
246
+ ctx = user .InjectOrgID (ctx , "user-1" )
247
+
248
+ var body []byte
249
+ var err error
250
+ switch contentType {
251
+ case jsonContentType :
252
+ body , err = otlpRequest .MarshalJSON ()
253
+ if err != nil {
254
+ return nil , err
255
+ }
256
+ case pbContentType :
257
+ body , err = otlpRequest .MarshalProto ()
258
+ if err != nil {
259
+ return nil , err
260
+ }
261
+ }
262
+
263
+ if encodingType == "gzip" {
264
+ var gzipBody bytes.Buffer
265
+ gz := gzip .NewWriter (& gzipBody )
266
+ _ , err = gz .Write (body )
267
+ if err != nil {
268
+ return nil , err
269
+ }
270
+ if err = gz .Close (); err != nil {
271
+ return nil , err
272
+ }
273
+ body = gzipBody .Bytes ()
274
+ }
275
+
276
+ req , err := http .NewRequestWithContext (ctx , "" , "" , newResetReader (body ))
277
+ if err != nil {
278
+ return nil , err
279
+ }
280
+
281
+ switch contentType {
282
+ case jsonContentType :
283
+ req .Header .Set ("Content-Type" , jsonContentType )
284
+ case pbContentType :
285
+ req .Header .Set ("Content-Type" , pbContentType )
286
+ }
287
+
288
+ if encodingType != "" {
289
+ req .Header .Set ("Content-Encoding" , encodingType )
290
+ }
291
+ req .ContentLength = int64 (len (body ))
292
+
293
+ return req , nil
294
+ }
295
+
296
+ func BenchmarkOTLPWriteHandler (b * testing.B ) {
297
+ cfg := distributor.OTLPConfig {
298
+ ConvertAllAttributes : false ,
299
+ DisableTargetInfo : false ,
300
+ }
301
+ overrides , err := validation .NewOverrides (querier .DefaultLimitsConfig (), nil )
302
+ require .NoError (b , err )
303
+
304
+ exportRequest := generateOTLPWriteRequest ()
305
+ mockPushFunc := func (context.Context , * cortexpb.WriteRequest ) (* cortexpb.WriteResponse , error ) {
306
+ return & cortexpb.WriteResponse {}, nil
307
+ }
308
+ handler := OTLPHandler (10000 , overrides , cfg , nil , mockPushFunc )
309
+
310
+ b .Run ("json with no compression" , func (b * testing.B ) {
311
+ req , err := getOTLPHttpRequest (& exportRequest , jsonContentType , "" )
312
+ require .NoError (b , err )
313
+
314
+ b .ResetTimer ()
315
+ b .ReportAllocs ()
316
+ for i := 0 ; i < b .N ; i ++ {
317
+ recorder := httptest .NewRecorder ()
318
+ handler .ServeHTTP (recorder , req )
319
+
320
+ resp := recorder .Result ()
321
+ require .Equal (b , http .StatusOK , resp .StatusCode )
322
+ req .Body .(* resetReader ).Reset ()
323
+ }
324
+ })
325
+ b .Run ("json with gzip" , func (b * testing.B ) {
326
+ req , err := getOTLPHttpRequest (& exportRequest , jsonContentType , "gzip" )
327
+ require .NoError (b , err )
328
+
329
+ b .ResetTimer ()
330
+ b .ReportAllocs ()
331
+ for i := 0 ; i < b .N ; i ++ {
332
+ recorder := httptest .NewRecorder ()
333
+ handler .ServeHTTP (recorder , req )
334
+
335
+ resp := recorder .Result ()
336
+ require .Equal (b , http .StatusOK , resp .StatusCode )
337
+ req .Body .(* resetReader ).Reset ()
338
+ }
339
+ })
340
+ b .Run ("proto with no compression" , func (b * testing.B ) {
341
+ req , err := getOTLPHttpRequest (& exportRequest , pbContentType , "" )
342
+ require .NoError (b , err )
343
+
344
+ b .ResetTimer ()
345
+ b .ReportAllocs ()
346
+ for i := 0 ; i < b .N ; i ++ {
347
+ recorder := httptest .NewRecorder ()
348
+ handler .ServeHTTP (recorder , req )
349
+
350
+ resp := recorder .Result ()
351
+ require .Equal (b , http .StatusOK , resp .StatusCode )
352
+ req .Body .(* resetReader ).Reset ()
353
+ }
354
+ })
355
+ b .Run ("proto with gzip" , func (b * testing.B ) {
356
+ req , err := getOTLPHttpRequest (& exportRequest , pbContentType , "gzip" )
357
+ require .NoError (b , err )
358
+
359
+ b .ResetTimer ()
360
+ b .ReportAllocs ()
361
+ for i := 0 ; i < b .N ; i ++ {
362
+ recorder := httptest .NewRecorder ()
363
+ handler .ServeHTTP (recorder , req )
364
+
365
+ resp := recorder .Result ()
366
+ require .Equal (b , http .StatusOK , resp .StatusCode )
367
+ req .Body .(* resetReader ).Reset ()
368
+ }
369
+ })
370
+ }
371
+
223
372
func TestOTLPWriteHandler (t * testing.T ) {
224
373
cfg := distributor.OTLPConfig {
225
374
ConvertAllAttributes : false ,
226
375
DisableTargetInfo : false ,
227
376
}
228
377
229
- exportRequest := generateOTLPWriteRequest (t )
378
+ exportRequest := generateOTLPWriteRequest ()
230
379
231
380
tests := []struct {
232
381
description string
233
382
maxRecvMsgSize int
234
- format string
383
+ contentType string
235
384
expectedStatusCode int
236
385
expectedErrMsg string
237
- gzipCompression bool
238
386
encodingType string
239
387
}{
240
388
{
241
389
description : "Test proto format write with no compression" ,
242
390
maxRecvMsgSize : 10000 ,
243
- format : pbContentType ,
391
+ contentType : pbContentType ,
244
392
expectedStatusCode : http .StatusOK ,
245
393
},
246
394
{
247
395
description : "Test proto format write with gzip" ,
248
396
maxRecvMsgSize : 10000 ,
249
- format : pbContentType ,
397
+ contentType : pbContentType ,
250
398
expectedStatusCode : http .StatusOK ,
251
399
encodingType : "gzip" ,
252
- gzipCompression : true ,
253
400
},
254
401
{
255
402
description : "Test json format write with no compression" ,
256
403
maxRecvMsgSize : 10000 ,
257
- format : jsonContentType ,
404
+ contentType : jsonContentType ,
258
405
expectedStatusCode : http .StatusOK ,
259
406
},
260
407
{
261
408
description : "Test json format write with gzip" ,
262
409
maxRecvMsgSize : 10000 ,
263
- format : jsonContentType ,
410
+ contentType : jsonContentType ,
264
411
expectedStatusCode : http .StatusOK ,
265
412
encodingType : "gzip" ,
266
- gzipCompression : true ,
267
413
},
268
414
{
269
415
description : "request too big than maxRecvMsgSize (proto) with no compression" ,
270
416
maxRecvMsgSize : 10 ,
271
- format : pbContentType ,
417
+ contentType : pbContentType ,
272
418
expectedStatusCode : http .StatusBadRequest ,
273
419
expectedErrMsg : "received message larger than max" ,
274
420
},
275
421
{
276
422
description : "request too big than maxRecvMsgSize (proto) with gzip" ,
277
423
maxRecvMsgSize : 10 ,
278
- format : pbContentType ,
424
+ contentType : pbContentType ,
279
425
expectedStatusCode : http .StatusBadRequest ,
280
426
expectedErrMsg : "received message larger than max" ,
281
427
encodingType : "gzip" ,
282
- gzipCompression : true ,
283
428
},
284
429
{
285
430
description : "request too big than maxRecvMsgSize (json) with no compression" ,
286
431
maxRecvMsgSize : 10 ,
287
- format : jsonContentType ,
432
+ contentType : jsonContentType ,
288
433
expectedStatusCode : http .StatusBadRequest ,
289
434
expectedErrMsg : "received message larger than max" ,
290
435
},
291
436
{
292
437
description : "request too big than maxRecvMsgSize (json) with gzip" ,
293
438
maxRecvMsgSize : 10 ,
294
- format : jsonContentType ,
439
+ contentType : jsonContentType ,
295
440
expectedStatusCode : http .StatusBadRequest ,
296
441
expectedErrMsg : "received message larger than max" ,
297
442
encodingType : "gzip" ,
298
- gzipCompression : true ,
299
443
},
300
444
{
301
445
description : "invalid encoding type: snappy" ,
302
446
maxRecvMsgSize : 10000 ,
303
- format : jsonContentType ,
447
+ contentType : jsonContentType ,
304
448
expectedStatusCode : http .StatusBadRequest ,
305
449
encodingType : "snappy" ,
306
450
},
307
451
}
308
452
309
453
for _ , test := range tests {
310
454
t .Run (test .description , func (t * testing.T ) {
311
- ctx := context .Background ()
312
- ctx = user .InjectOrgID (ctx , "user-1" )
313
- var req * http.Request
314
-
315
- compressionFunc := func (t * testing.T , body []byte ) []byte {
316
- var b bytes.Buffer
317
- gz := gzip .NewWriter (& b )
318
- _ , err := gz .Write (body )
319
- require .NoError (t , err )
320
- require .NoError (t , gz .Close ())
321
-
322
- return b .Bytes ()
323
- }
324
-
325
- if test .format == pbContentType {
326
- buf , err := exportRequest .MarshalProto ()
327
- require .NoError (t , err )
328
-
329
- if test .gzipCompression {
330
- buf = compressionFunc (t , buf )
331
- }
332
-
333
- req , err = http .NewRequestWithContext (ctx , "" , "" , bytes .NewReader (buf ))
334
- require .NoError (t , err )
335
- req .Header .Set ("Content-Type" , pbContentType )
336
- req .Header .Set ("Content-Encoding" , test .encodingType )
337
- } else {
338
- buf , err := exportRequest .MarshalJSON ()
339
- require .NoError (t , err )
340
-
341
- if test .gzipCompression {
342
- buf = compressionFunc (t , buf )
343
- }
344
-
345
- req , err = http .NewRequestWithContext (ctx , "" , "" , bytes .NewReader (buf ))
346
- require .NoError (t , err )
347
- req .Header .Set ("Content-Type" , jsonContentType )
348
- req .Header .Set ("Content-Encoding" , test .encodingType )
349
- }
455
+ req , err := getOTLPHttpRequest (& exportRequest , test .contentType , test .encodingType )
456
+ require .NoError (t , err )
350
457
351
458
push := verifyOTLPWriteRequestHandler (t , cortexpb .API )
352
459
overrides , err := validation .NewOverrides (querier .DefaultLimitsConfig (), nil )
@@ -368,7 +475,7 @@ func TestOTLPWriteHandler(t *testing.T) {
368
475
}
369
476
}
370
477
371
- func generateOTLPWriteRequest (t * testing. T ) pmetricotlp.ExportRequest {
478
+ func generateOTLPWriteRequest () pmetricotlp.ExportRequest {
372
479
d := pmetric .NewMetrics ()
373
480
374
481
// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
0 commit comments