Skip to content

Provide an internal-only API to report connectivity state changes on the ClientConn #6036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/connectivitystate"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -405,11 +407,16 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// TODO: If possible, get rid of the `connectivityStateManager` type, and
// provide this functionality using the `Tracker`, to avoid keeping track of
// the connectivity state at two places.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
connectivityStateTracker *connectivitystate.Tracker
}

// updateState updates the connectivity.State of ClientConn.
Expand All @@ -425,6 +432,9 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
if csm.connectivityStateTracker != nil {
csm.connectivityStateTracker.SetState(state)
}
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
Expand Down Expand Up @@ -598,6 +608,13 @@ func init() {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)

internal.AddConnectivityStateWatcher = func(cc *ClientConn, w connectivitystate.Watcher) func() {
if cc.csMgr.connectivityStateTracker == nil {
cc.csMgr.connectivityStateTracker = connectivitystate.NewTracker(cc.csMgr.getState())
}
return cc.csMgr.connectivityStateTracker.AddWatcher(w)
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down Expand Up @@ -1031,6 +1048,13 @@ func (cc *ClientConn) ResetConnectBackoff() {
func (cc *ClientConn) Close() error {
defer cc.cancel()

cc.csMgr.mu.Lock()
if cc.csMgr.connectivityStateTracker != nil {
cc.csMgr.connectivityStateTracker.Stop()
cc.csMgr.connectivityStateTracker = nil
}
cc.csMgr.mu.Unlock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this code block here, added Lock() and changed to release resources of cc.csMgr.connectivityStateTracker.
It would be better to invoke this code block before cc.csMgr.updateState(connectivity.Shutdown) on Close method because it wasn't stable whether connectivity shutdown state change would be reported to Watcher.

cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
Expand Down
111 changes: 111 additions & 0 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/connectivitystate"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -1079,6 +1082,114 @@ func (s) TestDefaultServiceConfig(t *testing.T) {
testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t, r, addr, js)
}

type watcher struct {
mu sync.Mutex
connectingCh chan struct{}
onStateChangeResult []string
}

func (w *watcher) OnStateChange(state connectivity.State) {
w.mu.Lock()
defer w.mu.Unlock()
switch state {
case connectivity.Idle:
w.onStateChangeResult = append(w.onStateChangeResult, state.String())
case connectivity.Connecting:
w.onStateChangeResult = append(w.onStateChangeResult, state.String())
close(w.connectingCh)
case connectivity.Ready:
w.onStateChangeResult = append(w.onStateChangeResult, state.String())
case connectivity.TransientFailure:
w.onStateChangeResult = append(w.onStateChangeResult, state.String())
case connectivity.Shutdown:
w.onStateChangeResult = append(w.onStateChangeResult, state.String())
}
}

func (s) TestReportStateChangesToWatcher(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
done := make(chan struct{})
sent := make(chan struct{})
dialDone := make(chan struct{})
go func() { // Launch the server.
defer func() {
close(done)
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn.Close()
// Sleep for a little bit to make sure that Dial on client
// side blocks until settings are received.
time.Sleep(100 * time.Millisecond)
framer := http2.NewFramer(conn, conn)
close(sent)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings. Err: %v", err)
return
}
<-dialDone // Close conn only after dial returns.
}()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithBlock())
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}

// Wait until connectivity state of client changes to Ready
for client.GetState() != connectivity.Ready {
time.Sleep(100 * time.Millisecond)
}

var onStateChangeResult []string
connectingCh := make(chan struct{})
w := watcher{
connectingCh: connectingCh,
onStateChangeResult: onStateChangeResult,
}
internal.AddConnectivityStateWatcher.(func(cc *ClientConn, w connectivitystate.Watcher) func())(client, &w)

close(dialDone)

// Wait until sourceState changes from Ready to Idle
client.WaitForStateChange(ctx, connectivity.Ready)

client.Connect()

// Wait until sourceState changes from Idle to Connecting
client.WaitForStateChange(ctx, connectivity.Idle)

select {
case <-sent:
default:
t.Fatalf("Dial returned before server settings were sent")
}
<-done

client.Close()

expectedStateChangeResult := []string{
connectivity.Ready.String(),
connectivity.Idle.String(),
connectivity.Connecting.String(),
}
<-w.connectingCh
if diff := cmp.Diff(w.onStateChangeResult, expectedStateChangeResult); diff != "" {
t.Fatalf("OnStateChange methods of Watcher are not executed in scheduled order. diff(-want, +got):\n%s", diff)
}

