-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Provide an internal-only API to report connectivity state changes on the ClientConn #6036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
029f12c
f76b3b6
c1dd7c7
3f2439d
18ddebc
614d836
cc2528a
9136e61
4b58aa3
a86dba7
bc9f888
01ef50a
1168d06
0f2720b
30de38d
bd56e16
6be9c21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,8 +35,10 @@ import ( | |
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/internal" | ||
"google.golang.org/grpc/internal/backoff" | ||
"google.golang.org/grpc/internal/channelz" | ||
"google.golang.org/grpc/internal/connectivitystate" | ||
"google.golang.org/grpc/internal/grpcsync" | ||
iresolver "google.golang.org/grpc/internal/resolver" | ||
"google.golang.org/grpc/internal/transport" | ||
|
@@ -405,11 +407,16 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr | |
|
||
// connectivityStateManager keeps the connectivity.State of ClientConn. | ||
// This struct will eventually be exported so the balancers can access it. | ||
// | ||
// TODO: If possible, get rid of the `connectivityStateManager` type, and | ||
// provide this functionality using the `Tracker`, to avoid keeping track of | ||
// the connectivity state at two places. | ||
type connectivityStateManager struct { | ||
mu sync.Mutex | ||
state connectivity.State | ||
notifyChan chan struct{} | ||
channelzID *channelz.Identifier | ||
mu sync.Mutex | ||
state connectivity.State | ||
notifyChan chan struct{} | ||
channelzID *channelz.Identifier | ||
connectivityStateTracker *connectivitystate.Tracker | ||
} | ||
|
||
// updateState updates the connectivity.State of ClientConn. | ||
|
@@ -425,6 +432,9 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) { | |
return | ||
} | ||
csm.state = state | ||
if csm.connectivityStateTracker != nil { | ||
csm.connectivityStateTracker.SetState(state) | ||
} | ||
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state) | ||
if csm.notifyChan != nil { | ||
// There are other goroutines waiting on this channel. | ||
|
@@ -598,6 +608,13 @@ func init() { | |
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) | ||
} | ||
emptyServiceConfig = cfg.Config.(*ServiceConfig) | ||
|
||
internal.AddConnectivityStateWatcher = func(cc *ClientConn, w connectivitystate.Watcher) func() { | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if cc.csMgr.connectivityStateTracker == nil { | ||
cc.csMgr.connectivityStateTracker = connectivitystate.NewTracker(cc.csMgr.getState()) | ||
} | ||
return cc.csMgr.connectivityStateTracker.AddWatcher(w) | ||
} | ||
} | ||
|
||
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { | ||
|
@@ -1031,6 +1048,13 @@ func (cc *ClientConn) ResetConnectBackoff() { | |
func (cc *ClientConn) Close() error { | ||
defer cc.cancel() | ||
|
||
cc.csMgr.mu.Lock() | ||
if cc.csMgr.connectivityStateTracker != nil { | ||
cc.csMgr.connectivityStateTracker.Stop() | ||
cc.csMgr.connectivityStateTracker = nil | ||
} | ||
cc.csMgr.mu.Unlock() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've moved this code block here, added Lock() and changed to release resources of |
||
cc.mu.Lock() | ||
if cc.conns == nil { | ||
cc.mu.Unlock() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* | ||
* Copyright 2023 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package connectivitystate provides functionality to report and track | ||
// connectivity state changes of ClientConns and SubConns. | ||
package connectivitystate | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/internal/grpcsync" | ||
) | ||
|
||
// Watcher wraps the functionality to be implemented by components | ||
// interested in watching connectivity state changes. | ||
type Watcher interface { | ||
// OnStateChange is invoked to report connectivity state changes on the | ||
// entity being watched. | ||
OnStateChange(state connectivity.State) | ||
} | ||
|
||
// Tracker provides pubsub-like functionality for connectivity state changes. | ||
// | ||
// The entity whose connectivity state is being tracked publishes updates by | ||
// calling the SetState() method. | ||
// | ||
// Components interested in connectivity state updates of the tracked entity | ||
// subscribe to updates by calling the AddWatcher() method. | ||
type Tracker struct { | ||
cs *grpcsync.CallbackSerializer | ||
cancel context.CancelFunc | ||
|
||
// Access to the below fields are guarded by this mutex. | ||
mu sync.Mutex | ||
state connectivity.State | ||
watchers map[Watcher]bool | ||
stopped bool | ||
} | ||
|
||
// NewTracker returns a new Tracker instance initialized with the provided | ||
// connectivity state. | ||
func NewTracker(state connectivity.State) *Tracker { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
return &Tracker{ | ||
cs: grpcsync.NewCallbackSerializer(ctx), | ||
cancel: cancel, | ||
state: state, | ||
watchers: map[Watcher]bool{}, | ||
} | ||
} | ||
|
||
// AddWatcher adds the provided watcher to the set of watchers in Tracker. | ||
// The OnStateChange() callback will be invoked asynchronously with the current | ||
// state of the tracked entity to begin with, and subsequently for every state | ||
// change. | ||
// | ||
// Returns a function to remove the provided watcher from the set of watchers. | ||
// The caller of this method is responsible for invoking this function when it | ||
// no longer needs to monitor the connectivity state changes on the channel. | ||
func (t *Tracker) AddWatcher(watcher Watcher) func() { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
|
||
if t.stopped { | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return func() {} | ||
} | ||
|
||
t.watchers[watcher] = true | ||
|
||
state := t.state | ||
t.cs.Schedule(func(context.Context) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
watcher.OnStateChange(state) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've fixed it from |
||
}) | ||
|
||
return func() { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
delete(t.watchers, watcher) | ||
} | ||
} | ||
|
||
// SetState updates the connectivity state of the entity being tracked, and | ||
// invokes the OnStateChange callback of all registered watchers. | ||
func (t *Tracker) SetState(state connectivity.State) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
|
||
if t.stopped { | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
t.state = state | ||
for watcher := range t.watchers { | ||
t.cs.Schedule(func(context.Context) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
watcher.OnStateChange(state) | ||
}) | ||
} | ||
} | ||
|
||
// Stop shuts down the Tracker and releases any resources allocated by it. | ||
// It is guaranteed that no Watcher callbacks would be invoked once this | ||
// method returns. | ||
func (t *Tracker) Stop() { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
t.stopped = true | ||
|
||
t.cancel() | ||
} |
Uh oh!
There was an error while loading. Please reload this page.