Skip to content

xds/clusterresolver: stop forwarding UpdateSubConnState calls #6526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 2 additions & 31 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

// Package clusterresolver contains the implementation of the
// xds_cluster_resolver_experimental LB policy which resolves endpoint addresses
// cluster_resolver_experimental LB policy which resolves endpoint addresses
// using a list of one or more discovery mechanisms.
package clusterresolver

Expand Down Expand Up @@ -150,13 +150,6 @@ type ccUpdate struct {
err error
}

// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the child policy.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}

type exitIdle struct{}

// clusterResolverBalancer resolves endpoint addresses using a list of one or
Expand Down Expand Up @@ -314,14 +307,6 @@ func (b *clusterResolverBalancer) run() {
switch update := u.(type) {
case *ccUpdate:
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are simply handed over to the underlying
// child balancer.
if b.child == nil {
b.logger.Errorf("Received a SubConn update {%+v} with no child policy", update)
break
}
b.child.UpdateSubConnState(update.subConn, update.state)
case exitIdle:
if b.child == nil {
b.logger.Errorf("xds: received ExitIdle with no child balancer")
Expand Down Expand Up @@ -388,11 +373,7 @@ func (b *clusterResolverBalancer) ResolverError(err error) {

// UpdateSubConnState handles subConn updates from gRPC.
func (b *clusterResolverBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("Received subConn update {%v, %v} after close", sc, state)
return
}
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// Close closes the cdsBalancer and the underlying child balancer.
Expand All @@ -419,13 +400,3 @@ type ccWrapper struct {
func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) {
c.resourceWatcher.resolveNow()
}

func (c *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) {
if opts.StateListener == nil {
// If already set, just allow updates to be sent directly to the
// child's listener. Otherwise, we are responsible for forwarding the
// update we'll receive to the proper child.
opts.StateListener = func(state balancer.SubConnState) { c.b.UpdateSubConnState(sc, state) }
}
return c.ClientConn.NewSubConn(addrs, opts)
}
94 changes: 0 additions & 94 deletions xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/buffer"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -318,99 +317,6 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}
}

// testCCWrapper wraps a balancer.ClientConn and intercepts NewSubConn to make
// subConn state changes available to the test.
type testCCWrapper struct {
balancer.ClientConn
scStateCh *buffer.Unbounded
}

func (t *testCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) {
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
t.scStateCh.Put(state)
if oldListener != nil {
oldListener(state)
}
}
return t.ClientConn.NewSubConn(addrs, opts)
}

// Test verifies that SubConn state changes are propagated to the child policy
// by the cluster resolver LB policy.
func (s) TestSubConnStateChangePropagationToChildPolicy(t *testing.T) {
// Unregister the priority balancer builder for the duration of this test,
// and register a policy under the same name that makes SubConn state
// changes pushed to it available to the test.
priorityBuilder := balancer.Get(priority.Name)
internal.BalancerUnregister(priorityBuilder.Name())
var ccWrapper *testCCWrapper
stub.Register(priority.Name, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
ccWrapper = &testCCWrapper{
ClientConn: bd.ClientConn,
scStateCh: buffer.NewUnbounded(),
}
bd.Data = priorityBuilder.Build(ccWrapper, bd.BuildOptions)
},
ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return priorityBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bal := bd.Data.(balancer.Balancer)
bal.Close()
},
})

defer balancer.Register(priorityBuilder)

managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure cluster and endpoints resources in the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

for {
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for child policy to see a READY SubConn")
case s := <-ccWrapper.scStateCh.Get():
ccWrapper.scStateCh.Load()
state := s.(balancer.SubConnState)
if state.ConnectivityState == connectivity.Ready {
return
}
}
}
}

// Test verifies that when the received Cluster resource contains outlier
// detection configuration, the LB config pushed to the child policy contains
// the appropriate configuration for the outlier detection LB policy.
Expand Down