Skip to content

Commit 29ea040

Browse files
committed
Add picker and cache unit tests for RLS Metrics
1 parent 8f920c6 commit 29ea040

File tree

3 files changed

+210
-1
lines changed

3 files changed

+210
-1
lines changed

balancer/rls/cache_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,3 +242,50 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
242242
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
243243
}
244244
}
245+
246+
var cacheEntriesMetricsTests = []*cacheEntry{
247+
{size: 3},
248+
{size: 3},
249+
{size: 3},
250+
{size: 3},
251+
{size: 3},
252+
}
253+
254+
func (s) TestDataCache_Metrics(t *testing.T) {
255+
tmr := stats.NewTestMetricsRecorder(t)
256+
dc := newDataCache(50, nil, tmr, "")
257+
258+
dc.updateRLSServerTarget("rls-server-target")
259+
for i, k := range cacheKeys {
260+
dc.addEntry(k, cacheEntriesMetricsTests[i])
261+
}
262+
263+
// 5 total entries, size 3 each, so should record 5 entries and 15
264+
// size.
265+
tmr.AssertDataForMetric("grpc.lb.rls.cache_entries", 5)
266+
tmr.AssertDataForMetric("grpc.lb.rls.cache_size", 15)
267+
268+
// Resize down the cache to 3 entries. This should scale down the cache to 3
269+
// entries with 3 bytes each, so should record 3 entries and 9 size.
270+
dc.resize(9)
271+
tmr.AssertDataForMetric("grpc.lb.rls.cache_entries", 3)
272+
tmr.AssertDataForMetric("grpc.lb.rls.cache_size", 9)
273+
274+
// Update an entry to have size 5. This should reflect in the size metrics,
275+
// which will increase by 2 to 11, while the number of cache entries should
276+
// stay same. This write is deterministic and writes to the last one.
277+
dc.updateEntrySize(cacheEntriesMetricsTests[4], 5)
278+
279+
defer func() {
280+
cacheEntriesMetricsTests[4].size = 3
281+
}()
282+
283+
tmr.AssertDataForMetric("grpc.lb.rls.cache_entries", 3)
284+
tmr.AssertDataForMetric("grpc.lb.rls.cache_size", 11)
285+
286+
// Delete this scaled up cache key. This should scale down the cache to 2
287+
// entries, and remove 5 size so cache size should be 6.
288+
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
289+
tmr.AssertDataForMetric("grpc.lb.rls.cache_entries", 2)
290+
tmr.AssertDataForMetric("grpc.lb.rls.cache_size", 6)
291+
}

balancer/rls/picker_test.go

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import (
2626
"time"
2727

2828
"google.golang.org/grpc"
29+
"google.golang.org/grpc/balancer"
2930
"google.golang.org/grpc/codes"
3031
"google.golang.org/grpc/credentials/insecure"
3132
"google.golang.org/grpc/internal/grpcsync"
3233
"google.golang.org/grpc/internal/stubserver"
3334
rlstest "google.golang.org/grpc/internal/testutils/rls"
35+
"google.golang.org/grpc/internal/testutils/stats"
3436
"google.golang.org/grpc/metadata"
3537
"google.golang.org/grpc/status"
3638
"google.golang.org/protobuf/types/known/durationpb"
@@ -246,6 +248,115 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
246248
}
247249
}
248250

