@@ -1290,62 +1290,110 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
1290
1290
1291
1291
// LabelValues returns all label values that are associated with a given label name.
1292
1292
func (i * Ingester ) LabelValues (ctx context.Context , req * client.LabelValuesRequest ) (* client.LabelValuesResponse , error ) {
1293
+ resp , cleanup , err := i .labelsValuesCommon (ctx , req )
1294
+ defer cleanup ()
1295
+ return resp , err
1296
+ }
1297
+
1298
+ // LabelValuesStream returns all label values that are associated with a given label name.
1299
+ func (i * Ingester ) LabelValuesStream (req * client.LabelValuesRequest , stream client.Ingester_LabelValuesStreamServer ) error {
1300
+ resp , cleanup , err := i .labelsValuesCommon (stream .Context (), req )
1301
+ defer cleanup ()
1302
+
1303
+ if err != nil {
1304
+ return err
1305
+ }
1306
+
1307
+ for i := 0 ; i < len (resp .LabelValues ); i += metadataStreamBatchSize {
1308
+ j := i + metadataStreamBatchSize
1309
+ if j > len (resp .LabelValues ) {
1310
+ j = len (resp .LabelValues )
1311
+ }
1312
+ resp := & client.LabelValuesStreamResponse {
1313
+ LabelValues : resp .LabelValues [i :j ],
1314
+ }
1315
+ err := client .SendLabelValuesStream (stream , resp )
1316
+ if err != nil {
1317
+ return err
1318
+ }
1319
+ }
1320
+
1321
+ return nil
1322
+ }
1323
+
1324
+ // labelsValuesCommon returns all label values that are associated with a given label name.
1325
+ // this should be used by LabelValues and LabelValuesStream
1326
+ // the cleanup function should be called in order to close the querier
1327
+ func (i * Ingester ) labelsValuesCommon (ctx context.Context , req * client.LabelValuesRequest ) (* client.LabelValuesResponse , func (), error ) {
1328
+ cleanup := func () {}
1293
1329
if err := i .checkRunning (); err != nil {
1294
- return nil , err
1330
+ return nil , cleanup , err
1295
1331
}
1296
1332
1297
1333
labelName , startTimestampMs , endTimestampMs , matchers , err := client .FromLabelValuesRequest (req )
1298
1334
if err != nil {
1299
- return nil , err
1335
+ return nil , cleanup , err
1300
1336
}
1301
1337
1302
1338
userID , err := tenant .TenantID (ctx )
1303
1339
if err != nil {
1304
- return nil , err
1340
+ return nil , cleanup , err
1305
1341
}
1306
1342
1307
1343
db := i .getTSDB (userID )
1308
1344
if db == nil {
1309
- return & client.LabelValuesResponse {}, nil
1345
+ return & client.LabelValuesResponse {}, cleanup , nil
1310
1346
}
1311
1347
1312
1348
mint , maxt , err := metadataQueryRange (startTimestampMs , endTimestampMs , db , i .cfg .QueryStoreForLabels , i .cfg .QueryIngestersWithin )
1313
1349
if err != nil {
1314
- return nil , err
1350
+ return nil , cleanup , err
1315
1351
}
1316
1352
1317
1353
q , err := db .Querier (ctx , mint , maxt )
1318
1354
if err != nil {
1319
- return nil , err
1355
+ return nil , cleanup , err
1356
+ }
1357
+
1358
+ cleanup = func () {
1359
+ q .Close ()
1320
1360
}
1321
- defer q .Close ()
1322
1361
1323
1362
vals , _ , err := q .LabelValues (labelName , matchers ... )
1324
1363
if err != nil {
1325
- return nil , err
1364
+ return nil , cleanup , err
1326
1365
}
1327
1366
1328
1367
return & client.LabelValuesResponse {
1329
1368
LabelValues : vals ,
1330
- }, nil
1369
+ }, cleanup , nil
1331
1370
}
1332
1371
1333
- func (i * Ingester ) LabelValuesStream (req * client.LabelValuesRequest , stream client.Ingester_LabelValuesStreamServer ) error {
1334
- resp , err := i .LabelValues (stream .Context (), req )
1372
+ // LabelNames return all the label names.
1373
+ func (i * Ingester ) LabelNames (ctx context.Context , req * client.LabelNamesRequest ) (* client.LabelNamesResponse , error ) {
1374
+ resp , cleanup , err := i .labelNamesCommon (ctx , req )
1375
+ defer cleanup ()
1376
+ return resp , err
1377
+ }
1378
+
1379
+ // LabelNamesStream return all the label names.
1380
+ func (i * Ingester ) LabelNamesStream (req * client.LabelNamesRequest , stream client.Ingester_LabelNamesStreamServer ) error {
1381
+ resp , cleanup , err := i .labelNamesCommon (stream .Context (), req )
1382
+ defer cleanup ()
1335
1383
1336
1384
if err != nil {
1337
1385
return err
1338
1386
}
1339
1387
1340
- for i := 0 ; i < len (resp .LabelValues ); i += metadataStreamBatchSize {
1388
+ for i := 0 ; i < len (resp .LabelNames ); i += metadataStreamBatchSize {
1341
1389
j := i + metadataStreamBatchSize
1342
- if j > len (resp .LabelValues ) {
1343
- j = len (resp .LabelValues )
1390
+ if j > len (resp .LabelNames ) {
1391
+ j = len (resp .LabelNames )
1344
1392
}
1345
- resp := & client.LabelValuesStreamResponse {
1346
- LabelValues : resp .LabelValues [i :j ],
1393
+ resp := & client.LabelNamesStreamResponse {
1394
+ LabelNames : resp .LabelNames [i :j ],
1347
1395
}
1348
- err := client .SendLabelValuesStream (stream , resp )
1396
+ err := client .SendLabelNamesStream (stream , resp )
1349
1397
if err != nil {
1350
1398
return err
1351
1399
}
@@ -1354,60 +1402,72 @@ func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream clie
1354
1402
return nil
1355
1403
}
1356
1404
1357
- // LabelNames return all the label names.
1358
- func (i * Ingester ) LabelNames (ctx context.Context , req * client.LabelNamesRequest ) (* client.LabelNamesResponse , error ) {
1405
+ // labelNamesCommon return all the label names.
1406
+ // this should be used by LabelNames and LabelNamesStream.
1407
+ // the cleanup function should be called in order to close the querier
1408
+ func (i * Ingester ) labelNamesCommon (ctx context.Context , req * client.LabelNamesRequest ) (* client.LabelNamesResponse , func (), error ) {
1409
+ cleanup := func () {}
1359
1410
if err := i .checkRunning (); err != nil {
1360
- return nil , err
1411
+ return nil , cleanup , err
1361
1412
}
1362
1413
1363
1414
userID , err := tenant .TenantID (ctx )
1364
1415
if err != nil {
1365
- return nil , err
1416
+ return nil , cleanup , err
1366
1417
}
1367
1418
1368
1419
db := i .getTSDB (userID )
1369
1420
if db == nil {
1370
- return & client.LabelNamesResponse {}, nil
1421
+ return & client.LabelNamesResponse {}, cleanup , nil
1371
1422
}
1372
1423
1373
1424
mint , maxt , err := metadataQueryRange (req .StartTimestampMs , req .EndTimestampMs , db , i .cfg .QueryStoreForLabels , i .cfg .QueryIngestersWithin )
1374
1425
if err != nil {
1375
- return nil , err
1426
+ return nil , cleanup , err
1376
1427
}
1377
1428
1378
1429
q , err := db .Querier (ctx , mint , maxt )
1379
1430
if err != nil {
1380
- return nil , err
1431
+ return nil , cleanup , err
1432
+ }
1433
+
1434
+ cleanup = func () {
1435
+ q .Close ()
1381
1436
}
1382
- defer q .Close ()
1383
1437
1384
1438
names , _ , err := q .LabelNames ()
1385
1439
if err != nil {
1386
- return nil , err
1440
+ return nil , cleanup , err
1387
1441
}
1388
1442
1389
1443
return & client.LabelNamesResponse {
1390
1444
LabelNames : names ,
1391
- }, nil
1445
+ }, cleanup , nil
1392
1446
}
1393
1447
1394
- // LabelNamesStream return all the label names.
1395
- func (i * Ingester ) LabelNamesStream (req * client.LabelNamesRequest , stream client.Ingester_LabelNamesStreamServer ) error {
1396
- resp , err := i .LabelNames (stream .Context (), req )
1448
+ // MetricsForLabelMatchers returns all the metrics which match a set of matchers.
1449
+ func (i * Ingester ) MetricsForLabelMatchers (ctx context.Context , req * client.MetricsForLabelMatchersRequest ) (* client.MetricsForLabelMatchersResponse , error ) {
1450
+ result , cleanup , err := i .metricsForLabelMatchersCommon (ctx , req )
1451
+ defer cleanup ()
1452
+ return result , err
1453
+ }
1397
1454
1455
+ func (i * Ingester ) MetricsForLabelMatchersStream (req * client.MetricsForLabelMatchersRequest , stream client.Ingester_MetricsForLabelMatchersStreamServer ) error {
1456
+ result , cleanup , err := i .metricsForLabelMatchersCommon (stream .Context (), req )
1457
+ defer cleanup ()
1398
1458
if err != nil {
1399
1459
return err
1400
1460
}
1401
1461
1402
- for i := 0 ; i < len (resp . LabelNames ); i += metadataStreamBatchSize {
1462
+ for i := 0 ; i < len (result . Metric ); i += metadataStreamBatchSize {
1403
1463
j := i + metadataStreamBatchSize
1404
- if j > len (resp . LabelNames ) {
1405
- j = len (resp . LabelNames )
1464
+ if j > len (result . Metric ) {
1465
+ j = len (result . Metric )
1406
1466
}
1407
- resp := & client.LabelNamesStreamResponse {
1408
- LabelNames : resp . LabelNames [i :j ],
1467
+ resp := & client.MetricsForLabelMatchersStreamResponse {
1468
+ Metric : result . Metric [i :j ],
1409
1469
}
1410
- err := client .SendLabelNamesStream (stream , resp )
1470
+ err := client .SendMetricsForLabelMatchersStream (stream , resp )
1411
1471
if err != nil {
1412
1472
return err
1413
1473
}
@@ -1416,46 +1476,52 @@ func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client
1416
1476
return nil
1417
1477
}
1418
1478
1419
- // MetricsForLabelMatchers returns all the metrics which match a set of matchers.
1420
- func (i * Ingester ) MetricsForLabelMatchers (ctx context.Context , req * client.MetricsForLabelMatchersRequest ) (* client.MetricsForLabelMatchersResponse , error ) {
1479
+ // metricsForLabelMatchersCommon returns all the metrics which match a set of matchers.
1480
+ // this should be used by MetricsForLabelMatchers and MetricsForLabelMatchersStream.
1481
+ // the cleanup function should be called in order to close the querier
1482
+ func (i * Ingester ) metricsForLabelMatchersCommon (ctx context.Context , req * client.MetricsForLabelMatchersRequest ) (* client.MetricsForLabelMatchersResponse , func (), error ) {
1483
+ cleanup := func () {}
1421
1484
if err := i .checkRunning (); err != nil {
1422
- return nil , err
1485
+ return nil , cleanup , err
1423
1486
}
1424
1487
1425
1488
userID , err := tenant .TenantID (ctx )
1426
1489
if err != nil {
1427
- return nil , err
1490
+ return nil , cleanup , err
1428
1491
}
1429
1492
1430
1493
db := i .getTSDB (userID )
1431
1494
if db == nil {
1432
- return & client.MetricsForLabelMatchersResponse {}, nil
1495
+ return & client.MetricsForLabelMatchersResponse {}, cleanup , nil
1433
1496
}
1434
1497
1435
1498
// Parse the request
1436
1499
_ , _ , matchersSet , err := client .FromMetricsForLabelMatchersRequest (req )
1437
1500
if err != nil {
1438
- return nil , err
1501
+ return nil , cleanup , err
1439
1502
}
1440
1503
1441
1504
mint , maxt , err := metadataQueryRange (req .StartTimestampMs , req .EndTimestampMs , db , i .cfg .QueryStoreForLabels , i .cfg .QueryIngestersWithin )
1442
1505
if err != nil {
1443
- return nil , err
1506
+ return nil , cleanup , err
1444
1507
}
1445
1508
1446
1509
q , err := db .Querier (ctx , mint , maxt )
1447
1510
if err != nil {
1448
- return nil , err
1511
+ return nil , cleanup , err
1512
+ }
1513
+
1514
+ cleanup = func () {
1515
+ q .Close ()
1449
1516
}
1450
- defer q .Close ()
1451
1517
1452
1518
// Run a query for each matchers set and collect all the results.
1453
1519
var sets []storage.SeriesSet
1454
1520
1455
1521
for _ , matchers := range matchersSet {
1456
1522
// Interrupt if the context has been canceled.
1457
1523
if ctx .Err () != nil {
1458
- return nil , ctx .Err ()
1524
+ return nil , cleanup , ctx .Err ()
1459
1525
}
1460
1526
1461
1527
hints := & storage.SelectHints {
@@ -1477,38 +1543,15 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
1477
1543
for mergedSet .Next () {
1478
1544
// Interrupt if the context has been canceled.
1479
1545
if ctx .Err () != nil {
1480
- return nil , ctx .Err ()
1546
+ return nil , cleanup , ctx .Err ()
1481
1547
}
1482
1548
1483
1549
result .Metric = append (result .Metric , & cortexpb.Metric {
1484
1550
Labels : cortexpb .FromLabelsToLabelAdapters (mergedSet .At ().Labels ()),
1485
1551
})
1486
1552
}
1487
1553
1488
- return result , nil
1489
- }
1490
-
1491
- func (i * Ingester ) MetricsForLabelMatchersStream (req * client.MetricsForLabelMatchersRequest , stream client.Ingester_MetricsForLabelMatchersStreamServer ) error {
1492
- result , err := i .MetricsForLabelMatchers (stream .Context (), req )
1493
- if err != nil {
1494
- return err
1495
- }
1496
-
1497
- for i := 0 ; i < len (result .Metric ); i += metadataStreamBatchSize {
1498
- j := i + metadataStreamBatchSize
1499
- if j > len (result .Metric ) {
1500
- j = len (result .Metric )
1501
- }
1502
- resp := & client.MetricsForLabelMatchersStreamResponse {
1503
- Metric : result .Metric [i :j ],
1504
- }
1505
- err := client .SendMetricsForLabelMatchersStream (stream , resp )
1506
- if err != nil {
1507
- return err
1508
- }
1509
- }
1510
-
1511
- return nil
1554
+ return result , cleanup , nil
1512
1555
}
1513
1556
1514
1557
// MetricsMetadata returns all the metric metadata of a user.
0 commit comments