Skip to content

Commit

Permalink
gracefulswitch: add ParseConfig and make UpdateClientConnState call S…
Browse files Browse the repository at this point in the history
…witchTo if needed (#7035)
  • Loading branch information
dfawley authored Mar 19, 2024
1 parent 800a8e0 commit faf9964
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 115 deletions.
5 changes: 3 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ var (
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
if strings.ToLower(b.Name()) != b.Name() {
name := strings.ToLower(b.Name())
if name != b.Name() {
// TODO: Skip the use of strings.ToLower() to index the map after v1.59
// is released to switch to case sensitive balancer registry. Also,
// remove this warning and update the docstrings for Register and Get.
logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())
}
m[strings.ToLower(b.Name())] = b
m[name] = b
}

// unregisterForTesting deletes the balancer with the given name from the
Expand Down
2 changes: 1 addition & 1 deletion balancer/rls/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s) TestParseConfigErrors(t *testing.T) {
"childPolicy": [{"grpclb": {"childPolicy": [{"pickfirst": {}}]}}],
"childPolicyConfigTargetFieldName": "serviceName"
}`),
wantErr: "invalid loadBalancingConfig: no supported policies found",
wantErr: "no supported policies found in config",
},
{
desc: "no child policy",
Expand Down
57 changes: 7 additions & 50 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package grpc
import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -66,7 +65,8 @@ type ccBalancerWrapper struct {
}

// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
// underlying balancer is not created until the updateClientConnState() method
// is invoked.
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
Expand Down Expand Up @@ -97,6 +97,11 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
if ctx.Err() != nil || ccb.balancer == nil {
return
}
name := gracefulswitch.ChildName(ccs.BalancerConfig)
if ccb.curBalancerName != name {
ccb.curBalancerName = name
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}
err := ccb.balancer.UpdateClientConnState(*ccs)
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
Expand All @@ -120,54 +125,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
})
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
}

// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelz, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelz, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}

// close initiates async shutdown of the wrapper. cc.mu must be held when
// calling this function. To determine the wrapper has finished shutting down,
// the channel should block on ccb.serializer.Done() without cc.mu held.
Expand Down
14 changes: 2 additions & 12 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig

func init() {
balancer.Register(pickfirstBuilder{})
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
Expand Down Expand Up @@ -777,7 +778,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)

var balCfg serviceconfig.LoadBalancingConfig
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
balCfg = cc.sc.lbConfig
}
bw := cc.balancerWrapper
cc.mu.Unlock()
Expand Down Expand Up @@ -1074,17 +1075,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}

var newBalancerName string
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down
83 changes: 83 additions & 0 deletions internal/balancer/gracefulswitch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package gracefulswitch

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)

type lbConfig struct {
serviceconfig.LoadBalancingConfig

childBuilder balancer.Builder
childConfig serviceconfig.LoadBalancingConfig
}

func ChildName(l serviceconfig.LoadBalancingConfig) string {
return l.(*lbConfig).childBuilder.Name()
}

// ParseConfig parses a child config list and returns a LB config for the
// gracefulswitch Balancer.
//
// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
// names + configs as the format of the "loadBalancingConfig" field in
// ServiceConfig. It returns a type that should be passed to
// UpdateClientConnState in the BalancerConfig field.
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var lbCfg []map[string]json.RawMessage
if err := json.Unmarshal(cfg, &lbCfg); err != nil {
return nil, err
}
for i, e := range lbCfg {
if len(e) != 1 {
return nil, fmt.Errorf("expected a JSON struct with one entry; received entry %v at index %d", e, i)
}

var name string
var jsonCfg json.RawMessage
for name, jsonCfg = range e {
}

builder := balancer.Get(name)
if builder == nil {
// Skip unregistered balancer names.
continue
}

parser, ok := builder.(balancer.ConfigParser)
if !ok {
// This is a valid child with no config.
return &lbConfig{childBuilder: builder}, nil
}

cfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err)
}

return &lbConfig{childBuilder: builder, childConfig: cfg}, nil
}

return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg))
}
45 changes: 40 additions & 5 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,23 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
// process is not complete when this method returns. This method must be called
// synchronously alongside the rest of the balancer.Balancer methods this
// Graceful Switch Balancer implements.
//
// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
// to cause the Balancer to automatically change to the new child when necessary.
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
_, err := gsb.switchTo(builder)
return err
}

func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
gsb.mu.Lock()
if gsb.closed {
gsb.mu.Unlock()
return errBalancerClosed
return nil, errBalancerClosed
}
bw := &balancerWrapper{
gsb: gsb,
builder: builder,
gsb: gsb,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
Expand Down Expand Up @@ -129,7 +138,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
gsb.balancerCurrent = nil
}
gsb.mu.Unlock()
return balancer.ErrBadResolverState
return nil, balancer.ErrBadResolverState
}

// This write doesn't need to take gsb.mu because this field never gets read
Expand All @@ -138,7 +147,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
// bw.Balancer field will never be forwarded to until this SwitchTo()
// function returns.
bw.Balancer = newBalancer
return nil
return bw, nil
}

// Returns nil if the graceful switch balancer is closed.
Expand All @@ -152,12 +161,33 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {
}

// UpdateClientConnState forwards the update to the latest balancer created.
//
// If the state's BalancerConfig is the config returned by a call to
// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
// the balancer indicated by the config before forwarding its config to it, if
// necessary.
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()

gsbCfg, ok := state.BalancerConfig.(*lbConfig)
if ok {
// Switch to the child in the config unless it is already active.
if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
var err error
balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
if err != nil {
return fmt.Errorf("could not switch to new child balancer: %w", err)
}
}
// Unwrap the child balancer's config.
state.BalancerConfig = gsbCfg.childConfig
}

if balToUpdate == nil {
return errBalancerClosed
}

// Perform this call without gsb.mu to prevent deadlocks if the child calls
// back into the channel. The latest balancer can never be closed during a
// call from the channel, even without gsb.mu held.
Expand All @@ -169,6 +199,10 @@ func (gsb *Balancer) ResolverError(err error) {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
gsb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
return
}
// Perform this call without gsb.mu to prevent deadlocks if the child calls
Expand Down Expand Up @@ -261,7 +295,8 @@ func (gsb *Balancer) Close() {
// graceful switch logic.
type balancerWrapper struct {
balancer.Balancer
gsb *Balancer
gsb *Balancer
builder balancer.Builder

lastState balancer.State
subconns map[balancer.SubConn]bool // subconns created by this balancer
Expand Down
14 changes: 3 additions & 11 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@ const (
logPrefix = "[pick-first-lb %p] "
)

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
}

type pickfirstBuilder struct{}

func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{cc: cc}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
}

func (*pickfirstBuilder) Name() string {
func (pickfirstBuilder) Name() string {
return PickFirstBalancerName
}

Expand All @@ -63,7 +59,7 @@ type pfConfig struct {
ShuffleAddressList bool `json:"shuffleAddressList"`
}

func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg pfConfig
if err := json.Unmarshal(js, &cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
Expand Down Expand Up @@ -243,7 +239,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

func init() {
balancer.Register(newPickfirstBuilder())
}
Loading

0 comments on commit faf9964

Please sign in to comment.