251+
// Test_RLSDefaultTargetPicksMetric tests the default target picks metric. It
252+
// configures an RLS Balancer which specifies to route to the default target in
253+
// the RLS Configuration, and makes an RPC on a Channel containing this RLS
254+
// Balancer. This test then asserts a default target picks metric is emitted,
255+
// and target pick or failed pick metric is not emitted.
256+
func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
257+
// Start an RLS server and set the throttler to always throttle requests.
258+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
259+
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
260+
261+
// Build RLS service config with a default target.
262+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
263+
defBackendCh, defBackendAddress := startBackend(t)
264+
rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
265+
266+
// Register a manual resolver and push the RLS service config through it.
267+
r := startManualResolverWithConfig(t, rlsConfig)
268+
269+
tmr := stats.NewTestMetricsRecorder(t)
270+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
271+
if err != nil {
272+
t.Fatalf("grpc.Dial() failed: %v", err)
273+
}
274+
defer cc.Close()
275+
276+
// Make an RPC and ensure it gets routed to the default target.
277+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
278+
defer cancel()
279+
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
280+
281+
tmr.AssertDataForMetric("grpc.lb.rls.default_target_picks", 1)
282+
tmr.AssertNoDataForMetric("grpc.lb.rls.failed_picks")
283+
tmr.AssertNoDataForMetric("grpc.lb.rls.target_picks")
284+
}
285+
286+
// Test_RLSTargetPicksMetric tests the target picks metric. It configures an RLS
287+
// Balancer which specifies to route to a target through a RouteLookupResponse,
288+
// and makes an RPC on a Channel containing this RLS Balancer. This test then
289+
// asserts a target picks metric is emitted, and default target pick or failed
290+
// pick metric is not emitted.
291+
func (s) Test_RLSTargetPicksMetric(t *testing.T) {
292+
// Start an RLS server and set the throttler to never throttle requests.
293+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
294+
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
295+
296+
// Build the RLS config without a default target.
297+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
298+
299+
// Start a test backend, and setup the fake RLS server to return this as a
300+
// target in the RLS response.
301+
testBackendCh, testBackendAddress := startBackend(t)
302+
rlsServer.SetResponseCallback(func(context.Context, *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
303+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
304+
})
305+
306+
// Register a manual resolver and push the RLS service config through it.
307+
r := startManualResolverWithConfig(t, rlsConfig)
308+
309+
tmr := stats.NewTestMetricsRecorder(t)
310+
// Dial the backend.
311+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
312+
if err != nil {
313+
t.Fatalf("grpc.Dial() failed: %v", err)
314+
}
315+
defer cc.Close()
316+
317+
// Make an RPC and ensure it gets routed to the test backend.
318+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
319+
defer cancel()
320+
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
321+
tmr.AssertDataForMetric("grpc.lb.rls.target_picks", 1)
322+
tmr.AssertNoDataForMetric("grpc.lb.rls.failed_picks")
323+
tmr.AssertNoDataForMetric("grpc.lb.rls.default_target_picks")
324+
}
325+
326+
// Test_RLSFailedPicksMetric tests the failed picks metric. It configures an RLS
327+
// Balancer to fail a pick with unavailable, and makes an RPC on a Channel
328+
// containing this RLS Balancer. This test then asserts a failed picks metric is
329+
// emitted, and default target pick or target pick metric is not emitted.
330+
func (s) Test_RLSFailedPicksMetric(t *testing.T) {
331+
// Start an RLS server and set the throttler to never throttle requests.
332+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
333+
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
334+
335+
// Build an RLS config without a default target.
336+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
337+
338+
// Register a manual resolver and push the RLS service config through it.
339+
r := startManualResolverWithConfig(t, rlsConfig)
340+
341+
tmr := stats.NewTestMetricsRecorder(t)
342+
// Dial the backend.
343+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
344+
if err != nil {
345+
t.Fatalf("grpc.Dial() failed: %v", err)
346+
}
347+
defer cc.Close()
348+
349+
// Make an RPC and expect it to fail with deadline exceeded error. We use a
350+
// smaller timeout to ensure that the test doesn't run very long.
351+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
352+
defer cancel()
353+
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
354+
355+
tmr.AssertDataForMetric("grpc.lb.rls.failed_picks", 1)
356+
tmr.AssertNoDataForMetric("grpc.lb.rls.target_picks")
357+
tmr.AssertNoDataForMetric("grpc.lb.rls.default_target_picks")
358+
}
359+
249360
// Test verifies the scenario where there is a matching entry in the data cache
250361
// which is valid and there is no pending request. The pick is expected to be
251362
// delegated to the child policy.
@@ -256,7 +367,6 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
256367

