@@ -2051,100 +2051,6 @@ func TestIngester_QueryStream(t *testing.T) {
2051
2051
t .Run ("chunks" , chunksTest )
2052
2052
}
2053
2053
2054
- func TestIngester_QueryStreamManySamples (t * testing.T ) {
2055
- // Create ingester.
2056
- i , err := prepareIngesterWithBlocksStorage (t , defaultIngesterTestConfig (t ), nil )
2057
- require .NoError (t , err )
2058
- require .NoError (t , services .StartAndAwaitRunning (context .Background (), i ))
2059
- defer services .StopAndAwaitTerminated (context .Background (), i ) //nolint:errcheck
2060
-
2061
- // Wait until it's ACTIVE.
2062
- test .Poll (t , 1 * time .Second , ring .ACTIVE , func () interface {} {
2063
- return i .lifecycler .GetState ()
2064
- })
2065
-
2066
- // Push series.
2067
- ctx := user .InjectOrgID (context .Background (), userID )
2068
-
2069
- const samplesCount = 100000
2070
- samples := make ([]cortexpb.Sample , 0 , samplesCount )
2071
-
2072
- for i := 0 ; i < samplesCount ; i ++ {
2073
- samples = append (samples , cortexpb.Sample {
2074
- Value : float64 (i ),
2075
- TimestampMs : int64 (i ),
2076
- })
2077
- }
2078
-
2079
- // 10k samples encode to around 140 KiB,
2080
- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "1" }}, samples [0 :10000 ]))
2081
- require .NoError (t , err )
2082
-
2083
- // 100k samples encode to around 1.4 MiB,
2084
- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "2" }}, samples ))
2085
- require .NoError (t , err )
2086
-
2087
- // 50k samples encode to around 716 KiB,
2088
- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "3" }}, samples [0 :50000 ]))
2089
- require .NoError (t , err )
2090
-
2091
- // Create a GRPC server used to query back the data.
2092
- serv := grpc .NewServer (grpc .StreamInterceptor (middleware .StreamServerUserHeaderInterceptor ))
2093
- defer serv .GracefulStop ()
2094
- client .RegisterIngesterServer (serv , i )
2095
-
2096
- listener , err := net .Listen ("tcp" , "localhost:0" )
2097
- require .NoError (t , err )
2098
-
2099
- go func () {
2100
- require .NoError (t , serv .Serve (listener ))
2101
- }()
2102
-
2103
- // Query back the series using GRPC streaming.
2104
- c , err := client .MakeIngesterClient (listener .Addr ().String (), defaultClientTestConfig ())
2105
- require .NoError (t , err )
2106
- defer c .Close ()
2107
-
2108
- s , err := c .QueryStream (ctx , & client.QueryRequest {
2109
- StartTimestampMs : 0 ,
2110
- EndTimestampMs : samplesCount + 1 ,
2111
-
2112
- Matchers : []* client.LabelMatcher {{
2113
- Type : client .EQUAL ,
2114
- Name : model .MetricNameLabel ,
2115
- Value : "foo" ,
2116
- }},
2117
- })
2118
- require .NoError (t , err )
2119
-
2120
- recvMsgs := 0
2121
- series := 0
2122
- totalSamples := 0
2123
-
2124
- for {
2125
- resp , err := s .Recv ()
2126
- if err == io .EOF {
2127
- break
2128
- }
2129
- require .NoError (t , err )
2130
- require .True (t , len (resp .Timeseries ) > 0 ) // No empty messages.
2131
-
2132
- recvMsgs ++
2133
- series += len (resp .Timeseries )
2134
-
2135
- for _ , ts := range resp .Timeseries {
2136
- totalSamples += len (ts .Samples )
2137
- }
2138
- }
2139
-
2140
- // As ingester doesn't guarantee sorting of series, we can get 2 (10k + 50k in first, 100k in second)
2141
- // or 3 messages (small series first, 100k second, small series last).
2142
-
2143
- require .True (t , 2 <= recvMsgs && recvMsgs <= 3 )
2144
- require .Equal (t , 3 , series )
2145
- require .Equal (t , 10000 + 50000 + samplesCount , totalSamples )
2146
- }
2147
-
2148
2054
func TestIngester_QueryStreamManySamplesChunks (t * testing.T ) {
2149
2055
// Create ingester.
2150
2056
cfg := defaultIngesterTestConfig (t )
0 commit comments