Skip to content

Commit 2059c6e

Browse files
my4-deveaswars
andauthored
grpc: report connectivity state changes on the ClientConn for Subscribers (#6437)
Co-authored-by: Easwar Swaminathan <easwars@google.com>
1 parent 4832deb commit 2059c6e

File tree

3 files changed

+139
-2
lines changed

3 files changed

+139
-2
lines changed

clientconn.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/codes"
3535
"google.golang.org/grpc/connectivity"
3636
"google.golang.org/grpc/credentials"
37+
"google.golang.org/grpc/internal"
3738
"google.golang.org/grpc/internal/backoff"
3839
"google.golang.org/grpc/internal/channelz"
3940
"google.golang.org/grpc/internal/grpcsync"
@@ -136,7 +137,6 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
136137
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
137138
cc := &ClientConn{
138139
target: target,
139-
csMgr: &connectivityStateManager{},
140140
conns: make(map[*addrConn]struct{}),
141141
dopts: defaultDialOptions(),
142142
czData: new(channelzData),
@@ -189,6 +189,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
189189
// Register ClientConn with channelz.
190190
cc.channelzRegistration(target)
191191

192+
cc.csMgr = newConnectivityStateManager(cc.channelzID)
193+
192194
if err := cc.validateTransportCredentials(); err != nil {
193195
return nil, err
194196
}
@@ -473,7 +475,6 @@ func (cc *ClientConn) validateTransportCredentials() error {
473475
func (cc *ClientConn) channelzRegistration(target string) {
474476
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
475477
cc.addTraceEvent("created")
476-
cc.csMgr.channelzID = cc.channelzID
477478
}
478479

479480
// chainUnaryClientInterceptors chains all unary client interceptors into one.
@@ -538,13 +539,27 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr
538539
}
539540
}
540541

542+
// newConnectivityStateManager creates an connectivityStateManager with
543+
// the specified id.
544+
func newConnectivityStateManager(id *channelz.Identifier) *connectivityStateManager {
545+
return &connectivityStateManager{
546+
channelzID: id,
547+
pubSub: grpcsync.NewPubSub(),
548+
}
549+
}
550+
541551
// connectivityStateManager keeps the connectivity.State of ClientConn.
542552
// This struct will eventually be exported so the balancers can access it.
553+
//
554+
// TODO: If possible, get rid of the `connectivityStateManager` type, and
555+
// provide this functionality using the `PubSub`, to avoid keeping track of
556+
// the connectivity state at two places.
543557
type connectivityStateManager struct {
544558
mu sync.Mutex
545559
state connectivity.State
546560
notifyChan chan struct{}
547561
channelzID *channelz.Identifier
562+
pubSub *grpcsync.PubSub
548563
}
549564

550565
// updateState updates the connectivity.State of ClientConn.
@@ -560,6 +575,8 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
560575
return
561576
}
562577
csm.state = state
578+
csm.pubSub.Publish(state)
579+
563580
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
564581
if csm.notifyChan != nil {
565582
// There are other goroutines waiting on this channel.
@@ -583,6 +600,10 @@ func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
583600
return csm.notifyChan
584601
}
585602

603+
func (csm *connectivityStateManager) close() {
604+
csm.pubSub.Stop()
605+
}
606+
586607
// ClientConnInterface defines the functions clients need to perform unary and
587608
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
588609
// be referenced by generated code.
@@ -771,6 +792,10 @@ func init() {
771792
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
772793
}
773794
emptyServiceConfig = cfg.Config.(*ServiceConfig)
795+
796+
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
797+
return cc.csMgr.pubSub.Subscribe(s)
798+
}
774799
}
775800

776801
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
@@ -1224,6 +1249,7 @@ func (cc *ClientConn) Close() error {
12241249
conns := cc.conns
12251250
cc.conns = nil
12261251
cc.csMgr.updateState(connectivity.Shutdown)
1252+
cc.csMgr.close()
12271253

12281254
pWrapper := cc.blockingpicker
12291255
rWrapper := cc.resolverWrapper

internal/internal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ var (
122122
// deleted or changed.
123123
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
124124

125+
// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn
126+
SubscribeToConnectivityStateChanges interface{} // func(*grpc.ClientConn, grpcsync.Subscriber)
127+
125128
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
126129
// the provided xds bootstrap config instead of the global configuration from
127130
// the supported environment variables. The resolver.Builder is meant to be
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package test
20+
21+
import (
22+
"context"
23+
"testing"
24+
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/connectivity"
27+
"google.golang.org/grpc/credentials/insecure"
28+
"google.golang.org/grpc/internal"
29+
"google.golang.org/grpc/internal/grpcsync"
30+
"google.golang.org/grpc/internal/stubserver"
31+
"google.golang.org/grpc/resolver"
32+
"google.golang.org/grpc/resolver/manual"
33+
)
34+
35+
type testSubscriber struct {
36+
onMsgCh chan connectivity.State
37+
}
38+
39+
func newTestSubscriber() *testSubscriber {
40+
return &testSubscriber{onMsgCh: make(chan connectivity.State, 1)}
41+
}
42+
43+
func (ts *testSubscriber) OnMessage(msg interface{}) {
44+
select {
45+
case ts.onMsgCh <- msg.(connectivity.State):
46+
default:
47+
}
48+
}
49+
50+
func (s) TestConnectivityStateUpdates(t *testing.T) {
51+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
52+
defer cancel()
53+
54+
// Create a ClientConn with a short idle_timeout.
55+
r := manual.NewBuilderWithScheme("whatever")
56+
dopts := []grpc.DialOption{
57+
grpc.WithTransportCredentials(insecure.NewCredentials()),
58+
grpc.WithResolvers(r),
59+
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
60+
}
61+
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
62+
if err != nil {
63+
t.Fatalf("grpc.Dial() failed: %v", err)
64+
}
65+
t.Cleanup(func() { cc.Close() })
66+
67+
s := newTestSubscriber()
68+
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
69+
70+
backend := stubserver.StartTestService(t, nil)
71+
t.Cleanup(backend.Stop)
72+
73+
wantStates := []connectivity.State{
74+
connectivity.Connecting,
75+
connectivity.Ready,
76+
connectivity.Idle,
77+
connectivity.Shutdown,
78+
}
79+
80+
doneCh := make(chan struct{})
81+
go func() {
82+
defer close(doneCh)
83+
for _, wantState := range wantStates {
84+
select {
85+
case gotState := <-s.onMsgCh:
86+
if gotState != wantState {
87+
t.Errorf("Received unexpected state: %q; want: %q", gotState, wantState)
88+
}
89+
case <-ctx.Done():
90+
t.Error("Timeout when expecting the onMessage() callback to be invoked")
91+
}
92+
if t.Failed() {
93+
break
94+
}
95+
}
96+
}()
97+
98+
// Verify that the ClientConn moves to READY.
99+
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
100+
101+
// Verify that the ClientConn moves to IDLE as there is no activity.
102+
awaitState(ctx, t, cc, connectivity.Idle)
103+
104+
cc.Close()
105+
awaitState(ctx, t, cc, connectivity.Shutdown)
106+
107+
<-doneCh
108+
}

0 commit comments

Comments
 (0)