257368
// Build the RLS config without a default target.
258369
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
259-
260370
// Start a test backend, and setup the fake RLS server to return this as a
261371
// target in the RLS response.
262372
testBackendCh, testBackendAddress := startBackend(t)
@@ -881,3 +991,41 @@ func TestIsFullMethodNameValid(t *testing.T) {
881991
})
882992
}
883993
}
994+
995+
// Tests the conversion of the child pickers error to the pick result attribute.
996+
func (s) TestChildPickResultError(t *testing.T) {
997+
tests := []struct {
998+
name string
999+
err error
1000+
want string
1001+
}{
1002+
{
1003+
name: "nil",
1004+
err: nil,
1005+
want: "complete",
1006+
},
1007+
{
1008+
name: "errNoSubConnAvailable",
1009+
err: balancer.ErrNoSubConnAvailable,
1010+
want: "queue",
1011+
},
1012+
{
1013+
name: "status error",
1014+
err: status.Error(codes.Unimplemented, "unimplemented"),
1015+
want: "drop",
1016+
},
1017+
{
1018+
name: "other error",
1019+
err: errors.New("some error"),
1020+
want: "fail",
1021+
},
1022+
}
1023+
1024+
for _, test := range tests {
1025+
t.Run(test.name, func(t *testing.T) {
1026+
if got := errToPickResult(test.err); got != test.want {
1027+
t.Fatalf("errToPickResult(%q) = %v, want %v", test.err, got, test.want)
1028+
}
1029+
})
1030+
}
1031+
}

internal/testutils/stats/test_metrics_recorder.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal flo
7575
}
7676
}
7777

78+
// AssertNoDataForMetric asserts no data is present for metric.
79+
func (r *TestMetricsRecorder) AssertNoDataForMetric(metricName string) {
80+
r.mu.Lock()
81+
defer r.mu.Unlock()
82+
if _, ok := r.data[estats.Metric(metricName)]; ok {
83+
r.t.Fatalf("Data is present for metric %v", metricName)
84+
}
85+
}
86+
7887
// PollForDataForMetric polls the metric data for the want. Fails if context
7988
// provided expires before data for metric is found.
8089
func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) {
@@ -121,6 +130,7 @@ func (r *TestMetricsRecorder) WaitForInt64Count(ctx context.Context, metricsData
121130
}
122131

123132
func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
133+
r.intCountCh.ReceiveOrFail()
124134
r.intCountCh.Send(MetricsData{
125135
Handle: handle.Descriptor(),
126136
IntIncr: incr,
@@ -145,6 +155,7 @@ func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDa
145155
}
146156

147157
func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
158+
r.floatCountCh.ReceiveOrFail()
148159
r.floatCountCh.Send(MetricsData{
149160
Handle: handle.Descriptor(),
150161
FloatIncr: incr,
@@ -169,6 +180,7 @@ func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsData
169180
}
170181

171182
func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
183+
r.intHistoCh.ReceiveOrFail()
172184
r.intHistoCh.Send(MetricsData{
173185
Handle: handle.Descriptor(),
174186
IntIncr: incr,
@@ -193,6 +205,7 @@ func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDa
193205
}
194206

195207
func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
208+
r.floatHistoCh.ReceiveOrFail()
196209
r.floatHistoCh.Send(MetricsData{
197210
Handle: handle.Descriptor(),
198211
FloatIncr: incr,
@@ -217,6 +230,7 @@ func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsData
217230
}
218231

219232
func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
233+
r.intGaugeCh.ReceiveOrFail()
220234
r.intGaugeCh.Send(MetricsData{
221235
Handle: handle.Descriptor(),
222236
IntIncr: incr,

0 commit comments

Comments
 (0)