Skip to content

Commit

Permalink
xds: add ConfigSelector to support RouteAction timeouts (#3991)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Nov 17, 2020
1 parent 20636e7 commit b88744b
Show file tree
Hide file tree
Showing 11 changed files with 774 additions and 96 deletions.
26 changes: 26 additions & 0 deletions benchmark/primitives/primitives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,29 @@ func runStructTypeAssertion(b *testing.B, fer interface{}) {
b.Fatal("error")
}
}

func BenchmarkWaitGroupAddDone(b *testing.B) {
wg := sync.WaitGroup{}
b.RunParallel(func(pb *testing.PB) {
i := 0
for ; pb.Next(); i++ {
wg.Add(1)
}
for ; i > 0; i-- {
wg.Done()
}
})
}

func BenchmarkRLockUnlock(b *testing.B) {
mu := sync.RWMutex{}
b.RunParallel(func(pb *testing.PB) {
i := 0
for ; pb.Next(); i++ {
mu.RLock()
}
for ; i > 0; i-- {
mu.RUnlock()
}
})
}
114 changes: 114 additions & 0 deletions benchmark/primitives/safe_config_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Benchmark options for safe config selector type.

package primitives_test

import (
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)

type safeUpdaterAtomicAndCounter struct {
ptr unsafe.Pointer // *countingFunc
}

type countingFunc struct {
mu sync.RWMutex
f func()
}

func (s *safeUpdaterAtomicAndCounter) call() {
cfPtr := atomic.LoadPointer(&s.ptr)
var cf *countingFunc
for {
cf = (*countingFunc)(cfPtr)
cf.mu.RLock()
cfPtr2 := atomic.LoadPointer(&s.ptr)
if cfPtr == cfPtr2 {
// Use cf with confidence!
break
}
// cf changed; try to use the new one instead, because the old one is
// no longer valid to use.
cf.mu.RUnlock()
cfPtr = cfPtr2
}
defer cf.mu.RUnlock()
cf.f()
}

func (s *safeUpdaterAtomicAndCounter) update(f func()) {
newCF := &countingFunc{f: f}
oldCFPtr := atomic.SwapPointer(&s.ptr, unsafe.Pointer(newCF))
if oldCFPtr == nil {
return
}
(*countingFunc)(oldCFPtr).mu.Lock()
(*countingFunc)(oldCFPtr).mu.Unlock() //lint:ignore SA2001 necessary to unlock after locking to unblock any RLocks
}

type safeUpdaterRWMutex struct {
mu sync.RWMutex
f func()
}

func (s *safeUpdaterRWMutex) call() {
s.mu.RLock()
defer s.mu.RUnlock()
s.f()
}

func (s *safeUpdaterRWMutex) update(f func()) {
s.mu.Lock()
defer s.mu.Unlock()
s.f = f
}

type updater interface {
call()
update(f func())
}

func benchmarkSafeUpdater(b *testing.B, u updater) {
t := time.NewTicker(time.Second)
go func() {
for range t.C {
u.update(func() {})
}
}()
b.RunParallel(func(pb *testing.PB) {
u.update(func() {})
for pb.Next() {
u.call()
}
})
t.Stop()
}

func BenchmarkSafeUpdaterAtomicAndCounter(b *testing.B) {
benchmarkSafeUpdater(b, &safeUpdaterAtomicAndCounter{})
}

func BenchmarkSafeUpdaterRWMutex(b *testing.B) {
benchmarkSafeUpdater(b, &safeUpdaterRWMutex{})
}
65 changes: 49 additions & 16 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -104,6 +105,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}

type defaultConfigSelector struct {
sc *ServiceConfig
}

func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) *iresolver.RPCConfig {
return &iresolver.RPCConfig{
Context: rpcInfo.Context,
MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
}
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
Expand Down Expand Up @@ -224,6 +236,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
scSet = true
}
default:
Expand Down Expand Up @@ -273,6 +286,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
}
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -479,6 +493,8 @@ type ClientConn struct {
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper

safeConfigSelector iresolver.SafeConfigSelector

mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
Expand Down Expand Up @@ -539,6 +555,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
cc.mu.Unlock()
case <-cc.ctx.Done():
return
Expand Down Expand Up @@ -577,13 +594,13 @@ func init() {

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, addrs)
cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
}

Expand Down Expand Up @@ -620,7 +637,15 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
cc.applyServiceConfigAndBalancer(sc, s.Addresses)
configSelector := iresolver.GetConfigSelector(s)
if configSelector != nil {
if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
}
} else {
configSelector = &defaultConfigSelector{sc}
}
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
Expand All @@ -630,6 +655,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
Expand Down Expand Up @@ -864,6 +890,20 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}

func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
if sc == nil {
return MethodConfig{}
}
if m, ok := sc.Methods[method]; ok {
return m
}
i := strings.LastIndex(method, "/")
if m, ok := sc.Methods[method[:i+1]]; ok {
return m
}
return sc.Methods[""]
}

// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
Expand All @@ -876,17 +916,7 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
if cc.sc == nil {
return MethodConfig{}
}
if m, ok := cc.sc.Methods[method]; ok {
return m
}
i := strings.LastIndex(method, "/")
if m, ok := cc.sc.Methods[method[:i+1]]; ok {
return m
}
return cc.sc.Methods[""]
return getMethodConfig(cc.sc, method)
}

func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
Expand All @@ -909,12 +939,15 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return
}
cc.sc = sc
if configSelector != nil {
cc.safeConfigSelector.UpdateConfigSelector(configSelector)
}

if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
Expand Down
93 changes: 93 additions & 0 deletions internal/resolver/config_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package resolver provides internal resolver-related functionality.
package resolver

import (
"context"
"sync"

"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
)

// ConfigSelector controls what configuration to use for every RPC.
type ConfigSelector interface {
// Selects the configuration for the RPC.
SelectConfig(RPCInfo) *RPCConfig
}

// RPCInfo contains RPC information needed by a ConfigSelector.
type RPCInfo struct {
// Context is the user's context for the RPC and contains headers and
// application timeout. It is passed for interception purposes and for
// efficiency reasons. SelectConfig should not be blocking.
Context context.Context
Method string // i.e. "/Service/Method"
}

// RPCConfig describes the configuration to use for each RPC.
type RPCConfig struct {
// The context to use for the remainder of the RPC; can pass info to LB
// policy or affect timeout or metadata.
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
}

type csKeyType string

const csKey = csKeyType("grpc.internal.resolver.configSelector")

// SetConfigSelector sets the config selector in state and returns the new
// state.
func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
state.Attributes = state.Attributes.WithValues(csKey, cs)
return state
}

// GetConfigSelector retrieves the config selector from state, if present, and
// returns it or nil if absent.
func GetConfigSelector(state resolver.State) ConfigSelector {
cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
return cs
}

// SafeConfigSelector allows for safe switching of ConfigSelector
// implementations such that previous values are guaranteed to not be in use
// when UpdateConfigSelector returns.
type SafeConfigSelector struct {
mu sync.RWMutex
cs ConfigSelector
}

// UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
// all uses of the previous ConfigSelector have completed.
func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
scs.mu.Lock()
defer scs.mu.Unlock()
scs.cs = cs
}

// SelectConfig defers to the current ConfigSelector in scs.
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) *RPCConfig {
scs.mu.RLock()
defer scs.mu.RUnlock()
return scs.cs.SelectConfig(r)
}
Loading

0 comments on commit b88744b

Please sign in to comment.