xds/ringhash: make reconnection logic work for a single subConn #5601
Changes from 1 commit
@@ -0,0 +1,156 @@
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package ringhash_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"

	testgrpc "google.golang.org/grpc/test/grpc_testing"
	testpb "google.golang.org/grpc/test/grpc_testing"

	_ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash_experimental LB policy.
)

type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}

const (
	defaultTestTimeout      = 10 * time.Second
	defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type testService struct {
	testpb.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
	return &testpb.Empty{}, nil
}

// TestRingHash_ReconnectToMoveOutOfTransientFailure tests the case where the
// ring contains a single subConn, and verifies that when the server goes down,
// the LB policy on the client automatically reconnects until the subChannel
// moves out of TRANSIENT_FAILURE.
func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
	// Create a restartable listener to simulate server being down.
	l, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
	}
	lis := testutils.NewRestartableListener(l)

	// Start a server backend exposing the test service.
	server := grpc.NewServer()
	defer server.Stop()
	testgrpc.RegisterTestServiceServer(server, &testService{})
	go func() {
		if err := server.Serve(lis); err != nil {
			t.Errorf("Serve() failed: %v", err)
		}
	}()

	// Create a clientConn with a manual resolver (which is used to push the
	// address of the test backend), and a default service config pointing to
	// the use of the ring_hash_experimental LB policy.
	const ringHashServiceConfig = `{"loadBalancingConfig": [{"ring_hash_experimental":{}}]}`
	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(r),
		grpc.WithDefaultServiceConfig(ringHashServiceConfig),
	}
	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	// Push the address of the test backend through the manual resolver.
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}

	// Stopping the server listener will close the transport on the client,
	// which will lead to the channel eventually moving to IDLE. The ring_hash
	// LB policy is not expected to reconnect by itself at this point.
	lis.Stop()
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		if cc.GetState() == connectivity.Idle {
			break
		}
	}
	if err := ctx.Err(); err != nil {
		t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.Idle, err)
	}

	// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
	// to TRANSIENT_FAILURE upon connection failure.
	client.EmptyCall(ctx, &testpb.Empty{})
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		if cc.GetState() == connectivity.TransientFailure {
			break
		}
	}
	if err := ctx.Err(); err != nil {
		t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
	}

	// An RPC at this point is expected to fail.
	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
		t.Fatal("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE")
	}

	// Restart the server listener. The ring_hash LB policy is expected to
	// attempt to reconnect on its own and come out of TRANSIENT_FAILURE, even
	// without an RPC attempt.
	lis.Restart()
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		if cc.GetState() == connectivity.Ready {
			break
		}
	}
	if err := ctx.Err(); err != nil {
		t.Fatalf("Timeout waiting for channel to reach READY after server restart: %v", err)
	}

	// An RPC at this point is expected to succeed.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}
}
@@ -68,6 +68,7 @@ type subConn struct {
	addr   string
	weight uint32
	sc     balancer.SubConn
	logger *grpclog.PrefixLogger

	mu sync.RWMutex
	// This is the actual state of this SubConn (as updated by the ClientConn).
@@ -117,6 +118,7 @@ func (sc *subConn) setState(s connectivity.State) {
		// Trigger Connect() if new state is Idle, and there is a queued connect.
		if sc.connectQueued {
			sc.connectQueued = false
			sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
			sc.sc.Connect()
		} else {
			sc.attemptingToConnect = false
@@ -161,11 +163,13 @@ func (sc *subConn) queueConnect() {
	defer sc.mu.Unlock()
	sc.attemptingToConnect = true
	if sc.state == connectivity.Idle {
		sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
		sc.sc.Connect()
		return
	}
	// Queue this connect, and when this SubConn switches back to Idle (happens
	// after backoff in TransientFailure), it will Connect().
	sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
	sc.connectQueued = true
}

Review comment (on the log line added in queueConnect()):
Logging the exact same text here could make it hard to distinguish which operation is occurring. Did we just queue the connect attempt, or was it queued when the subConn went IDLE? Maybe this distinction is unimportant?

Reply:
The connection attempt is queued by calling queueConnect(). I think the distinction is not important here: whether the connection attempt happened right when it was queued, or when the subConn eventually moved to IDLE. What do you think?
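For readers following the review thread, here is a minimal, hypothetical sketch of the queue-a-connect pattern being discussed (simplified types, not the actual grpc-go subConn): connect immediately if the subConn is IDLE, otherwise remember that a connect is wanted and fire it on the next transition back to IDLE.

```go
package sketch

import (
	"sync"

	"google.golang.org/grpc/connectivity"
)

// lazySubConn is a simplified stand-in for the balancer's subConn type.
type lazySubConn struct {
	mu            sync.Mutex
	state         connectivity.State
	connectQueued bool
	connect       func() // would wrap balancer.SubConn.Connect in the real code
}

// requestConnect connects right away if the subConn is IDLE; otherwise it
// records that a connect is wanted.
func (s *lazySubConn) requestConnect() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == connectivity.Idle {
		s.connect()
		return
	}
	s.connectQueued = true
}

// onStateChange runs the queued connect once the subConn returns to IDLE,
// which happens after the backoff that follows TRANSIENT_FAILURE.
func (s *lazySubConn) onStateChange(newState connectivity.State) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = newState
	if newState == connectivity.Idle && s.connectQueued {
		s.connectQueued = false
		s.connect()
	}
}
```

The two log lines in the diff correspond to the two branches of requestConnect here: one fires at queue time (already IDLE), the other fires later from the state-change path.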
@@ -216,10 +220,11 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
		if val, ok := b.subConns.Get(addr); !ok {
			sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
			if err != nil {
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				b.logger.Warningf("Failed to create new SubConn: %v", err)
				continue
			}
			scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
			scs.logger = subConnPrefixLogger(b, scs)
			scs.setState(connectivity.Idle)
			b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
			b.subConns.Set(addr, scs)
@@ -328,49 +333,37 @@ func (b *ringhashBalancer) ResolverError(err error) {
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	s := state.ConnectivityState
	b.logger.Infof("handle SubConn state change: %p, %v", sc, s)
	b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
	scs, ok := b.scStates[sc]
	if !ok {
		b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s)
		b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
		return
	}
	oldSCState := scs.effectiveState()
	scs.setState(s)
	newSCState := scs.effectiveState()

	var sendUpdate bool
	oldBalancerState := b.state
	b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)
	b.state = b.csEvltr.recordTransition(oldSCState, newSCState)
	if oldBalancerState != b.state {
		sendUpdate = true
	}

	switch s {
	case connectivity.Idle:
		// No need to send an update. No queued RPC can be unblocked. If the
		// overall state changed because of this, sendUpdate is already true.
	case connectivity.Connecting:
		// No need to send an update. No queued RPC can be unblocked. If the
		// overall state changed because of this, sendUpdate is already true.
	case connectivity.Ready:
		// Resend the picker, there's no need to regenerate the picker because
		// the ring didn't change.
		sendUpdate = true
		// We need to regenerate the picker even if the ring has not changed
		// because we could be moving from TRANSIENT_FAILURE to READY, in which
		// case, we need to update the error picker returned earlier.
		b.regeneratePicker()
	case connectivity.TransientFailure:
		// Save error to be reported via picker.
		b.connErr = state.ConnectionError
		// Regenerate picker to update error message.
		b.regeneratePicker()
		sendUpdate = true
	case connectivity.Shutdown:
		// When an address was removed by resolver, b called RemoveSubConn but
		// kept the sc's state in scStates. Remove state for this sc here.
		delete(b.scStates, sc)
	}

	if sendUpdate {
		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	}
	// Send a picker update unconditionally.
	b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

	switch b.state {
	case connectivity.Connecting, connectivity.TransientFailure:
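The new comment on the READY case is the crux of this hunk: while the balancer is in TRANSIENT_FAILURE, the picker previously sent to the ClientConn fails every pick, so it has to be replaced once a subConn becomes READY. A hypothetical, simplified illustration of those two picker shapes (not ringhash's actual ring-based picker):

```go
package sketch

import (
	"google.golang.org/grpc/balancer"
)

// errPicker is the kind of picker a balancer reports while it is in
// TRANSIENT_FAILURE: every pick fails with the last connection error.
type errPicker struct{ err error }

func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, p.err
}

// readyPicker hands out a READY SubConn. If the balancer kept the old
// errPicker after the TRANSIENT_FAILURE -> READY transition, RPCs would keep
// failing even though a connection now exists, which is why the picker is
// regenerated on READY even though the ring itself did not change.
type readyPicker struct{ sc balancer.SubConn }

func (p *readyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{SubConn: p.sc}, nil
}
```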
@@ -399,7 +392,14 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
		if sc != nil {
			sc.queueConnect()
			return
		}
		// This handles the edge case where we have a single subConn in the
		// ring. nextSkippingDuplicatesSubConn() would have returned nil. We
		// still need to ensure that some subConn is attempting to connect, in
		// order to give the LB policy a chance to move out of
		// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
		scs.queueConnect()
	}
}
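Why the fallback is needed: with a single backend, every ring entry is backed by the same subConn, so a "next subConn, skipping duplicates" walk finds nothing. A minimal, hypothetical sketch of that situation (simplified types, not the actual grpc-go implementation):

```go
package sketch

import "fmt"

// subConn and ringEntry are simplified stand-ins for the balancer's types.
type subConn struct{ addr string }

type ringEntry struct{ sc *subConn }

// nextSkippingDuplicates walks the ring looking for an entry backed by a
// different subConn than the one that just failed.
func nextSkippingDuplicates(ring []ringEntry, failed *subConn) *subConn {
	for _, e := range ring {
		if e.sc != failed {
			return e.sc
		}
	}
	// Single backend: every entry points at the same subConn, so there is no
	// other subConn to try, and the caller must reconnect the failed one.
	return nil
}

func demo() {
	only := &subConn{addr: "10.0.0.1:50051"} // hypothetical backend address
	ring := []ringEntry{{sc: only}, {sc: only}, {sc: only}}
	fmt.Println(nextSkippingDuplicates(ring, only) == nil) // true: fall back to reconnecting `only` itself
}
```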
Review comment:
Code golf:
But why not use the connectivity state API?

Reply:
Done.
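For context on the connectivity state API suggestion: instead of polling cc.GetState() in a sleep loop as the test above does, the channel's state can be awaited directly. A minimal sketch of such a helper, assuming the reviewer meant grpc.ClientConn's GetState/WaitForStateChange (the helper name awaitState is hypothetical):

```go
package ringhash_test

import (
	"context"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// awaitState blocks until cc reaches wantState or ctx expires, using the
// ClientConn's connectivity state API instead of a poll-and-sleep loop.
func awaitState(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantState connectivity.State) {
	t.Helper()
	for state := cc.GetState(); state != wantState; state = cc.GetState() {
		if !cc.WaitForStateChange(ctx, state) {
			t.Fatalf("Timeout waiting for channel to reach %q; last observed state: %q", wantState, state)
		}
	}
}
```

With a helper like this, each `for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout)` loop in the test could collapse to a single call such as `awaitState(ctx, t, cc, connectivity.Idle)`.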