From 4357bac6830eb4b837ee82365b9b16e4fc65dd2d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 5 Dec 2023 10:56:48 -0800 Subject: [PATCH] client: rework resolver and balancer wrappers to avoid deadlock (#6804) --- balancer_wrapper.go | 297 +++++++++++++-------------------- balancer_wrapper_test.go | 2 +- clientconn.go | 290 +++++++++++--------------------- internal/channelz/funcs.go | 7 + internal/idle/idle.go | 175 +++++++++---------- internal/idle/idle_e2e_test.go | 8 +- internal/idle/idle_test.go | 77 +++++---- internal/internal.go | 4 +- picker_wrapper.go | 21 +-- resolver_balancer_ext_test.go | 265 +++++++++++++++++++++++++++++ resolver_wrapper.go | 196 ++++++++++------------ 11 files changed, 696 insertions(+), 646 deletions(-) create mode 100644 resolver_balancer_ext_test.go diff --git a/balancer_wrapper.go b/balancer_wrapper.go index da78051f578a..b5e30cff0215 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -32,21 +32,13 @@ import ( "google.golang.org/grpc/resolver" ) -type ccbMode int - -const ( - ccbModeActive = iota - ccbModeIdle - ccbModeClosed - ccbModeExitingIdle -) - // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the // balancer.Balancer interface. The ClientConn is free to call these methods // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn -// to the Balancer happen synchronously and in order. +// to the Balancer happen in order by performing them in the serializer, without +// any mutexes held. // // ccBalancerWrapper also implements the balancer.ClientConn interface and is // passed to the Balancer implementations. It invokes unexported methods on the @@ -57,84 +49,75 @@ const ( type ccBalancerWrapper struct { // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. - cc *ClientConn - opts balancer.BuildOptions + cc *ClientConn + opts balancer.BuildOptions + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc - // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled in the serializer. Fields - // accessed *only* in these serializer callbacks, can therefore be accessed - // without a mutex. - balancer *gracefulswitch.Balancer + // The following fields are only accessed within the serializer or during + // initialization. curBalancerName string + balancer *gracefulswitch.Balancer - // mu guards access to the below fields. Access to the serializer and its - // cancel function needs to be mutex protected because they are overwritten - // when the wrapper exits idle mode. - mu sync.Mutex - serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. - serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. - mode ccbMode // Tracks the current mode of the wrapper. + // The following field is protected by mu. Caller must take cc.mu before + // taking mu. + mu sync.Mutex + closed bool } // newCCBalancerWrapper creates a new balancer wrapper in idle state. The // underlying balancer is not created until the switchTo() method is invoked. 
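[Editor's note: the wrapper's thread-safety story now rests on the callback serializer rather than on mutexes. The sketch below models the idea in miniature; it is illustrative only — not the actual grpcsync.CallbackSerializer — and every name in it is invented.]

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// serializer runs callbacks one at a time, in FIFO order, on a single
// goroutine. State that is touched only from inside callbacks therefore
// needs no mutex, and callers never block while holding a lock.
type serializer struct {
	mu     sync.Mutex
	cbs    chan func(context.Context)
	done   chan struct{}
	closed bool
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{
		cbs:  make(chan func(context.Context), 16),
		done: make(chan struct{}),
	}
	go func() {
		defer close(s.done) // Like Done(): signals that all callbacks have drained.
		for cb := range s.cbs {
			cb(ctx)
		}
	}()
	return s
}

// Schedule reports false once the serializer is closed, mirroring the !ok
// checks sprinkled through the wrapper code in this patch.
func (s *serializer) Schedule(f func(context.Context)) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	s.cbs <- f
	return true
}

// Close lets already-queued callbacks finish, then stops the goroutine.
func (s *serializer) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.cbs)
	}
}

func main() {
	s := newSerializer(context.Background())
	for i := 0; i < 3; i++ {
		i := i
		s.Schedule(func(context.Context) { fmt.Println("callback", i) })
	}
	s.Close()
	<-s.done // Safe to block here: no locks are held.
}
```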
-func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { +func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { + ctx, cancel := context.WithCancel(cc.ctx) ccb := &ccBalancerWrapper{ - cc: cc, - opts: bopts, - mode: ccbModeIdle, + cc: cc, + opts: balancer.BuildOptions{ + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }, + serializer: grpcsync.NewCallbackSerializer(ctx), + serializerCancel: cancel, } + ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) return ccb } // updateClientConnState is invoked by grpc to push a ClientConnState update to -// the underlying balancer. +// the underlying balancer. This is always executed from the serializer, so +// it is safe to call into the balancer here. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - ccb.mu.Lock() - errCh := make(chan error, 1) - // Here and everywhere else where Schedule() is called, it is done with the - // lock held. But the lock guards only the scheduling part. The actual - // callback is called asynchronously without the lock being held. - ok := ccb.serializer.Schedule(func(_ context.Context) { - errCh <- ccb.balancer.UpdateClientConnState(*ccs) + errCh := make(chan error) + ok := ccb.serializer.Schedule(func(ctx context.Context) { + defer close(errCh) + if ctx.Err() != nil || ccb.balancer == nil { + return + } + err := ccb.balancer.UpdateClientConnState(*ccs) + if logger.V(2) && err != nil { + logger.Infof("error from balancer.UpdateClientConnState: %v", err) + } + errCh <- err }) if !ok { - // If we are unable to schedule a function with the serializer, it - // indicates that it has been closed. A serializer is only closed when - // the wrapper is closed or is in idle. - ccb.mu.Unlock() - return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") - } - ccb.mu.Unlock() - - // We get here only if the above call to Schedule succeeds, in which case it - // is guaranteed that the scheduled function will run. Therefore it is safe - // to block on this channel. - err := <-errCh - if logger.V(2) && err != nil { - logger.Infof("error from balancer.UpdateClientConnState: %v", err) + return nil } - return err -} - -// updateSubConnState is invoked by grpc to push a subConn state update to the -// underlying balancer. -func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { - // Even though it is optional for balancers, gracefulswitch ensures - // opts.StateListener is set, so this cannot ever be nil. - sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) - }) - ccb.mu.Unlock() + return <-errCh } +// resolverError is invoked by grpc to push a resolver error to the underlying +// balancer. The call to the balancer is executed from the serializer. 
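[Editor's note: the updateClientConnState rewrite above bridges an asynchronous serializer callback back to a synchronous error return. Two details carry the weight: `defer close(errCh)` guarantees the waiting caller is released on every path, and the `!ok` branch avoids reading a channel that will never be written. Distilled into a hypothetical helper — not part of this patch — written as if it lived in this file, which already imports context and internal/grpcsync:]

```go
// runAndWait is a hypothetical helper: it schedules work on the serializer
// and blocks for its error without risking a hang.
func runAndWait(s *grpcsync.CallbackSerializer, work func(context.Context) error) error {
	errCh := make(chan error)
	ok := s.Schedule(func(ctx context.Context) {
		// Closing errCh on every path keeps the caller from blocking forever
		// if the wrapper shuts down after the callback was queued.
		defer close(errCh)
		if ctx.Err() != nil {
			return
		}
		errCh <- work(ctx)
	})
	if !ok {
		// The serializer is closed: the callback was never enqueued, so errCh
		// would never be closed. Treat the update as a no-op instead.
		return nil
	}
	return <-errCh // nil if the callback bailed out early (channel closed).
}
```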
 func (ccb *ccBalancerWrapper) resolverError(err error) {
-	ccb.mu.Lock()
-	ccb.serializer.Schedule(func(_ context.Context) {
+	ccb.serializer.Schedule(func(ctx context.Context) {
+		if ctx.Err() != nil || ccb.balancer == nil {
+			return
+		}
 		ccb.balancer.ResolverError(err)
 	})
-	ccb.mu.Unlock()
 }
 
 // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -148,8 +131,10 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
 // the ccBalancerWrapper keeps track of the current LB policy name, and skips
 // the graceful balancer switching process if the name does not change.
 func (ccb *ccBalancerWrapper) switchTo(name string) {
-	ccb.mu.Lock()
-	ccb.serializer.Schedule(func(_ context.Context) {
+	ccb.serializer.Schedule(func(ctx context.Context) {
+		if ctx.Err() != nil || ccb.balancer == nil {
+			return
+		}
 		// TODO: Other languages use case-sensitive balancer registries. We should
 		// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
 		if strings.EqualFold(ccb.curBalancerName, name) {
@@ -157,7 +142,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) {
 		}
 		ccb.buildLoadBalancingPolicy(name)
 	})
-	ccb.mu.Unlock()
 }
 
 // buildLoadBalancingPolicy performs the following:
@@ -184,115 +168,49 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
 	ccb.curBalancerName = builder.Name()
 }
 
+// close initiates async shutdown of the wrapper. cc.mu must be held when
+// calling this function. To determine whether the wrapper has finished shutting
+// down, callers should block on ccb.serializer.Done() without holding cc.mu.
 func (ccb *ccBalancerWrapper) close() {
-	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
-	ccb.closeBalancer(ccbModeClosed)
-}
-
-// enterIdleMode is invoked by grpc when the channel enters idle mode upon
-// expiry of idle_timeout. This call blocks until the balancer is closed.
-func (ccb *ccBalancerWrapper) enterIdleMode() {
-	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
-	ccb.closeBalancer(ccbModeIdle)
-}
-
-// closeBalancer is invoked when the channel is being closed or when it enters
-// idle mode upon expiry of idle_timeout.
-func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
 	ccb.mu.Lock()
-	if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
-		ccb.mu.Unlock()
-		return
-	}
-
-	ccb.mode = m
-	done := ccb.serializer.Done()
-	b := ccb.balancer
-	ok := ccb.serializer.Schedule(func(_ context.Context) {
-		// Close the serializer to ensure that no more calls from gRPC are sent
-		// to the balancer.
-		ccb.serializerCancel()
-		// Empty the current balancer name because we don't have a balancer
-		// anymore and also so that we act on the next call to switchTo by
-		// creating a new balancer specified by the new resolver.
-		ccb.curBalancerName = ""
-	})
-	if !ok {
-		ccb.mu.Unlock()
-		return
-	}
+	ccb.closed = true
 	ccb.mu.Unlock()
-
-	// Give enqueued callbacks a chance to finish before closing the balancer.
-	<-done
-	b.Close()
-}
-
-// exitIdleMode is invoked by grpc when the channel exits idle mode either
-// because of an RPC or because of an invocation of the Connect() API. This
-// recreates the balancer that was closed previously when entering idle mode.
-//
-// If the channel is not in idle mode, we know for a fact that we are here as a
-// result of the user calling the Connect() method on the ClientConn. In this
-// case, we can simply forward the call to the underlying balancer, instructing
-// it to reconnect to the backends.
-func (ccb *ccBalancerWrapper) exitIdleMode() { - ccb.mu.Lock() - if ccb.mode == ccbModeClosed { - // Request to exit idle is a no-op when wrapper is already closed. - ccb.mu.Unlock() - return - } - - if ccb.mode == ccbModeIdle { - // Recreate the serializer which was closed when we entered idle. - ctx, cancel := context.WithCancel(context.Background()) - ccb.serializer = grpcsync.NewCallbackSerializer(ctx) - ccb.serializerCancel = cancel - } - - // The ClientConn guarantees that mutual exclusion between close() and - // exitIdleMode(), and since we just created a new serializer, we can be - // sure that the below function will be scheduled. - done := make(chan struct{}) + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") ccb.serializer.Schedule(func(context.Context) { - defer close(done) - - ccb.mu.Lock() - defer ccb.mu.Unlock() - - if ccb.mode != ccbModeIdle { - ccb.balancer.ExitIdle() + if ccb.balancer == nil { return } - - // Gracefulswitch balancer does not support a switchTo operation after - // being closed. Hence we need to create a new one here. - ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) - ccb.mode = ccbModeActive - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") - + ccb.balancer.Close() + ccb.balancer = nil }) - ccb.mu.Unlock() - - <-done + ccb.serializerCancel() } -func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { - ccb.mu.Lock() - defer ccb.mu.Unlock() - return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed +// exitIdle invokes the balancer's exitIdle method in the serializer. +func (ccb *ccBalancerWrapper) exitIdle() { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccb.balancer == nil { + return + } + ccb.balancer.ExitIdle() + }) } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - if ccb.isIdleOrClosed() { - return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() + return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed") } + ccb.mu.Unlock() if len(addrs) == 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } - ac, err := ccb.cc.newAddrConn(addrs, opts) + ac, err := ccb.cc.newAddrConnLocked(addrs, opts) if err != nil { channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err @@ -313,10 +231,6 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { - if ccb.isIdleOrClosed() { - return - } - acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -325,25 +239,39 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { - if ccb.isIdleOrClosed() { + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() return } - + ccb.mu.Unlock() // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. 
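[Editor's note: NewSubConn and UpdateState above now acquire cc.mu before ccb.mu, matching the struct comment earlier in this file ("Caller must take cc.mu before taking mu"). A single global acquisition order is what makes the old lock-inversion deadlocks impossible. A toy demonstration of the discipline, with invented types — this is not grpc-go code:]

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type clientConn struct{ mu sync.Mutex }

type balancerWrapper struct {
	cc     *clientConn
	mu     sync.Mutex // Lock order: cc.mu is always taken before this one.
	closed bool
}

func (w *balancerWrapper) newSubConn() error {
	w.cc.mu.Lock() // 1. Outer lock first, on every code path.
	defer w.cc.mu.Unlock()

	w.mu.Lock() // 2. Inner lock second, held only long enough to read a flag.
	closed := w.closed
	w.mu.Unlock()

	if closed {
		return errors.New("balancer is being closed; no new SubConns allowed")
	}
	// ... create the subchannel with cc.mu held and the inner lock released ...
	return nil
}

func main() {
	w := &balancerWrapper{cc: &clientConn{}}
	fmt.Println(w.newSubConn()) // <nil>

	w.mu.Lock()
	w.closed = true
	w.mu.Unlock()
	fmt.Println(w.newSubConn()) // balancer is being closed; ...
}
```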
+ + // Note that there is no need to check if the balancer wrapper was closed, + // as we know the graceful switch LB policy will not call cc if it has been + // closed. ccb.cc.pickerWrapper.updatePicker(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { - if ccb.isIdleOrClosed() { + ccb.cc.mu.RLock() + defer ccb.cc.mu.RUnlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() return } - - ccb.cc.resolveNow(o) + ccb.mu.Unlock() + ccb.cc.resolveNowLocked(o) } func (ccb *ccBalancerWrapper) Target() string { @@ -361,6 +289,20 @@ type acBalancerWrapper struct { producers map[balancer.ProducerBuilder]*refCountedProducer } +// updateState is invoked by grpc to push a subConn state update to the +// underlying balancer. +func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { + acbw.ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + // Even though it is optional for balancers, gracefulswitch ensures + // opts.StateListener is set, so this cannot ever be nil. + // TODO: delete this comment when UpdateSubConnState is removed. + acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + }) +} + func (acbw *acBalancerWrapper) String() string { return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) } @@ -374,20 +316,7 @@ func (acbw *acBalancerWrapper) Connect() { } func (acbw *acBalancerWrapper) Shutdown() { - ccb := acbw.ccb - if ccb.isIdleOrClosed() { - // It it safe to ignore this call when the balancer is closed or in idle - // because the ClientConn takes care of closing the connections. - // - // Not returning early from here when the balancer is closed or in idle - // leads to a deadlock though, because of the following sequence of - // calls when holding cc.mu: - // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> - // ccb.RemoveAddrConn --> cc.removeAddrConn - return - } - - ccb.cc.removeAddrConn(acbw.ac, errConnDrain) + acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain) } // NewStream begins a streaming RPC on the addrConn. 
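[Editor's note: the new acBalancerWrapper.updateState above is the delivery half of the StateListener contract: subchannel state now reaches the LB policy through the listener registered at SubConn creation, serialized with the policy's other callbacks, rather than through Balancer.UpdateSubConnState. From a policy author's side the contract looks roughly like this — a sketch only; a real policy would track the SubConn and build pickers from its state:]

```go
package example

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/resolver"
)

type policy struct {
	cc balancer.ClientConn // Handed to the LB policy by Build.
}

func (p *policy) handleAddresses(addrs []resolver.Address) error {
	sc, err := p.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
		StateListener: func(scs balancer.SubConnState) {
			// Invoked via acbw.updateState on the serializer goroutine: calls
			// arrive in order and never concurrently with other callbacks.
			fmt.Println("subconn state:", scs.ConnectivityState)
		},
	})
	if err != nil {
		return err
	}
	sc.Connect()
	return nil
}
```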
If the addrConn is not diff --git a/balancer_wrapper_test.go b/balancer_wrapper_test.go index 4fd09c145a9f..2892136384fd 100644 --- a/balancer_wrapper_test.go +++ b/balancer_wrapper_test.go @@ -55,7 +55,7 @@ func (s) TestBalancer_StateListenerBeforeConnect(t *testing.T) { t.Error("Unexpected call to StateListener with:", scs) }, }) - if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") { + if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") && !strings.Contains(err.Error(), "balancer is being closed") { t.Error("Unexpected error creating subconn:", err) } wg.Done() diff --git a/clientconn.go b/clientconn.go index 75c432928eb2..e6f2625b6844 100644 --- a/clientconn.go +++ b/clientconn.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" @@ -121,17 +120,15 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires // newClient returns a new client in idle mode. func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ - target: target, - conns: make(map[*addrConn]struct{}), - dopts: defaultDialOptions(), - czData: new(channelzData), - idlenessState: ccIdlenessStateIdle, + target: target, + conns: make(map[*addrConn]struct{}), + dopts: defaultDialOptions(), + czData: new(channelzData), } cc.retryThrottler.Store((*retryThrottler)(nil)) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) - cc.exitIdleCond = sync.NewCond(&cc.mu) // Apply dial options. disableGlobalOpts := false @@ -188,21 +185,9 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: cc.dopts.copts.TransportCredentials, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) - - // Configure idleness support with configured idle timeout or default idle - // timeout duration. Idleness can be explicitly disabled by the user, by - // setting the dial option to 0. - cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger}) + cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. + cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) return cc, nil } @@ -245,8 +230,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - // This creates the name resolver, load balancer, blocking picker etc. - if err := cc.exitIdleMode(); err != nil { + // This creates the name resolver, load balancer, etc. 
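[Editor's note: none of this rework changes the public surface: a channel with an idle timeout still starts out IDLE, an RPC or Connect() pulls it out, and it drops back to IDLE once the timeout passes without activity. A small sketch against the public API — the address and timeout are placeholders:]

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(20*time.Second))
	if err != nil {
		log.Fatalf("grpc.Dial: %v", err)
	}
	defer cc.Close()

	fmt.Println("state:", cc.GetState()) // IDLE: nothing has needed a connection yet.
	cc.Connect()                         // Exits idle mode and starts connecting.

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	cc.WaitForStateChange(ctx, connectivity.Idle)
	fmt.Println("state:", cc.GetState()) // CONNECTING (or TRANSIENT_FAILURE).
}
```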
+ if err := cc.idlenessMgr.ExitIdleMode(); err != nil { return nil, err } @@ -321,8 +306,8 @@ func (cc *ClientConn) addTraceEvent(msg string) { type idler ClientConn -func (i *idler) EnterIdleMode() error { - return (*ClientConn)(i).enterIdleMode() +func (i *idler) EnterIdleMode() { + (*ClientConn)(i).enterIdleMode() } func (i *idler) ExitIdleMode() error { @@ -330,46 +315,20 @@ func (i *idler) ExitIdleMode() error { } // exitIdleMode moves the channel out of idle mode by recreating the name -// resolver and load balancer. -func (cc *ClientConn) exitIdleMode() error { +// resolver and load balancer. This should never be called directly; use +// cc.idlenessMgr.ExitIdleMode instead. +func (cc *ClientConn) exitIdleMode() (err error) { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return errConnClosing } - if cc.idlenessState != ccIdlenessStateIdle { - channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState) - cc.mu.Unlock() - return nil - } - - defer func() { - // When Close() and exitIdleMode() race against each other, one of the - // following two can happen: - // - Close() wins the race and runs first. exitIdleMode() runs after, and - // sees that the ClientConn is already closed and hence returns early. - // - exitIdleMode() wins the race and runs first and recreates the balancer - // and releases the lock before recreating the resolver. If Close() runs - // in this window, it will wait for exitIdleMode to complete. - // - // We achieve this synchronization using the below condition variable. - cc.mu.Lock() - cc.idlenessState = ccIdlenessStateActive - cc.exitIdleCond.Signal() - cc.mu.Unlock() - }() - - cc.idlenessState = ccIdlenessStateExitingIdle - cc.pickerWrapper.exitIdleMode() - - cc.balancerWrapper.exitIdleMode() - cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver - // which might update state or report error inline which needs to be handled - // by cc.updateResolverState() which also grabs cc.mu. - if err := cc.initResolverWrapper(cc.dopts.copts.TransportCredentials); err != nil { + // which might update state or report error inline, which would then need to + // acquire cc.mu. + if err := cc.resolverWrapper.start(); err != nil { return err } @@ -377,46 +336,50 @@ func (cc *ClientConn) exitIdleMode() error { return nil } +// initIdleStateLocked initializes common state to how it should be while idle. +func (cc *ClientConn) initIdleStateLocked() { + cc.resolverWrapper = newCCResolverWrapper(cc) + cc.balancerWrapper = newCCBalancerWrapper(cc) + cc.firstResolveEvent = grpcsync.NewEvent() + // cc.conns == nil is a proxy for the ClientConn being closed. So, instead + // of setting it to nil here, we recreate the map. This also means that we + // don't have to do this when exiting idle mode. + cc.conns = make(map[*addrConn]struct{}) +} + // enterIdleMode puts the channel in idle mode, and as part of it shuts down the -// name resolver, load balancer, and any subchannels. -func (cc *ClientConn) enterIdleMode() error { +// name resolver, load balancer, and any subchannels. This should never be +// called directly; use cc.idlenessMgr.EnterIdleMode instead. 
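[Editor's note: the enterIdleMode that follows is the heart of the deadlock fix: fresh wrappers are swapped in while cc.mu is held, but the old ones are drained only after the lock is released, because the draining callbacks may themselves need cc.mu. Reduced to a toy with invented names — not grpc-go code:]

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct{ done chan struct{} }

func (w *worker) run() {
	defer close(w.done)
	time.Sleep(10 * time.Millisecond) // Stand-in for queued callbacks that may need mu.
}

type channelState struct {
	mu      sync.Mutex
	current *worker
}

func (c *channelState) enterIdle() {
	c.mu.Lock()
	old := c.current
	c.current = &worker{done: make(chan struct{})} // Install fresh state under the lock.
	c.mu.Unlock()

	<-old.done // Wait only after releasing the lock -- never while holding it.
	fmt.Println("old worker drained; channel is idle")
}

func main() {
	w := &worker{done: make(chan struct{})}
	go w.run()
	c := &channelState{current: w}
	c.enterIdle()
}
```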
+func (cc *ClientConn) enterIdleMode() { cc.mu.Lock() - defer cc.mu.Unlock() if cc.conns == nil { - return ErrClientConnClosing - } - if cc.idlenessState != ccIdlenessStateActive { - channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) - return nil + cc.mu.Unlock() + return } - // cc.conns == nil is a proxy for the ClientConn being closed. So, instead - // of setting it to nil here, we recreate the map. This also means that we - // don't have to do this when exiting idle mode. conns := cc.conns - cc.conns = make(map[*addrConn]struct{}) - // TODO: Currently, we close the resolver wrapper upon entering idle mode - // and create a new one upon exiting idle mode. This means that the - // `cc.resolverWrapper` field would be overwritten everytime we exit idle - // mode. While this means that we need to hold `cc.mu` when accessing - // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should - // try to do the same for the balancer and picker wrappers too. - cc.resolverWrapper.close() - cc.pickerWrapper.enterIdleMode() - cc.balancerWrapper.enterIdleMode() + rWrapper := cc.resolverWrapper + rWrapper.close() + cc.pickerWrapper.reset() + bWrapper := cc.balancerWrapper + bWrapper.close() cc.csMgr.updateState(connectivity.Idle) - cc.idlenessState = ccIdlenessStateIdle cc.addTraceEvent("entering idle mode") - go func() { - for ac := range conns { - ac.tearDown(errConnIdling) - } - }() + cc.initIdleStateLocked() - return nil + cc.mu.Unlock() + + // Block until the name resolver and LB policy are closed. + <-rWrapper.serializer.Done() + <-bWrapper.serializer.Done() + + // Close all subchannels after the LB policy is closed. + for ac := range conns { + ac.tearDown(errConnIdling) + } } // validateTransportCredentials performs a series of checks on the configured @@ -626,8 +589,7 @@ type ClientConn struct { dopts dialOptions // Default and user specified dial options. channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). - balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. - idlenessMgr idle.Manager + idlenessMgr *idle.Manager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. @@ -637,55 +599,25 @@ type ClientConn struct { czData *channelzData retryThrottler atomic.Value // Updated from service config. - // firstResolveEvent is used to track whether the name resolver sent us at - // least one update. RPCs block on this event. - firstResolveEvent *grpcsync.Event - // mu protects the following fields. // TODO: split mu so the same mutex isn't used for everything. mu sync.RWMutex - resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close. + resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close. + balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close. sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. - idlenessState ccIdlenessState // Tracks idleness state of the channel. - exitIdleCond *sync.Cond // Signalled when channel exits idle. + // firstResolveEvent is used to track whether the name resolver sent us at + // least one update. RPCs block on this event. 
May be accessed without mu + // if we know we cannot be asked to enter idle mode while accessing it (e.g. + // when the idle manager has already been closed, or if we are already + // entering idle mode). + firstResolveEvent *grpcsync.Event lceMu sync.Mutex // protects lastConnectionError lastConnectionError error } -// ccIdlenessState tracks the idleness state of the channel. -// -// Channels start off in `active` and move to `idle` after a period of -// inactivity. When moving back to `active` upon an incoming RPC, they -// transition through `exiting_idle`. This state is useful for synchronization -// with Close(). -// -// This state tracking is mostly for self-protection. The idlenessManager is -// expected to keep track of the state as well, and is expected not to call into -// the ClientConn unnecessarily. -type ccIdlenessState int8 - -const ( - ccIdlenessStateActive ccIdlenessState = iota - ccIdlenessStateIdle - ccIdlenessStateExitingIdle -) - -func (s ccIdlenessState) String() string { - switch s { - case ccIdlenessStateActive: - return "active" - case ccIdlenessStateIdle: - return "idle" - case ccIdlenessStateExitingIdle: - return "exitingIdle" - default: - return "unknown" - } -} - // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // ctx expires. A true value is returned in former case and false in latter. // @@ -725,10 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. func (cc *ClientConn) Connect() { - cc.exitIdleMode() + if err := cc.idlenessMgr.ExitIdleMode(); err != nil { + cc.addTraceEvent(err.Error()) + return + } // If the ClientConn was not in idle mode, we need to call ExitIdle on the // LB policy so that connections can be created. - cc.balancerWrapper.exitIdleMode() + cc.mu.Lock() + cc.balancerWrapper.exitIdle() + cc.mu.Unlock() } // waitForResolvedAddrs blocks until the resolver has provided addresses or the @@ -762,11 +699,11 @@ func init() { internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() { return cc.csMgr.pubSub.Subscribe(s) } - internal.EnterIdleModeForTesting = func(cc *ClientConn) error { - return cc.enterIdleMode() + internal.EnterIdleModeForTesting = func(cc *ClientConn) { + cc.idlenessMgr.EnterIdleModeForTesting() } internal.ExitIdleModeForTesting = func(cc *ClientConn) error { - return cc.exitIdleMode() + return cc.idlenessMgr.ExitIdleMode() } } @@ -782,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { } } -func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { +func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error { defer cc.firstResolveEvent.Fire() - cc.mu.Lock() // Check if the ClientConn is already closed. Some fields (e.g. // balancerWrapper) are set to nil when closing the ClientConn, and could // cause nil pointer panic if we don't have this check. @@ -830,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { if cc.sc == nil { // Apply the failing LB only if we haven't received valid service config // from the name resolver in the past. 
- cc.applyFailingLB(s.ServiceConfig) + cc.applyFailingLBLocked(s.ServiceConfig) cc.mu.Unlock() return ret } @@ -852,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { return ret } -// applyFailingLB is akin to configuring an LB policy on the channel which +// applyFailingLBLocked is akin to configuring an LB policy on the channel which // always fails RPCs. Here, an actual LB policy is not configured, but an always // erroring picker is configured, which returns errors with information about // what was invalid in the received service config. A config selector with no // service config is configured, and the connectivity state of the channel is // set to TransientFailure. -// -// Caller must hold cc.mu. -func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { +func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { var err error if sc.Err != nil { err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) @@ -872,10 +806,6 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { cc.csMgr.updateState(connectivity.TransientFailure) } -func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { - cc.balancerWrapper.updateSubConnState(sc, s, err) -} - // Makes a copy of the input addresses slice and clears out the balancer // attributes field. Addresses are passed during subconn creation and address // update operations. In both cases, we will clear the balancer attributes by @@ -890,10 +820,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad return out } -// newAddrConn creates an addrConn for addrs and adds it to cc.conns. +// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns. // // Caller needs to make sure len(addrs) > 0. -func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { +func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { + if cc.conns == nil { + return nil, ErrClientConnClosing + } + ac := &addrConn{ state: connectivity.Idle, cc: cc, @@ -905,12 +839,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub stateChan: make(chan struct{}), } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) - // Track ac in cc. This needs to be done before any getTransport(...) is called. - cc.mu.Lock() - defer cc.mu.Unlock() - if cc.conns == nil { - return nil, ErrClientConnClosing - } var err error ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") @@ -926,6 +854,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub }, }) + // Track ac in cc. This needs to be done before any getTransport(...) is called. 
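[Editor's note: applyFailingLBLocked above boils down to installing a picker that fails every RPC with a message describing the bad service config. The public balancer/base package exposes the same building block, so the idea can be tried in isolation — the real code uses an unexported picker and a codes.Unavailable status error:]

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

func main() {
	// Every Pick on this picker fails with the configured error, which is
	// exactly how a channel with a broken service config fails RPCs.
	p := base.NewErrPicker(errors.New("error parsing service config"))
	_, err := p.Pick(balancer.PickInfo{})
	fmt.Println(err)
}
```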
cc.conns[ac] = struct{}{} return ac, nil } @@ -1174,12 +1103,12 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { cc.mu.RLock() - r := cc.resolverWrapper + cc.resolverWrapper.resolveNow(o) cc.mu.RUnlock() - if r == nil { - return - } - go r.resolveNow(o) +} + +func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) { + cc.resolverWrapper.resolveNow(o) } // ResetConnectBackoff wakes up all subchannels in transient failure and causes @@ -1211,16 +1140,16 @@ func (cc *ClientConn) Close() error { <-cc.csMgr.pubSub.Done() }() + // Prevent calls to enter/exit idle immediately, and ensure we are not + // currently entering/exiting idle mode. + cc.idlenessMgr.Close() + cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return ErrClientConnClosing } - for cc.idlenessState == ccIdlenessStateExitingIdle { - cc.exitIdleCond.Wait() - } - conns := cc.conns cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) @@ -1229,16 +1158,14 @@ func (cc *ClientConn) Close() error { // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() + cc.resolverWrapper.close() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. cc.pickerWrapper.close() cc.balancerWrapper.close() - if rWrapper := cc.resolverWrapper; rWrapper != nil { - rWrapper.close() - } - if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil { - idlenessMgr.Close() - } + + <-cc.resolverWrapper.serializer.Done() + <-cc.balancerWrapper.serializer.Done() for ac := range conns { ac.tearDown(ErrClientConnClosing) @@ -1259,7 +1186,7 @@ type addrConn struct { cc *ClientConn dopts dialOptions - acbw balancer.SubConn + acbw *acBalancerWrapper scopts balancer.NewSubConnOptions // transport is set when there's a viable transport (note: ac state may not be READY as LB channel @@ -1297,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) } else { channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) } - ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) + ac.acbw.updateState(s, lastErr) } // adjustParams updates parameters used to create transports upon @@ -1801,7 +1728,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error { if err != nil { channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) } else { - channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) + channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget) rb = cc.getResolver(parsedTarget.URL.Scheme) if rb != nil { cc.parsedTarget = parsedTarget @@ -1959,32 +1886,3 @@ func (cc *ClientConn) determineAuthority() error { channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) return nil } - -// initResolverWrapper creates a ccResolverWrapper, which builds the name -// resolver. This method grabs the lock to assign the newly built resolver -// wrapper to the cc.resolverWrapper field. 
-func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { - rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ - target: cc.parsedTarget, - builder: cc.resolverBuilder, - bOpts: resolver.BuildOptions{ - DisableServiceConfig: cc.dopts.disableServiceConfig, - DialCreds: creds, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - }, - channelzID: cc.channelzID, - }) - if err != nil { - return fmt.Errorf("failed to build resolver: %v", err) - } - // Resolver implementations may report state update or error inline when - // built (or right after), and this is handled in cc.updateResolverState. - // Also, an error from the resolver might lead to a re-resolution request - // from the balancer, which is handled in resolveNow() where - // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. - cc.mu.Lock() - cc.resolverWrapper = rw - cc.mu.Unlock() - return nil -} diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go index 5395e77529cd..fc094f3441b8 100644 --- a/internal/channelz/funcs.go +++ b/internal/channelz/funcs.go @@ -31,6 +31,7 @@ import ( "time" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" ) const ( @@ -58,6 +59,12 @@ func TurnOn() { } } +func init() { + internal.ChannelzTurnOffForTesting = func() { + atomic.StoreInt32(&curState, 0) + } +} + // IsOn returns whether channelz data collection is on. func IsOn() bool { return atomic.LoadInt32(&curState) == 1 diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 6c272476e5ef..fe49cb74c55a 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -26,8 +26,6 @@ import ( "sync" "sync/atomic" "time" - - "google.golang.org/grpc/grpclog" ) // For overriding in unit tests. @@ -39,27 +37,12 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { // and exit from idle mode. type Enforcer interface { ExitIdleMode() error - EnterIdleMode() error -} - -// Manager defines the functionality required to track RPC activity on a -// channel. -type Manager interface { - OnCallBegin() error - OnCallEnd() - Close() + EnterIdleMode() } -type noopManager struct{} - -func (noopManager) OnCallBegin() error { return nil } -func (noopManager) OnCallEnd() {} -func (noopManager) Close() {} - -// manager implements the Manager interface. It uses atomic operations to -// synchronize access to shared state and a mutex to guarantee mutual exclusion -// in a critical section. -type manager struct { +// Manager implements idleness detection and calls the configured Enforcer to +// enter/exit idle mode when appropriate. Must be created by NewManager. +type Manager struct { // State accessed atomically. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -69,8 +52,7 @@ type manager struct { // Can be accessed without atomics or mutex since these are set at creation // time and read-only after that. enforcer Enforcer // Functionality provided by grpc.ClientConn. - timeout int64 // Idle timeout duration nanos stored as an int64. - logger grpclog.LoggerV2 + timeout time.Duration // idleMu is used to guarantee mutual exclusion in two scenarios: // - Opposing intentions: @@ -88,57 +70,48 @@ type manager struct { timer *time.Timer } -// ManagerOptions is a collection of options used by -// NewManager. 
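[Editor's note: the trimmed-down Enforcer interface above is the seam between the idle manager and the ClientConn, and the signature change is the point: EnterIdleMode can no longer fail, which is what lets enterIdleMode shed its error plumbing. A toy implementation to make the contract concrete — illustrative only; the real enforcer is the idler type in clientconn.go:]

```go
package main

import "fmt"

type Enforcer interface {
	ExitIdleMode() error // Builds the resolver/balancer; may fail.
	EnterIdleMode()      // Tears them down; after this patch, cannot fail.
}

type loggingEnforcer struct{}

func (loggingEnforcer) ExitIdleMode() error {
	fmt.Println("recreating name resolver and LB policy")
	return nil
}

func (loggingEnforcer) EnterIdleMode() {
	fmt.Println("shutting down name resolver, LB policy, and subchannels")
}

func main() {
	var e Enforcer = loggingEnforcer{}
	e.EnterIdleMode()
	if err := e.ExitIdleMode(); err != nil {
		fmt.Println("exit failed:", err)
	}
}
```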
-type ManagerOptions struct { - Enforcer Enforcer - Timeout time.Duration - Logger grpclog.LoggerV2 +// NewManager creates a new idleness manager implementation for the +// given idle timeout. It begins in idle mode. +func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { + return &Manager{ + enforcer: enforcer, + timeout: timeout, + actuallyIdle: true, + activeCallsCount: -math.MaxInt32, + } } -// NewManager creates a new idleness manager implementation for the -// given idle timeout. -func NewManager(opts ManagerOptions) Manager { - if opts.Timeout == 0 { - return noopManager{} +// resetIdleTimerLocked resets the idle timer to the given duration. Called +// when exiting idle mode or when the timer fires and we need to reset it. +func (m *Manager) resetIdleTimerLocked(d time.Duration) { + if m.isClosed() || m.timeout == 0 || m.actuallyIdle { + return } - m := &manager{ - enforcer: opts.Enforcer, - timeout: int64(opts.Timeout), - logger: opts.Logger, + // It is safe to ignore the return value from Reset() because this method is + // only ever called from the timer callback or when exiting idle mode. + if m.timer != nil { + m.timer.Stop() } - m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) - return m + m.timer = timeAfterFunc(d, m.handleIdleTimeout) } -// resetIdleTimer resets the idle timer to the given duration. This method -// should only be called from the timer callback. -func (m *manager) resetIdleTimer(d time.Duration) { +func (m *Manager) resetIdleTimer(d time.Duration) { m.idleMu.Lock() defer m.idleMu.Unlock() - - if m.timer == nil { - // Only close sets timer to nil. We are done. - return - } - - // It is safe to ignore the return value from Reset() because this method is - // only ever called from the timer callback, which means the timer has - // already fired. - m.timer.Reset(d) + m.resetIdleTimerLocked(d) } // handleIdleTimeout is the timer callback that is invoked upon expiry of the // configured idle timeout. The channel is considered inactive if there are no // ongoing calls and no RPC activity since the last time the timer fired. -func (m *manager) handleIdleTimeout() { +func (m *Manager) handleIdleTimeout() { if m.isClosed() { return } if atomic.LoadInt32(&m.activeCallsCount) > 0 { - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) return } @@ -148,24 +121,12 @@ func (m *manager) handleIdleTimeout() { // Set the timer to fire after a duration of idle timeout, calculated // from the time the most recent RPC completed. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) - m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano())) + m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout) return } - // This CAS operation is extremely likely to succeed given that there has - // been no activity since the last time we were here. Setting the - // activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the - // channel is either in idle mode or is trying to get there. - if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { - // This CAS operation can fail if an RPC started after we checked for - // activity at the top of this method, or one was ongoing from before - // the last time we were here. In both case, reset the timer and return. - m.resetIdleTimer(time.Duration(m.timeout)) - return - } - - // Now that we've set the active calls count to -math.MaxInt32, it's time to - // actually move to idle mode. 
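[Editor's note: the rescheduling arithmetic in handleIdleTimeout above is easy to misread: when there has been activity, the next check fires one full timeout after the most recent RPC ended, not one timeout from now. A worked example with arbitrary values:]

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timeout := 30 * time.Minute
	// Pretend the most recent RPC completed ten minutes ago.
	lastCallEndTime := time.Now().Add(-10 * time.Minute).UnixNano()

	// Mirrors: time.Duration(lastCallEndTime-time.Now().UnixNano()) + timeout
	remaining := time.Duration(lastCallEndTime-time.Now().UnixNano()) + timeout
	fmt.Println(remaining.Round(time.Minute)) // ~20m: 30m timeout minus the 10m already elapsed.
}
```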
+ // Now that we've checked that there has been no activity, attempt to enter + // idle mode, which is very likely to succeed. if m.tryEnterIdleMode() { // Successfully entered idle mode. No timer needed until we exit idle. return @@ -174,8 +135,7 @@ func (m *manager) handleIdleTimeout() { // Failed to enter idle mode due to a concurrent RPC that kept the channel // active, or because of an error from the channel. Undo the attempt to // enter idle, and reset the timer to try again later. - atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) } // tryEnterIdleMode instructs the channel to enter idle mode. But before @@ -185,36 +145,49 @@ func (m *manager) handleIdleTimeout() { // Return value indicates whether or not the channel moved to idle mode. // // Holds idleMu which ensures mutual exclusion with exitIdleMode. -func (m *manager) tryEnterIdleMode() bool { +func (m *Manager) tryEnterIdleMode() bool { + // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() + // that the channel is either in idle mode or is trying to get there. + if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { + // This CAS operation can fail if an RPC started after we checked for + // activity in the timer handler, or one was ongoing from before the + // last time the timer fired, or if a test is attempting to enter idle + // mode without checking. In all cases, abort going into idle mode. + return false + } + // N.B. if we fail to enter idle mode after this, we must re-add + // math.MaxInt32 to m.activeCallsCount. + m.idleMu.Lock() defer m.idleMu.Unlock() if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 { // We raced and lost to a new RPC. Very rare, but stop entering idle. + atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) return false } if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { - // An very short RPC could have come in (and also finished) after we + // A very short RPC could have come in (and also finished) after we // checked for calls count and activity in handleIdleTimeout(), but // before the CAS operation. So, we need to check for activity again. + atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) return false } - // No new RPCs have come in since we last set the active calls count value - // -math.MaxInt32 in the timer callback. And since we have the lock, it is - // safe to enter idle mode now. - if err := m.enforcer.EnterIdleMode(); err != nil { - m.logger.Errorf("Failed to enter idle mode: %v", err) - return false - } - - // Successfully entered idle mode. + // No new RPCs have come in since we set the active calls count value to + // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode + // unconditionally now. + m.enforcer.EnterIdleMode() m.actuallyIdle = true return true } +func (m *Manager) EnterIdleModeForTesting() { + m.tryEnterIdleMode() +} + // OnCallBegin is invoked at the start of every RPC. -func (m *manager) OnCallBegin() error { +func (m *Manager) OnCallBegin() error { if m.isClosed() { return nil } @@ -227,7 +200,7 @@ func (m *manager) OnCallBegin() error { // Channel is either in idle mode or is in the process of moving to idle // mode. Attempt to exit idle mode to allow this RPC. - if err := m.exitIdleMode(); err != nil { + if err := m.ExitIdleMode(); err != nil { // Undo the increment to calls count, and return an error causing the // RPC to fail. 
 		atomic.AddInt32(&m.activeCallsCount, -1)
 		return err
 	}
 
@@ -238,28 +211,30 @@ func (m *manager) OnCallBegin() error {
 	return nil
 }
 
-// exitIdleMode instructs the channel to exit idle mode.
-//
-// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
-func (m *manager) exitIdleMode() error {
+// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// internal state.
+func (m *Manager) ExitIdleMode() error {
+	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
 	m.idleMu.Lock()
 	defer m.idleMu.Unlock()
 
-	if !m.actuallyIdle {
-		// This can happen in two scenarios:
+	if m.isClosed() || !m.actuallyIdle {
+		// This can happen in three scenarios:
 		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
 		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC
 		//   came in and OnCallBegin() noticed that the calls count is negative.
 		// - Channel is in idle mode, and multiple new RPCs come in at the same
 		//   time, all of them notice a negative calls count in OnCallBegin and get
 		//   here. The first one to get the lock would get the channel to exit idle.
+		// - Channel is not in idle mode, and the user calls Connect which calls
+		//   m.ExitIdleMode.
 		//
-		// Either way, nothing to do here.
+		// In any case, there is nothing to do here.
 		return nil
 	}
 
 	if err := m.enforcer.ExitIdleMode(); err != nil {
-		return fmt.Errorf("channel failed to exit idle mode: %v", err)
+		return fmt.Errorf("failed to exit idle mode: %w", err)
 	}
 
 	// Undo the idle entry process. This also respects any new RPC attempts.
@@ -267,12 +242,12 @@ func (m *manager) exitIdleMode() error {
 	m.actuallyIdle = false
 
 	// Start a new timer to fire after the configured idle timeout.
-	m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)
+	m.resetIdleTimerLocked(m.timeout)
 
 	return nil
 }
 
 // OnCallEnd is invoked at the end of every RPC. 
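[Editor's note: stepping back to tryEnterIdleMode above, it hinges on a sentinel value: swinging the calls counter to -math.MaxInt32 means any concurrent OnCallBegin increment still leaves it negative, which is how an incoming RPC detects "idle, or on the way there". The trick in isolation:]

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var activeCalls int32

	// Timer path: succeeds only when there are exactly zero active calls.
	if atomic.CompareAndSwapInt32(&activeCalls, 0, -math.MaxInt32) {
		fmt.Println("entered idle (counter is now a large negative sentinel)")
	}

	// RPC path: increment first, then inspect the sign.
	if atomic.AddInt32(&activeCalls, 1) <= 0 {
		fmt.Println("channel is idle; must exit idle mode before proceeding")
		// If exiting idle failed, the caller would undo its increment with:
		// atomic.AddInt32(&activeCalls, -1)
	}
}
```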
-func (m *manager) OnCallEnd() { +func (m *Manager) OnCallEnd() { if m.isClosed() { return } @@ -287,15 +262,17 @@ func (m *manager) OnCallEnd() { atomic.AddInt32(&m.activeCallsCount, -1) } -func (m *manager) isClosed() bool { +func (m *Manager) isClosed() bool { return atomic.LoadInt32(&m.closed) == 1 } -func (m *manager) Close() { +func (m *Manager) Close() { atomic.StoreInt32(&m.closed, 1) m.idleMu.Lock() - m.timer.Stop() - m.timer = nil + if m.timer != nil { + m.timer.Stop() + m.timer = nil + } m.idleMu.Unlock() } diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index d2cd9d3e3752..da98c09420dd 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -586,12 +586,8 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { } defer cc.Close() - enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error) - enterIdleFunc := func() { - if err := enterIdle(cc); err != nil { - t.Errorf("Failed to enter idle mode: %v", err) - } - } + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + enterIdleFunc := func() { enterIdle(cc) } exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error) exitIdleFunc := func() { if err := exitIdle(cc); err != nil { diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go index 22bde3ba1422..d0fc685d3908 100644 --- a/internal/idle/idle_test.go +++ b/internal/idle/idle_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/grpctest" ) @@ -55,10 +54,8 @@ func (ti *testEnforcer) ExitIdleMode() error { } -func (ti *testEnforcer) EnterIdleMode() error { +func (ti *testEnforcer) EnterIdleMode() { ti.enterIdleCh <- struct{}{} - return nil - } func newTestEnforcer() *testEnforcer { @@ -91,7 +88,7 @@ func overrideNewTimer(t *testing.T) <-chan struct{} { // TestManager_Disabled tests the case where the idleness manager is // disabled by passing an idle_timeout of 0. Verifies the following things: // - timer callback does not fire -// - an RPC does not trigger a call to ExitIdleMode on the ClientConn +// - an RPC triggers a call to ExitIdleMode on the ClientConn // - more calls to RPC termination (as compared to RPC initiation) does not // result in an error log func (s) TestManager_Disabled(t *testing.T) { @@ -100,7 +97,7 @@ func (s) TestManager_Disabled(t *testing.T) { // Create an idleness manager that is disabled because of idleTimeout being // set to `0`. enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(0), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(0)) // Ensure that the timer callback does not fire within a short deadline. select { @@ -109,13 +106,13 @@ func (s) TestManager_Disabled(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - // The first invocation of OnCallBegin() would lead to a call to - // ExitIdleMode() on the enforcer, unless the idleness manager is disabled. - mgr.OnCallBegin() + // The first invocation of OnCallBegin() should lead to a call to + // ExitIdleMode() on the enforcer. 
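[Editor's note: the idleness tests in this file lean on overrideNewTimer, which swaps the package-level timeAfterFunc so the idle timer can be driven deterministically instead of by sleeping. The injection pattern in isolation — a simplified sketch of the same idea:]

```go
package main

import (
	"fmt"
	"time"
)

// Production code always creates timers through this variable, never by
// calling time.AfterFunc directly, so tests can intercept it.
var timeAfterFunc = time.AfterFunc

func startIdleTimer(d time.Duration, onIdle func()) *time.Timer {
	return timeAfterFunc(d, onIdle)
}

func main() {
	fired := make(chan struct{})

	// Test override: ignore the duration and run the callback immediately.
	timeAfterFunc = func(_ time.Duration, f func()) *time.Timer {
		f()
		return time.NewTimer(time.Hour) // Dummy; the caller only needs a *Timer.
	}

	startIdleTimer(time.Hour, func() { close(fired) })
	<-fired
	fmt.Println("idle callback observed without waiting an hour")
}
```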
+ go mgr.OnCallBegin() select { case <-enforcer.exitIdleCh: - t.Fatalf("ExitIdleMode() called on enforcer when manager is disabled") case <-time.After(defaultTestShortTimeout): + t.Fatal("Timeout waiting for channel to move out of idle mode") } // If the number of calls to OnCallEnd() exceeds the number of calls to @@ -137,8 +134,9 @@ func (s) TestManager_Enabled_TimerFires(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() + mgr.ExitIdleMode() // Ensure that the timer callback fires within a appropriate amount of time. select { @@ -162,8 +160,9 @@ func (s) TestManager_Enabled_OngoingCall(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() + mgr.ExitIdleMode() // Fire up a goroutine that simulates an ongoing RPC that is terminated // after the timer callback fires for the first time. @@ -207,8 +206,9 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() + mgr.ExitIdleMode() // Fire up a goroutine that simulates unary RPCs until the timer callback // fires. @@ -233,6 +233,7 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) { case <-callbackCh: close(timerFired) case <-time.After(2 * defaultTestIdleTimeout): + close(timerFired) t.Fatal("Timeout waiting for idle timer callback to fire") } select { @@ -257,9 +258,11 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) { overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() + mgr.ExitIdleMode() + <-enforcer.exitIdleCh // Ensure that the channel moves to idle since there are no RPCs. select { case <-enforcer.enterIdleCh: @@ -297,7 +300,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) { type racyState int32 const ( - stateInital racyState = iota + stateInitial racyState = iota stateEnteredIdle stateExitedIdle stateActiveRPCs @@ -306,12 +309,22 @@ const ( // racyIdlnessEnforcer is a test idleness enforcer used specifically to test the // race between idle timeout and incoming RPCs. type racyEnforcer struct { - state *racyState // Accessed atomically. + t *testing.T + state *racyState // Accessed atomically. + started bool } // ExitIdleMode sets the internal state to stateExitedIdle. We should only ever // exit idle when we are currently in idle. 
func (ri *racyEnforcer) ExitIdleMode() error { + // Set only on the initial ExitIdleMode + if ri.started == false { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) { + return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode") + } + ri.started = true + return nil + } if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) { return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier") } @@ -319,46 +332,44 @@ func (ri *racyEnforcer) ExitIdleMode() error { } // EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start. -func (ri *racyEnforcer) EnterIdleMode() error { - if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) { - return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started") +func (ri *racyEnforcer) EnterIdleMode() { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateEnteredIdle)) { + ri.t.Errorf("idleness enforcer asked to enter idle after rpcs started") } - return nil } -// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where -// firing of the idle timeout races with an incoming RPC. The test verifies that -// if the timer callback win the race and puts the channel in idle, the RPCs can -// kick it out of idle. And if the RPCs win the race and keep the channel -// active, then the timer callback should not attempt to put the channel in idle -// mode. +// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where firing of +// the idle timeout races with an incoming RPC. The test verifies that if the +// timer callback wins the race and puts the channel in idle, the RPCs can kick +// it out of idle. And if the RPCs win the race and keep the channel active, +// then the timer callback should not attempt to put the channel in idle mode. func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) { // Run multiple iterations to simulate different possibilities. for i := 0; i < 20; i++ { t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) { var idlenessState racyState - enforcer := &racyEnforcer{state: &idlenessState} + enforcer := &racyEnforcer{t: t, state: &idlenessState} // Configure a large idle timeout so that we can control the // race between the timer callback and RPCs. - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(10 * time.Minute), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(10*time.Minute)) defer mgr.Close() + mgr.ExitIdleMode() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - m := mgr.(interface{ handleIdleTimeout() }) - <-time.After(defaultTestIdleTimeout / 10) - m.handleIdleTimeout() + <-time.After(defaultTestIdleTimeout / 50) + mgr.handleIdleTimeout() }() - for j := 0; j < 100; j++ { + for j := 0; j < 5; j++ { wg.Add(1) go func() { defer wg.Done() // Wait for the configured idle timeout and simulate an RPC to // race with the idle timeout timer callback. 
-					<-time.After(defaultTestIdleTimeout / 10)
+					<-time.After(defaultTestIdleTimeout / 50)
 					if err := mgr.OnCallBegin(); err != nil {
 						t.Errorf("OnCallBegin() failed: %v", err)
 					}
diff --git a/internal/internal.go b/internal/internal.go
index f28791b89b0c..2549fe8e3b88 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -182,10 +182,12 @@ var (
 	GRPCResolverSchemeExtraMetadata string = "xds"
 
 	// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.
-	EnterIdleModeForTesting any // func(*grpc.ClientConn) error
+	EnterIdleModeForTesting any // func(*grpc.ClientConn)
 
 	// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
 	ExitIdleModeForTesting any // func(*grpc.ClientConn) error
+
+	// ChannelzTurnOffForTesting disables the Channelz service for testing
+	// purposes.
+	ChannelzTurnOffForTesting func()
 )
 
 // HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/picker_wrapper.go b/picker_wrapper.go
index 236837f4157c..bf56faa76d3d 100644
--- a/picker_wrapper.go
+++ b/picker_wrapper.go
@@ -37,7 +36,6 @@ import (
 type pickerWrapper struct {
 	mu            sync.Mutex
 	done          bool
-	idle          bool
 	blockingCh    chan struct{}
 	picker        balancer.Picker
 	statsHandlers []stats.Handler // to record blocking picker calls
@@ -53,11 +52,7 @@ func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
 // updatePicker is called by UpdateBalancerState. It unblocks all blocked picks.
 func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
 	pw.mu.Lock()
-	if pw.done || pw.idle {
-		// There is a small window where a picker update from the LB policy can
-		// race with the channel going to idle mode. If the picker is idle here,
-		// it is because the channel asked it to do so, and therefore it is sage
-		// to ignore the update from the LB policy.
+	if pw.done {
 		pw.mu.Unlock()
 		return
 	}
@@ -210,23 +205,15 @@ func (pw *pickerWrapper) close() {
 	close(pw.blockingCh)
 }
 
-func (pw *pickerWrapper) enterIdleMode() {
-	pw.mu.Lock()
-	defer pw.mu.Unlock()
-	if pw.done {
-		return
-	}
-	pw.idle = true
-}
-
-func (pw *pickerWrapper) exitIdleMode() {
+// reset clears the pickerWrapper and prepares it for being used again when idle
+// mode is exited.
+func (pw *pickerWrapper) reset() {
 	pw.mu.Lock()
 	defer pw.mu.Unlock()
 	if pw.done {
 		return
 	}
 	pw.blockingCh = make(chan struct{})
-	pw.idle = false
 }
 
 // dropError is a wrapper error that indicates the LB policy wishes to drop the
diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go
new file mode 100644
index 000000000000..aebc6652fa4b
--- /dev/null
+++ b/resolver_balancer_ext_test.go
@@ -0,0 +1,265 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc_test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"runtime"
+	"strings"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/internal/balancer/stub"
+	"google.golang.org/grpc/internal/channelz"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+)
+
+// TestResolverBalancerInteraction tests:
+//  1. resolver.Builder.Build() ->
+//  2. resolver.ClientConn.UpdateState() ->
+//  3. balancer.Balancer.UpdateClientConnState() ->
+//  4. balancer.ClientConn.ResolveNow() ->
+//  5. resolver.Resolver.ResolveNow()
+func (s) TestResolverBalancerInteraction(t *testing.T) {
+	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+	bf := stub.BalancerFuncs{
+		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+			bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
+			return nil
+		},
+	}
+	stub.Register(name, bf)
+
+	rb := manual.NewBuilderWithScheme(name)
+	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+		sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`)
+		cc.UpdateState(resolver.State{
+			Addresses:     []resolver.Address{{Addr: "test"}},
+			ServiceConfig: sc,
+		})
+	}
+	rnCh := make(chan struct{})
+	rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) }
+	resolver.Register(rb)
+
+	cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial error: %v", err)
+	}
+	defer cc.Close()
+	select {
+	case <-rnCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatal("Timed out waiting for resolver.ResolveNow")
+	}
+}
+
+type resolverBuilderWithErr struct {
+	resolver.Resolver
+	errCh  <-chan error
+	scheme string
+}
+
+func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
+	if err := <-b.errCh; err != nil {
+		return nil, err
+	}
+	return b, nil
+}
+
+func (b *resolverBuilderWithErr) Scheme() string {
+	return b.scheme
+}
+
+func (b *resolverBuilderWithErr) Close() {}
+
+// TestResolverBuildFailure tests:
+//  1. resolver.Builder.Build() passes.
+//  2. Channel enters idle mode.
+//  3. An RPC happens.
+//  4. resolver.Builder.Build() fails.
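+//
+// The failure is injected through resErrCh, which Build consumes on every
+// attempt: a nil value lets the initial Build inside grpc.Dial succeed, and a
+// non-nil error fails the rebuild triggered by the first RPC after idle, e.g.:
+//
+//	resErrCh <- nil                // initial Build succeeds
+//	resErrCh <- errors.New(errStr) // Build during idle exit fails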
+func (s) TestResolverBuildFailure(t *testing.T) {
+	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+	resErrCh := make(chan error, 1)
+	resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name})
+
+	resErrCh <- nil
+	cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial error: %v", err)
+	}
+	defer cc.Close()
+	enterIdle(cc)
+	const errStr = "test error from resolver builder"
+	t.Log("Pushing resolver build error")
+	resErrCh <- errors.New(errStr)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) {
+		t.Fatalf("Invoke = %v; want %v", err, errStr)
+	}
+}
+
+// TestEnterIdleDuringResolverUpdateState tests a scenario that used to
+// deadlock: the resolver calling UpdateState concurrently with the resolver
+// being closed as the channel enters idle mode.
+func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) {
+	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+
+	// Create a manual resolver that spams UpdateState calls until it is closed.
+	rb := manual.NewBuilderWithScheme(name)
+	var cancel context.CancelFunc
+	rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+		var ctx context.Context
+		ctx, cancel = context.WithCancel(context.Background())
+		go func() {
+			for ctx.Err() == nil {
+				cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
+			}
+		}()
+	}
+	rb.CloseCallback = func() {
+		cancel()
+	}
+	resolver.Register(rb)
+
+	cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial error: %v", err)
+	}
+	defer cc.Close()
+
+	// Enter/exit idle mode repeatedly.
+	for i := 0; i < 2000; i++ {
+		// Start a timer so we panic out of the deadlock and can see all the
+		// stack traces to debug the problem.
+		p := time.AfterFunc(time.Second, func() {
+			buf := make([]byte, 8192)
+			buf = buf[:runtime.Stack(buf, true)]
+			t.Error("Timed out waiting for enterIdle")
+			panic(fmt.Sprint("Stack trace:\n", string(buf)))
+		})
+		enterIdle(cc)
+		p.Stop()
+		cc.Connect()
+	}
+}
+
+// TestEnterIdleDuringBalancerUpdateState tests calling UpdateState at the same
+// time as the balancer being closed while the channel enters idle mode.
+func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) {
+	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+	name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+
+	// Create a balancer that calls UpdateState once asynchronously, attempting
+	// to make the channel appear ready even after entering idle.
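+	// Whichever side wins the race, the channel must end up IDLE; the loop
+	// below asserts cc.GetState() == connectivity.Idle after every enterIdle.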
+ bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + if got, want := cc.GetState(), connectivity.Idle; got != want { + t.Fatalf("cc state = %v; want %v", got, want) + } + cc.Connect() + } +} + +// TestEnterIdleDuringBalancerNewSubConn tests calling NewSubConn at the same +// time as the balancer being closed while the channel enters idle mode. +func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) { + channelz.TurnOn() + defer internal.ChannelzTurnOffForTesting() + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) + + // Create a balancer that calls NewSubConn once asynchronously, attempting + // to create a subchannel after going idle. + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("Found channels: %v; expected 1 entry", tcs) + } + if len(tcs[0].SubChans) != 0 { + t.Fatalf("Found subchannels: %v; expected 0 entries", tcs[0].SubChans) + } + cc.Connect() + } +} diff --git a/resolver_wrapper.go b/resolver_wrapper.go index 2a564190e040..c79bab12149f 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -23,7 +23,6 @@ import ( "strings" "sync" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -31,119 +30,97 @@ import ( "google.golang.org/grpc/serviceconfig" ) -// resolverStateUpdater wraps the single method used by ccResolverWrapper to -// report a state update from the actual resolver implementation. -type resolverStateUpdater interface { - updateResolverState(s resolver.State, err error) error -} - // ccResolverWrapper is a wrapper on top of cc for resolvers. // It implements resolver.ClientConn interface. 
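 //
 // Incoming (resolver --> gRPC) calls such as UpdateState and ReportError take
 // cc.mu and then ccr.mu directly, while outgoing (gRPC --> resolver) calls
 // such as resolveNow are scheduled on the serializer and run with no mutexes
 // held.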
type ccResolverWrapper struct { // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. - cc resolverStateUpdater - channelzID *channelz.Identifier + cc *ClientConn ignoreServiceConfig bool - opts ccResolverWrapperOpts - serializer *grpcsync.CallbackSerializer // To serialize all incoming calls. - serializerCancel context.CancelFunc // To close the serializer, accessed only from close(). - - // All incoming (resolver --> gRPC) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled on the serializer. - // Fields accessed *only* in these serializer callbacks, can therefore be - // accessed without a mutex. - curState resolver.State + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + resolver resolver.Resolver // only accessed within the serializer - // mu guards access to the below fields. + // The following fields are protected by mu. Caller must take cc.mu before + // taking mu. mu sync.Mutex + curState resolver.State closed bool - resolver resolver.Resolver // Accessed only from outgoing calls. -} - -// ccResolverWrapperOpts wraps the arguments to be passed when creating a new -// ccResolverWrapper. -type ccResolverWrapperOpts struct { - target resolver.Target // User specified dial target to resolve. - builder resolver.Builder // Resolver builder to use. - bOpts resolver.BuildOptions // Resolver build options to use. - channelzID *channelz.Identifier // Channelz identifier for the channel. } -// newCCResolverWrapper uses the resolver.Builder to build a Resolver and -// returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) { - ctx, cancel := context.WithCancel(context.Background()) - ccr := &ccResolverWrapper{ +// newCCResolverWrapper initializes the ccResolverWrapper. It can only be used +// after calling start, which builds the resolver. +func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { + ctx, cancel := context.WithCancel(cc.ctx) + return &ccResolverWrapper{ cc: cc, - channelzID: opts.channelzID, - ignoreServiceConfig: opts.bOpts.DisableServiceConfig, - opts: opts, + ignoreServiceConfig: cc.dopts.disableServiceConfig, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } +} - // Cannot hold the lock at build time because the resolver can send an - // update or error inline and these incoming calls grab the lock to schedule - // a callback in the serializer. - r, err := opts.builder.Build(opts.target, ccr, opts.bOpts) - if err != nil { - cancel() - return nil, err - } - - // Any error reported by the resolver at build time that leads to a - // re-resolution request from the balancer is dropped by grpc until we - // return from this function. So, we don't have to handle pending resolveNow - // requests here. - ccr.mu.Lock() - ccr.resolver = r - ccr.mu.Unlock() - - return ccr, nil +// start builds the name resolver using the resolver.Builder in cc and returns +// any error encountered. It must always be the first operation performed on +// any newly created ccResolverWrapper, except that close may be called instead. 
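+//
+// start must be called without cc.mu held, because Build can call UpdateState
+// or ReportError inline and both of those acquire cc.mu. Illustrative usage:
+//
+//	rw := newCCResolverWrapper(cc)
+//	// ... with cc.mu released ...
+//	if err := rw.start(); err != nil {
+//		// The resolver could not be built; fail the channel.
+//	}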
+func (ccr *ccResolverWrapper) start() error {
+	errCh := make(chan error)
+	ccr.serializer.Schedule(func(ctx context.Context) {
+		if ctx.Err() != nil {
+			return
+		}
+		opts := resolver.BuildOptions{
+			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
+			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,
+			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,
+			Dialer:               ccr.cc.dopts.copts.Dialer,
+		}
+		var err error
+		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
+		errCh <- err
+	})
+	return <-errCh
+}
 
 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
-	ccr.mu.Lock()
-	defer ccr.mu.Unlock()
-
-	// ccr.resolver field is set only after the call to Build() returns. But in
-	// the process of building, the resolver may send an error update which when
-	// propagated to the balancer may result in a re-resolution request.
-	if ccr.closed || ccr.resolver == nil {
-		return
-	}
-	ccr.resolver.ResolveNow(o)
+	ccr.serializer.Schedule(func(ctx context.Context) {
+		if ctx.Err() != nil || ccr.resolver == nil {
+			return
+		}
+		ccr.resolver.ResolveNow(o)
+	})
 }
 
+// close initiates async shutdown of the wrapper. To determine whether the
+// wrapper has finished shutting down, the channel should block on
+// ccr.serializer.Done() without cc.mu held.
 func (ccr *ccResolverWrapper) close() {
+	channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver")
 	ccr.mu.Lock()
-	if ccr.closed {
-		ccr.mu.Unlock()
-		return
-	}
-
-	channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
-
-	// Close the serializer to ensure that no more calls from the resolver are
-	// handled, before actually closing the resolver.
-	ccr.serializerCancel()
 	ccr.closed = true
-	r := ccr.resolver
 	ccr.mu.Unlock()
 
-	// Give enqueued callbacks a chance to finish.
-	<-ccr.serializer.Done()
-
-	// Spawn a goroutine to close the resolver (since it may block trying to
-	// cleanup all allocated resources) and return early.
-	go r.Close()
+	ccr.serializer.Schedule(func(context.Context) {
+		if ccr.resolver == nil {
+			return
+		}
+		ccr.resolver.Close()
+		ccr.resolver = nil
+	})
+	ccr.serializerCancel()
 }
 
 // UpdateState is called by resolver implementations to report new state to gRPC
 // which includes addresses and service config.
 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
-	errCh := make(chan error, 1)
+	ccr.cc.mu.Lock()
+	ccr.mu.Lock()
+	if ccr.closed {
+		ccr.mu.Unlock()
+		ccr.cc.mu.Unlock()
+		return nil
+	}
 	if s.Endpoints == nil {
 		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
 		for _, a := range s.Addresses {
@@ -152,41 +129,42 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
 			s.Endpoints = append(s.Endpoints, ep)
 		}
 	}
-	ok := ccr.serializer.Schedule(func(context.Context) {
-		ccr.addChannelzTraceEvent(s)
-		ccr.curState = s
-		if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
-			errCh <- balancer.ErrBadResolverState
-			return
-		}
-		errCh <- nil
-	})
-	if !ok {
-		// The only time when Schedule() fail to add the callback to the
-		// serializer is when the serializer is closed, and this happens only
-		// when the resolver wrapper is closed.
-		return nil
-	}
-	return <-errCh
+	ccr.addChannelzTraceEvent(s)
+	ccr.curState = s
+	ccr.mu.Unlock()
+	return ccr.cc.updateResolverStateAndUnlock(s, nil)
 }
 
 // ReportError is called by resolver implementations to report errors
 // encountered during name resolution to gRPC.
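 //
 // Like UpdateState and NewAddress, it takes cc.mu and then ccr.mu, and hands
 // off to updateResolverStateAndUnlock, which is responsible for releasing
 // cc.mu.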
func (ccr *ccResolverWrapper) ReportError(err error) { - ccr.serializer.Schedule(func(_ context.Context) { - channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) - ccr.cc.updateResolverState(resolver.State{}, err) - }) + ccr.cc.mu.Lock() + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() + return + } + ccr.mu.Unlock() + channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) + ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err) } // NewAddress is called by the resolver implementation to send addresses to // gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { - ccr.serializer.Schedule(func(_ context.Context) { - ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) - ccr.curState.Addresses = addrs - ccr.cc.updateResolverState(ccr.curState, nil) - }) + ccr.cc.mu.Lock() + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() + return + } + s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig} + ccr.addChannelzTraceEvent(s) + ccr.curState = s + ccr.mu.Unlock() + ccr.cc.updateResolverStateAndUnlock(s, nil) } // ParseServiceConfig is called by resolver implementations to parse a JSON @@ -215,5 +193,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { updates = append(updates, "resolver returned new addresses") } - channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) + channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) }
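
For reference, the serialization pattern both wrappers now share can be
sketched as follows. This is a minimal illustration, not code from this patch:
it uses only the grpcsync.CallbackSerializer API exercised above
(NewCallbackSerializer, Schedule, and Done), the wrapper and do names are made
up, and because grpcsync is internal to the grpc module the snippet compiles
only from within it.

	package main

	import (
		"context"
		"fmt"

		"google.golang.org/grpc/internal/grpcsync"
	)

	// wrapper is a stripped-down analogue of ccResolverWrapper and
	// ccBalancerWrapper: outgoing calls are funneled through a serializer so
	// that they run one at a time and never under the caller's mutex.
	type wrapper struct {
		serializer       *grpcsync.CallbackSerializer
		serializerCancel context.CancelFunc
	}

	func newWrapper(parent context.Context) *wrapper {
		ctx, cancel := context.WithCancel(parent)
		return &wrapper{
			serializer:       grpcsync.NewCallbackSerializer(ctx),
			serializerCancel: cancel,
		}
	}

	// do schedules fn on the serializer and skips it if the wrapper was
	// closed first, mirroring the ctx.Err() checks in the callbacks above.
	func (w *wrapper) do(fn func()) {
		w.serializer.Schedule(func(ctx context.Context) {
			if ctx.Err() != nil {
				return
			}
			fn()
		})
	}

	// close stops accepting new callbacks and waits for in-flight ones. As
	// with the real wrappers, the caller must not hold its own locks while
	// blocking on Done().
	func (w *wrapper) close() {
		w.serializerCancel()
		<-w.serializer.Done()
	}

	func main() {
		w := newWrapper(context.Background())
		w.do(func() { fmt.Println("runs in order, with no locks held") })
		w.close()
	}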