
Commit c2046f5

rls: update picker synchronously upon receipt of configuration update
1 parent 59954c8 commit c2046f5

File tree

2 files changed: +207 -5 lines changed

balancer/rls/balancer.go

Lines changed: 12 additions & 5 deletions
@@ -74,9 +74,10 @@ var (
 
 	// Following functions are no-ops in actual code, but can be overridden in
 	// tests to give tests visibility into exactly when certain events happen.
-	clientConnUpdateHook = func() {}
-	dataCachePurgeHook   = func() {}
-	resetBackoffHook     = func() {}
+	clientConnUpdateHook         = func() {}
+	dataCachePurgeHook           = func() {}
+	resetBackoffHook             = func() {}
+	entryWithValidBackoffEvicted = func() {}
 )
 
 func init() {
@@ -297,13 +298,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
 	if resizeCache {
 		// If the new config changes reduces the size of the data cache, we
 		// might have to evict entries to get the cache size down to the newly
-		// specified size.
+		// specified size. If we do evict an entry with valid backoff timer,
+		// the new picker needs to be sent to the channel to re-process any
+		// RPCs queued as a result of this backoff timer.
 		//
 		// And we cannot do this operation above (where we compute the
 		// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
 		// `stateMu` if we are to hold both locks at the same time.
 		b.cacheMu.Lock()
-		b.dataCache.resize(newCfg.cacheSizeBytes)
+		evicted := b.dataCache.resize(newCfg.cacheSizeBytes)
+		if evicted {
+			b.sendNewPickerLocked()
+			entryWithValidBackoffEvicted()
+		}
 		b.cacheMu.Unlock()
 	}
 	return nil
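Note: the boolean returned by `b.dataCache.resize` is what drives the new synchronous picker update, but the cache side of that contract is not part of this diff. Below is a minimal, self-contained sketch of the assumed semantics; every name in it (`dataCache`, `cacheEntry`, `backoffState`, `lru`, `currentSize`) is a hypothetical stand-in, not the actual implementation in balancer/rls/cache.go.

package main

import "time"

// Hypothetical types standing in for the real RLS data cache.
type backoffState struct {
	timer *time.Timer // running while the entry is in backoff
}

type cacheEntry struct {
	size         int64
	backoffState *backoffState // nil if the entry is not backing off
}

type dataCache struct {
	currentSize int64
	lru         []*cacheEntry // least-recently-used entry first
}

// resize evicts LRU entries until the cache fits within newSize and reports
// whether any evicted entry still had a running backoff timer. That is the
// signal UpdateClientConnState uses to send a new picker, so that RPCs queued
// behind the evicted entry's backoff are re-processed right away.
func (dc *dataCache) resize(newSize int64) (backoffCancelled bool) {
	for dc.currentSize > newSize && len(dc.lru) > 0 {
		entry := dc.lru[0]
		dc.lru = dc.lru[1:]
		dc.currentSize -= entry.size
		// Timer.Stop reports true only if the timer had not fired yet,
		// i.e. the entry was evicted while still in valid backoff.
		if entry.backoffState != nil && entry.backoffState.timer.Stop() {
			backoffCancelled = true
		}
	}
	return backoffCancelled
}

func main() {
	dc := &dataCache{
		currentSize: 2,
		lru: []*cacheEntry{
			{size: 1, backoffState: &backoffState{timer: time.NewTimer(time.Hour)}},
			{size: 1},
		},
	}
	println(dc.resize(1)) // prints "true": the evicted entry was still backing off
}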

balancer/rls/balancer_test.go

Lines changed: 195 additions & 0 deletions
@@ -650,6 +650,201 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
 	verifyRLSRequest(t, rlsReqCh, true)
 }
 
+// Test that when a data cache entry is evicted due to config change
+// in cache size, the picker is updated accordingly.
+func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
+	// Create a restartable listener which can close existing connections.
+	l, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("net.Listen() failed: %v", err)
+	}
+	lis := testutils.NewRestartableListener(l)
+
+	// Override the clientConn update hook to get notified.
+	clientConnUpdateDone := make(chan struct{}, 1)
+	origClientConnUpdateHook := clientConnUpdateHook
+	clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
+	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
+
+	// Override the cache entry size func, and always return 1.
+	origEntrySizeFunc := computeDataCacheEntrySize
+	computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 }
+	defer func() { computeDataCacheEntrySize = origEntrySizeFunc }()
+
+	// Override the backoff strategy to return a large backoff which
+	// will make sure the date cache entry remains in backoff for the
+	// duration of the test.
+	origBackoffStrategy := defaultBackoffStrategy
+	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
+	defer func() { defaultBackoffStrategy = origBackoffStrategy }()
+
+	// Override the minEvictionDuration to ensure that when the config update
+	// reduces the cache size, the resize operation is not stopped because
+	// we find an entry whose minExpiryDuration has not elapsed.
+	origMinEvictDuration := minEvictDuration
+	minEvictDuration = time.Duration(0)
+	defer func() { minEvictDuration = origMinEvictDuration }()
+
+	// Override the entryWithValidBackoffEvicted to ensure we update
+	// picker when an entry with valid backoff time was evicted.
+	backOffItemEvicted := make(chan struct{}, 1)
+	origBackoffItemEvicted := entryWithValidBackoffEvicted
+	entryWithValidBackoffEvicted = func() { backOffItemEvicted <- struct{}{} }
+	defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }()
+
+	// Register the top-level wrapping balancer which forwards calls to RLS.
+	topLevelBalancerName := t.Name() + "top-level"
+	var ccWrapper *testCCWrapper
+	stub.Register(topLevelBalancerName, stub.BalancerFuncs{
+		Init: func(bd *stub.BalancerData) {
+			ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
+			bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
+		},
+		ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+			parser := balancer.Get(Name).(balancer.ConfigParser)
+			return parser.ParseConfig(sc)
+		},
+		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+			bal := bd.Data.(balancer.Balancer)
+			return bal.UpdateClientConnState(ccs)
+		},
+		Close: func(bd *stub.BalancerData) {
+			bal := bd.Data.(balancer.Balancer)
+			bal.Close()
+		},
+	})
+
+	// Start an RLS server and set the throttler to never throttle requests.
+	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Register an LB policy to act as the child policy for RLS LB policy.
+	childPolicyName := "test-child-policy" + t.Name()
+	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
+	t.Logf("Registered child policy with name %q", childPolicyName)
+
+	// Start a couple of test backends, and set up the fake RLS server to return
+	// these as targets in the RLS response, based on request keys.
+	_, backendAddress1 := startBackend(t)
+	backendCh2, backendAddress2 := startBackend(t)
+	backendCh3, backendAddress3 := startBackend(t)
+	rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+		if req.KeyMap["k1"] == "v1" {
+			return &rlstest.RouteLookupResponse{Err: errors.New("dummy error to set this entry in backoff"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
+		}
+		if req.KeyMap["k2"] == "v2" {
+			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
+		}
+		if req.KeyMap["k3"] == "v3" {
+			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}}
+		}
+		return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
+	})
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := manual.NewBuilderWithScheme("rls-e2e")
+	headers := `
+[
+	{
+		"key": "k1",
+		"names": [
+			"n1"
+		]
+	},
+	{
+		"key": "k2",
+		"names": [
+			"n2"
+		]
+	},
+	{
+		"key": "k3",
+		"names": [
+			"n3"
+		]
+	}
+]
+`
+	scJSON := fmt.Sprintf(`
+{
+  "loadBalancingConfig": [
+    {
+      "%s": {
+        "routeLookupConfig": {
+          "grpcKeybuilders": [{
+            "names": [{"service": "grpc.testing.TestService"}],
+            "headers": %s
+          }],
+          "lookupService": "%s",
+          "cacheSizeBytes": 1000
+        },
+        "childPolicy": [{"%s": {}}],
+        "childPolicyConfigTargetFieldName": "Backend"
+      }
+    }
+  ]
+}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
+	r.InitialState(resolver.State{ServiceConfig: sc})
+
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("create grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	<-clientConnUpdateDone
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
+	t.Logf("Verifying if RPC failed when listener is stopped.")
+
+	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3")
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3)
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	// Setting the size to 1 will cause the entries to be
+	// evicted.
+	scJSON1 := fmt.Sprintf(`
+{
+  "loadBalancingConfig": [
+    {
+      "%s": {
+        "routeLookupConfig": {
+          "grpcKeybuilders": [{
+            "names": [{"service": "grpc.testing.TestService"}],
+            "headers": %s
+          }],
+          "lookupService": "%s",
+          "cacheSizeBytes": 1
+        },
+        "childPolicy": [{"%s": {}}],
+        "childPolicyConfigTargetFieldName": "Backend"
+      }
+    }
+  ]
+}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
+	sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
+	r.UpdateState(resolver.State{ServiceConfig: sc1})
+	<-clientConnUpdateDone
+
+	// Stop the listener
+	lis.Stop()
+
+	select {
+	// Wait for backOffItemEvicted to ensure picker was updated
+	// synchronously when there was cache resize on config update.
+	case <-backOffItemEvicted:
+	case <-ctx.Done():
+		t.Error("Error sending picker update on eviction of cache entry with valid backoff: context timed out.")
+	}
+}
+
 // TestDataCachePurging verifies that the LB policy periodically evicts expired
 // entries from the data cache.
 func (s) TestDataCachePurging(t *testing.T) {
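The new test pins the RLS cache entry in backoff by swapping `defaultBackoffStrategy` for a `fakeBackoffStrategy` with a backoff of `defaultTestTimeout`. That fake is defined elsewhere in the test package and is not shown in this diff; a plausible minimal version, assuming the policy consumes a strategy with a `Backoff(retries int) time.Duration` method (as in grpc-go's internal/backoff package), could look like this sketch:

package rls

import "time"

// Sketch only; the real fakeBackoffStrategy in this package may differ.
// It returns a fixed duration regardless of the retry count, so an entry
// placed in backoff stays in backoff for the remainder of the test.
type fakeBackoffStrategy struct {
	backoff time.Duration
}

func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration {
	return f.backoff
}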
