@@ -14,10 +14,15 @@ import (
14
14
"time"
15
15
16
16
"github.com/go-kit/kit/log"
17
+ "github.com/gogo/status"
17
18
"github.com/oklog/ulid"
19
+ "github.com/prometheus/client_golang/prometheus"
18
20
"github.com/prometheus/prometheus/pkg/labels"
19
21
"github.com/prometheus/prometheus/pkg/relabel"
20
22
"github.com/prometheus/prometheus/pkg/timestamp"
23
+ "github.com/weaveworks/common/httpgrpc"
24
+ "google.golang.org/grpc/codes"
25
+
21
26
"github.com/thanos-io/thanos/pkg/block"
22
27
"github.com/thanos-io/thanos/pkg/block/metadata"
23
28
"github.com/thanos-io/thanos/pkg/model"
@@ -45,6 +50,20 @@ type swappableCache struct {
45
50
ptr storecache.IndexCache
46
51
}
47
52
53
+ type customLimiter struct {
54
+ limiter * Limiter
55
+ code codes.Code
56
+ }
57
+
58
+ func (c * customLimiter ) Reserve (num uint64 ) error {
59
+ err := c .limiter .Reserve (num )
60
+ if err != nil {
61
+ return httpgrpc .Errorf (int (c .code ), err .Error ())
62
+ }
63
+
64
+ return nil
65
+ }
66
+
48
67
func (c * swappableCache ) SwapWith (ptr2 storecache.IndexCache ) {
49
68
c .ptr = ptr2
50
69
}
@@ -113,7 +132,25 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
113
132
return
114
133
}
115
134
116
- func prepareStoreWithTestBlocks (t testing.TB , dir string , bkt objstore.Bucket , manyParts bool , maxChunksLimit uint64 , relabelConfig []* relabel.Config , filterConf * FilterConfig ) * storeSuite {
135
+ func newCustomChunksLimiterFactory (limit uint64 , code codes.Code ) ChunksLimiterFactory {
136
+ return func (failedCounter prometheus.Counter ) ChunksLimiter {
137
+ return & customLimiter {
138
+ limiter : NewLimiter (limit , failedCounter ),
139
+ code : code ,
140
+ }
141
+ }
142
+ }
143
+
144
+ func newCustomSeriesLimiterFactory (limit uint64 , code codes.Code ) SeriesLimiterFactory {
145
+ return func (failedCounter prometheus.Counter ) SeriesLimiter {
146
+ return & customLimiter {
147
+ limiter : NewLimiter (limit , failedCounter ),
148
+ code : code ,
149
+ }
150
+ }
151
+ }
152
+
153
+ func prepareStoreWithTestBlocks (t testing.TB , dir string , bkt objstore.Bucket , manyParts bool , chunksLimiterFactory ChunksLimiterFactory , seriesLimiterFactory SeriesLimiterFactory , relabelConfig []* relabel.Config , filterConf * FilterConfig ) * storeSuite {
117
154
series := []labels.Labels {
118
155
labels .FromStrings ("a" , "1" , "b" , "1" ),
119
156
labels .FromStrings ("a" , "1" , "b" , "2" ),
@@ -151,8 +188,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
151
188
s .cache ,
152
189
nil ,
153
190
nil ,
154
- NewChunksLimiterFactory ( maxChunksLimit ) ,
155
- NewSeriesLimiterFactory ( 0 ) ,
191
+ chunksLimiterFactory ,
192
+ seriesLimiterFactory ,
156
193
NewGapBasedPartitioner (PartitionerMaxGapSize ),
157
194
false ,
158
195
20 ,
@@ -425,7 +462,7 @@ func TestBucketStore_e2e(t *testing.T) {
425
462
testutil .Ok (t , err )
426
463
defer func () { testutil .Ok (t , os .RemoveAll (dir )) }()
427
464
428
- s := prepareStoreWithTestBlocks (t , dir , bkt , false , 0 , emptyRelabelConfig , allowAllFilterConf )
465
+ s := prepareStoreWithTestBlocks (t , dir , bkt , false , NewChunksLimiterFactory ( 0 ), NewSeriesLimiterFactory ( 0 ) , emptyRelabelConfig , allowAllFilterConf )
429
466
430
467
if ok := t .Run ("no index cache" , func (t * testing.T ) {
431
468
s .cache .SwapWith (noopCache {})
@@ -480,7 +517,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
480
517
testutil .Ok (t , err )
481
518
defer func () { testutil .Ok (t , os .RemoveAll (dir )) }()
482
519
483
- s := prepareStoreWithTestBlocks (t , dir , bkt , true , 0 , emptyRelabelConfig , allowAllFilterConf )
520
+ s := prepareStoreWithTestBlocks (t , dir , bkt , true , NewChunksLimiterFactory ( 0 ), NewSeriesLimiterFactory ( 0 ) , emptyRelabelConfig , allowAllFilterConf )
484
521
485
522
indexCache , err := storecache .NewInMemoryIndexCacheWithConfig (s .logger , nil , storecache.InMemoryIndexCacheConfig {
486
523
MaxItemSize : 1e5 ,
@@ -508,7 +545,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
508
545
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
509
546
expectedChunks := uint64 (2 * 2 )
510
547
511
- s := prepareStoreWithTestBlocks (t , dir , bkt , false , expectedChunks , emptyRelabelConfig , & FilterConfig {
548
+ s := prepareStoreWithTestBlocks (t , dir , bkt , false , NewChunksLimiterFactory ( expectedChunks ), NewSeriesLimiterFactory ( 0 ) , emptyRelabelConfig , & FilterConfig {
512
549
MinTime : minTimeDuration ,
513
550
MaxTime : filterMaxTime ,
514
551
})
@@ -554,14 +591,28 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
554
591
555
592
cases := map [string ]struct {
556
593
maxChunksLimit uint64
594
+ maxSeriesLimit uint64
557
595
expectedErr string
596
+ code codes.Code
558
597
}{
559
598
"should succeed if the max chunks limit is not exceeded" : {
560
599
maxChunksLimit : expectedChunks ,
561
600
},
562
- "should fail if the max chunks limit is exceeded" : {
601
+ "should fail if the max chunks limit is exceeded - ResourceExhausted " : {
563
602
maxChunksLimit : expectedChunks - 1 ,
564
603
expectedErr : "exceeded chunks limit" ,
604
+ code : codes .ResourceExhausted ,
605
+ },
606
+ "should fail if the max chunks limit is exceeded - 422" : {
607
+ maxChunksLimit : expectedChunks - 1 ,
608
+ expectedErr : "exceeded chunks limit" ,
609
+ code : 422 ,
610
+ },
611
+ "should fail if the max series limit is exceeded - 422" : {
612
+ maxChunksLimit : expectedChunks ,
613
+ expectedErr : "exceeded series limit" ,
614
+ maxSeriesLimit : 1 ,
615
+ code : 422 ,
565
616
},
566
617
}
567
618
@@ -575,7 +626,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
575
626
testutil .Ok (t , err )
576
627
defer func () { testutil .Ok (t , os .RemoveAll (dir )) }()
577
628
578
- s := prepareStoreWithTestBlocks (t , dir , bkt , false , testData .maxChunksLimit , emptyRelabelConfig , allowAllFilterConf )
629
+ s := prepareStoreWithTestBlocks (t , dir , bkt , false , newCustomChunksLimiterFactory ( testData .maxChunksLimit , testData . code ), newCustomSeriesLimiterFactory ( testData . maxSeriesLimit , testData . code ) , emptyRelabelConfig , allowAllFilterConf )
579
630
testutil .Ok (t , s .store .SyncBlocks (ctx ))
580
631
581
632
req := & storepb.SeriesRequest {
@@ -595,6 +646,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
595
646
} else {
596
647
testutil .NotOk (t , err )
597
648
testutil .Assert (t , strings .Contains (err .Error (), testData .expectedErr ))
649
+ status , ok := status .FromError (err )
650
+ testutil .Equals (t , true , ok )
651
+ testutil .Equals (t , testData .code , status .Code ())
598
652
}
599
653
})
600
654
}
@@ -609,7 +663,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
609
663
testutil .Ok (t , err )
610
664
defer func () { testutil .Ok (t , os .RemoveAll (dir )) }()
611
665
612
- s := prepareStoreWithTestBlocks (t , dir , bkt , false , 0 , emptyRelabelConfig , allowAllFilterConf )
666
+ s := prepareStoreWithTestBlocks (t , dir , bkt , false , NewChunksLimiterFactory ( 0 ), NewSeriesLimiterFactory ( 0 ) , emptyRelabelConfig , allowAllFilterConf )
613
667
614
668
mint , maxt := s .store .TimeRange ()
615
669
testutil .Equals (t , s .minTime , mint )
@@ -642,7 +696,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
642
696
testutil .Ok (t , err )
643
697
defer func () { testutil .Ok (t , os .RemoveAll (dir )) }()
644
698
645
- s := prepareStoreWithTestBlocks (t , dir , bkt , false , 0 , emptyRelabelConfig , allowAllFilterConf )
699
+ s := prepareStoreWithTestBlocks (t , dir , bkt , false , NewChunksLimiterFactory ( 0 ), NewSeriesLimiterFactory ( 0 ) , emptyRelabelConfig , allowAllFilterConf )
646
700
647
701
mint , maxt := s .store .TimeRange ()
648
702
testutil .Equals (t , s .minTime , mint )
0 commit comments