	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
	"math"
	"net/http"
	"os"
@@ -576,8 +577,8 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
		require.Equal(t, 200, res.StatusCode)
	}

-	totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
-	require.NoError(t, err)
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
+	var totalQueries = []float64{0}

	// Verify that user-failures don't increase cortex_ruler_queries_failed_total
	for groupName, expression := range map[string]string{
@@ -601,7 +602,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
			require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

			// But these failures were not reported as "failed queries"
-			sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+			sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
			require.NoError(t, err)
			require.Equal(t, float64(0), sum[0])
@@ -612,7 +613,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
			require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

			// Check that cortex_ruler_queries_total went up since last test.
-			newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
+			newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
			require.NoError(t, err)
			require.Greater(t, newTotalQueries[0], totalQueries[0])
@@ -637,15 +638,119 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

		// Still no failures.
-		sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+		sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
		require.NoError(t, err)
		require.Equal(t, float64(0), sum[0])

		// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
		require.NoError(t, s.Stop(ingester))

		// We should start getting "real" failures now.
-		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher)))
+	})
+}
+
+func TestRulerMetricsWhenIngesterFails(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	const blockRangePeriod = 2 * time.Second
+	// Configure the ruler.
+	flags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		map[string]string{
+			"-blocks-storage.tsdb.block-ranges-period":         blockRangePeriod.String(),
+			"-blocks-storage.tsdb.ship-interval":                "1s",
+			"-blocks-storage.bucket-store.sync-interval":        "1s",
+			"-blocks-storage.bucket-store.index-cache.backend":  tsdb.IndexCacheBackendInMemory,
+			"-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
+
+			// Disable the bucket index, so the store-gateway performs the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "false",
+			// Evaluate rules often, so that we don't need to wait for metrics to show up.
+			"-ruler.evaluation-interval": "2s",
+			"-ruler.poll-interval":       "2s",
+			// No delay
+			"-ruler.evaluation-delay-duration": "0",
+
+			// We run single ingester only, no replication.
+			"-distributor.replication-factor": "1",
+
+			// Very low limit so that ruler hits it.
+			"-querier.max-fetched-chunks-per-query": "15",
+			"-querier.query-store-after":            (1 * time.Second).String(),
+			"-querier.query-ingesters-within":       (2 * time.Second).String(),
+		},
+	)
+
+	const namespace = "test"
+	const user = "user"
+
+	storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+
+	flags = mergeFlags(flags, map[string]string{
+		"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
+	})
+
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))
+
+	// Wait until both the distributor and ruler have updated the ring. The querier will also watch
+	// the store-gateway ring if blocks sharding is enabled.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))
+
+	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
+	require.NoError(t, err)
+
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
+	expression := "absent(sum_over_time(metric{}[2s] offset 1h))"
+
+	// Now let's upload a non-failing rule, and make sure that it works.
+	t.Run("real_error", func(t *testing.T) {
+		const groupName = "good_rule"
+
+		var ruleEvalCount float64
+		ruleGroup := ruleGroupWithRule(groupName, "rule", expression)
+		ruleGroup.Interval = 2
+		require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
+		m := ruleGroupMatcher(user, namespace, groupName)

+		// Wait until ruler has loaded the group.
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		// Wait until rule group has tried to evaluate the rule, and succeeded.
+		ruleEvalCount++
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		// Wait until the TSDB head is compacted and shipped to the storage.
+		// The shipped block contains the 1st series, while the 2nd series stays in the head.
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
+
+		// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_write_requests_failed_total failures.
+		require.NoError(t, s.Stop(ingester))
+		ruleEvalCount++
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
	})
}