-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Provide an internal-only API to report connectivity state changes on the ClientConn #6036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@my4-dev : Thanks for taking a stab at this one. I feel that the changes in this PR don't quite add up to what I had imagined in #5818. Let me give an overview of what I had in mind. I'm not too opposed to your package package connectivitystate
// Watcher is the interface to be implemented by components which wish to receive
// connectivity state updates from the Tracker.
type Watcher interface {
// Implementations must not block.
OnStateChange(state connectivity.State)
}
// Tracker is equivalent to the Publisher type that you have. But it does more.
type Tracker struct {
mu sync.Mutex // Access to the underlying fields needs to be guarded by this mutex.
state connectivity.State // Caches the most recent state update
watchers map[Watchers]bool // Set of registered watchers
}
func NewTracker() *Tracker { ... }
func (t *Tracker) AddWatcher(watcher Watcher) func () {
// Adds the provided watcher to the set of watchers in Tracker.
// Schedules a callback on the provided watcher with current state.
// Returns a function to remove the provided watcher from the set of watchers. The caller
// of this method is responsible for invoking this function when it no longer needs to
// monitor the connectivity state changes on the channel.
}
func (t *Tracker) SetState(state connectivity.State) {
// Update the cached state
// Invoke callbacks on all registered watchers.
} I would assume that you would also need the Then, the type ClientConn struct {
...
connectivityStateTracker *tracker
...
} The one thing I would want to add is an unexported method on the package internal
var AddConnectivityStateWatcher interface{} // The actual type for this would be `func(*grpc.ClientConn, connectivitystate.Watcher) See examples in the internal package for how this is done for other cases to get an idea. package grpc
func init() {
internal.AddConnectivityStateWatcher = func(cc *grpc.ClientConn, w connectivitystate.Watcher) {
if cc.connectivityStateTracker == nil {
cc.connectivityStateTracker == connectivitystate.NewTracker()
}
cc.connectivityStateTracker.AddWatcher(w)
}
} We need to ensure that when there are no watchers, we don't do any wasted work. @dfawley : What do you think about the above approach? |
@easwars : Thank you for some comments! I have reflected your advice. I couldn't understand why Testing piplelines failed. Could you please tell me how to solve this? |
This PR is labeled as requiring an update from the reporter, and no update has been received after 6 days. If no update is provided in the next 7 days, this issue will be automatically closed. |
Hi @easwars! |
The tests are failing to with the following errors:
|
Thank you very much. |
t.cs.Schedule(func(context.Context) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
watcher.OnStateChange(t.state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While implementing test cases, I noticed that watcher.OnStateChange
could be called with an incorrect argument because of characteristics of closure if t.state
would be changed very quickly.
e.g.
SetState
is called withconnectivity.Idle
.watcher.OnStateChange
is scheduled.SetState
is called withconnectivity.Connecting
immediately after step 2.watcher.OnStateChange
is scheduled.- Scheduled
watcher.OnStateChange
is called witht.state
( =connectivity.Connecting
) twice incorrectly.
In order to resolve this problem, I'm going to change t.state
to state
.
We would need to fix watcher.OnStateChange(t.state)
in AddWatcher
func as well.
Do you have any other opinions?
If so, I'll adopt yours.
t.cs.Schedule(func(context.Context) { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
watcher.OnStateChange(state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've fixed it from watcher.OnStateChange(t.state)
to watcher.OnStateChange(state)
for the same reason of SetState
.
cc.csMgr.connectivityStateTracker = nil | ||
} | ||
cc.csMgr.mu.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved this code block here, added Lock() and changed to release resources of cc.csMgr.connectivityStateTracker
.
It would be better to invoke this code block before cc.csMgr.updateState(connectivity.Shutdown)
on Close
method because it wasn't stable whether connectivity shutdown state change would be reported to Watcher
.
I discussed this PR with @dfawley and we are leaning towards the following approach:
We are already very close on most of the above, just need some minor massaging. Please let me know what you think about this approach. |
I agree with you. Let's go with this idea. I want to go ahead the first approach.
|
Thanks @my4-dev. I had one comment on #6153 that I forgot to hit the send button on :( |
Now that #6153 is merged, we can move forward with the next PR where we define the generic pub-sub type. Thanks. |
I've submitted the next PR #6167 . |
IIUC this is unblocked now? |
Now that #6167 is merged, we can move forward with next PR where we make changes to the ClientConn. |
RELEASE NOTES:
Motivation:
Please refer to #5818
Modifications:
internal/clientutil/connectivity_state_pubsub.go
publisher
toconnectivityStateManager
in order to publish the connectivity changes on ClientConn to subscribers.Result:
RELEASE NOTES: none