if client.csMgr.connectivityStateTracker != nil {
t.Fatalf("client.csMgr.connectivityStateTracker is not <nil> (actual: %v)", client.csMgr.connectivityStateTracker)
}
}

func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool {
var i int
for i = 0; i < 10; i++ {
Expand Down
129 changes: 129 additions & 0 deletions internal/connectivitystate/connectivitystate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package connectivitystate provides functionality to report and track
// connectivity state changes of ClientConns and SubConns.
package connectivitystate

import (
"context"
"sync"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcsync"
)

// Watcher wraps the functionality to be implemented by components
// interested in watching connectivity state changes.
type Watcher interface {
// OnStateChange is invoked to report connectivity state changes on the
// entity being watched.
OnStateChange(state connectivity.State)
}

// Tracker provides pubsub-like functionality for connectivity state changes.
//
// The entity whose connectivity state is being tracked publishes updates by
// calling the SetState() method.
//
// Components interested in connectivity state updates of the tracked entity
// subscribe to updates by calling the AddWatcher() method.
type Tracker struct {
cs *grpcsync.CallbackSerializer
cancel context.CancelFunc

// Access to the below fields are guarded by this mutex.
mu sync.Mutex
state connectivity.State
watchers map[Watcher]bool
stopped bool
}

// NewTracker returns a new Tracker instance initialized with the provided
// connectivity state.
func NewTracker(state connectivity.State) *Tracker {
ctx, cancel := context.WithCancel(context.Background())
return &Tracker{
cs: grpcsync.NewCallbackSerializer(ctx),
cancel: cancel,
state: state,
watchers: map[Watcher]bool{},
}
}

// AddWatcher adds the provided watcher to the set of watchers in Tracker.
// The OnStateChange() callback will be invoked asynchronously with the current
// state of the tracked entity to begin with, and subsequently for every state
// change.
//
// Returns a function to remove the provided watcher from the set of watchers.
// The caller of this method is responsible for invoking this function when it
// no longer needs to monitor the connectivity state changes on the channel.
func (t *Tracker) AddWatcher(watcher Watcher) func() {
t.mu.Lock()
defer t.mu.Unlock()

if t.stopped {
return func() {}
}

t.watchers[watcher] = true

state := t.state
t.cs.Schedule(func(context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
watcher.OnStateChange(state)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it from watcher.OnStateChange(t.state) to watcher.OnStateChange(state) for the same reason of SetState.

})

return func() {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.watchers, watcher)
}
}

// SetState updates the connectivity state of the entity being tracked, and
// invokes the OnStateChange callback of all registered watchers.
func (t *Tracker) SetState(state connectivity.State) {
t.mu.Lock()
defer t.mu.Unlock()

if t.stopped {
return
}
t.state = state
for watcher := range t.watchers {
t.cs.Schedule(func(context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
watcher.OnStateChange(state)
})
}
}

// Stop shuts down the Tracker and releases any resources allocated by it.
// It is guaranteed that no Watcher callbacks would be invoked once this
// method returns.
func (t *Tracker) Stop() {
t.mu.Lock()
defer t.mu.Unlock()
t.stopped = true

t.cancel()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@
*
*/

package xdsclient
package grpcsync

import (
"context"

"google.golang.org/grpc/internal/buffer"
)

// callbackSerializer provides a mechanism to schedule callbacks in a
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type callbackSerializer struct {
type CallbackSerializer struct {
callbacks *buffer.Unbounded
}

// newCallbackSerializer returns a new callbackSerializer instance. The provided
// NewCallbackSerializer returns a new callbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the callbackSerializer. It is guaranteed that no
// callbacks will be executed once this context is canceled.
func newCallbackSerializer(ctx context.Context) *callbackSerializer {
t := &callbackSerializer{callbacks: buffer.NewUnbounded()}
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{callbacks: buffer.NewUnbounded()}
go t.run(ctx)
return t
}
Expand All @@ -48,11 +48,11 @@ func newCallbackSerializer(ctx context.Context) *callbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (t *callbackSerializer) Schedule(f func(ctx context.Context)) {
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
t.callbacks.Put(f)
}

func (t *callbackSerializer) run(ctx context.Context) {
func (t *CallbackSerializer) run(ctx context.Context) {
for ctx.Err() == nil {
select {
case <-ctx.Done():
Expand Down
Loading