Skip to content

Commit

Permalink
kube-aggregator: correctly use client-go TLS cache with custom dialer
Browse files Browse the repository at this point in the history
Signed-off-by: Monis Khan <mok@microsoft.com>

Kubernetes-commit: fddf859d6a55ce12c78214f11d5354b1213f9a66
  • Loading branch information
enj authored and k8s-publishing-bot committed Apr 12, 2023
1 parent 64c4915 commit f26cf84
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 205 deletions.
31 changes: 19 additions & 12 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/transport"
openapicommon "k8s.io/kube-openapi/pkg/common"

"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand Down Expand Up @@ -129,7 +130,7 @@ type APIAggregator struct {

// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder

// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
proxyHandlers map[string]*proxyHandler
Expand Down Expand Up @@ -160,10 +161,6 @@ type APIAggregator struct {
// when discovery with resources are requested
discoveryAggregationController DiscoveryAggregationController

// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
}
Expand Down Expand Up @@ -210,18 +207,30 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}

var proxyTransportDial *transport.DialHolder
if c.GenericConfig.EgressSelector != nil {
egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
return nil, err
}
if egressDialer != nil {
proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
} else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
}

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyTransportDial: proxyTransportDial,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}
Expand Down Expand Up @@ -295,10 +304,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.ApiregistrationV1(),
c.ExtraConfig.ProxyTransport,
proxyTransportDial,
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
c.GenericConfig.EgressSelector,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -463,7 +471,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
// Forward calls to discovery manager to update discovery document
if s.discoveryAggregationController != nil {
handlerCopy := *proxyHandler
handlerCopy.setServiceAvailable(true)
handlerCopy.setServiceAvailable()
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
}
return nil
Expand All @@ -479,9 +487,8 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyHandler := &proxyHandler{
localDelegate: s.delegateHandler,
proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
proxyTransport: s.proxyTransport,
proxyTransportDial: s.proxyTransportDial,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
}
proxyHandler.updateAPIService(apiService)
Expand Down
37 changes: 10 additions & 27 deletions pkg/apiserver/handler_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/x509metrics"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
Expand All @@ -59,17 +57,13 @@ type proxyHandler struct {

// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder

// Endpoints based routing to map from cluster IP to routable IP
serviceResolver ServiceResolver

handlingInfo atomic.Value

// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// reject to forward redirect response
rejectForwardingRedirects bool
}
Expand All @@ -80,8 +74,8 @@ type proxyHandlingInfo struct {

// name is the name of the APIService
name string
// restConfig holds the information for building a roundtripper
restConfig *restclient.Config
// transportConfig holds the information for building a roundtripper
transportConfig *transport.Config
// transportBuildingError is an error produced while building the transport. If this
// is non-nil, it will be reported to clients.
transportBuildingError error
Expand Down Expand Up @@ -233,7 +227,7 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {

// Sets serviceAvailable value on proxyHandler
// not thread safe
func (r *proxyHandler) setServiceAvailable(value bool) {
func (r *proxyHandler) setServiceAvailable() {
info := r.handlingInfo.Load().(proxyHandlingInfo)
info.serviceAvailable = true
r.handlingInfo.Store(info)
Expand All @@ -247,41 +241,30 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ

proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()

clientConfig := &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
CertData: proxyClientCert,
KeyData: proxyClientKey,
CAData: apiService.Spec.CABundle,
},
DialHolder: r.proxyTransportDial,
}
clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
x509MissingSANCounter,
x509InsecureSHA1Counter,
))

newInfo := proxyHandlingInfo{
name: apiService.Name,
restConfig: clientConfig,
transportConfig: transportConfig,
serviceName: apiService.Spec.Service.Name,
serviceNamespace: apiService.Spec.Service.Namespace,
servicePort: *apiService.Spec.Service.Port,
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
}
if r.egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := r.egressSelector.Lookup(networkContext)
if err != nil {
klog.Warning(err.Error())
} else {
newInfo.restConfig.Dial = egressDialer
}
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
newInfo.restConfig.Dial = r.proxyTransport.DialContext
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
if newInfo.transportBuildingError != nil {
klog.Warning(newInfo.transportBuildingError.Error())
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/apiserver/handler_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/client-go/transport"

"golang.org/x/net/websocket"

Expand Down Expand Up @@ -325,7 +326,6 @@ func TestProxyHandler(t *testing.T) {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
serviceResolver: serviceResolver,
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}
server := httptest.NewServer(contextHandler(handler, tc.user))
Expand Down Expand Up @@ -551,15 +551,21 @@ func TestProxyUpgrade(t *testing.T) {
serverURL, _ := url.Parse(backendServer.URL)
proxyHandler := &proxyHandler{
serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}

var dialer *mockEgressDialer
var selector *egressselector.EgressSelector
if tc.NewEgressSelector != nil {
dialer, selector = tc.NewEgressSelector()
proxyHandler.egressSelector = selector

egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
t.Fatal(err)
}
if egressDialer != nil {
proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
}

proxyHandler.updateAPIService(tc.APIService)
Expand Down
91 changes: 10 additions & 81 deletions pkg/controllers/status/available_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package apiserver
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"reflect"
Expand All @@ -33,13 +32,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
v1informers "k8s.io/client-go/informers/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -77,8 +73,8 @@ type AvailableConditionController struct {
endpointsLister v1listers.EndpointsLister
endpointsSynced cache.InformerSynced

// dialContext specifies the dial function for creating unencrypted TCP connections.
dialContext func(ctx context.Context, network, address string) (net.Conn, error)
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
proxyTransportDial *transport.DialHolder
proxyCurrentCertKeyContent certKeyFunc
serviceResolver ServiceResolver

Expand All @@ -91,67 +87,19 @@ type AvailableConditionController struct {
// this lock protects operations on the above cache
cacheLock sync.RWMutex

// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
tlsCache *tlsTransportCache

// metrics registered into legacy registry
metrics *availabilityMetrics
}

type tlsTransportCache struct {
mu sync.Mutex
transports map[tlsCacheKey]http.RoundTripper
}

func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) {
// If the available controller doesn't customzie the dialer (and we know from
// the code that the controller doesn't customzie other functions i.e. Proxy
// and GetCert (ExecProvider)), the config is cacheable by the client-go TLS
// transport cache. Let's skip the local cache and depend on the client-go cache.
if config.Dial == nil {
return rest.TransportFor(config)
}
c.mu.Lock()
defer c.mu.Unlock()
// See if we already have a custom transport for this config
key := tlsConfigKey(config)
if t, ok := c.transports[key]; ok {
return t, nil
}
restTransport, err := rest.TransportFor(config)
if err != nil {
return nil, err
}
c.transports[key] = restTransport
return restTransport, nil
}

type tlsCacheKey struct {
certData string
keyData string `datapolicy:"secret-key"`
}

func tlsConfigKey(c *rest.Config) tlsCacheKey {
return tlsCacheKey{
certData: string(c.TLSClientConfig.CertData),
keyData: string(c.TLSClientConfig.KeyData),
}
}

// NewAvailableConditionController returns a new AvailableConditionController.
func NewAvailableConditionController(
apiServiceInformer informers.APIServiceInformer,
serviceInformer v1informers.ServiceInformer,
endpointsInformer v1informers.EndpointsInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
proxyTransport *http.Transport,
proxyTransportDial *transport.DialHolder,
proxyCurrentCertKeyContent certKeyFunc,
serviceResolver ServiceResolver,
egressSelector *egressselector.EgressSelector,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
Expand All @@ -165,23 +113,11 @@ func NewAvailableConditionController(
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
proxyTransportDial: proxyTransportDial,
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}

if egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := egressSelector.Lookup(networkContext)
if err != nil {
return nil, err
}
c.dialContext = egressDialer
} else if proxyTransport != nil && proxyTransport.DialContext != nil {
c.dialContext = proxyTransport.DialContext
}

// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
Expand Down Expand Up @@ -236,27 +172,20 @@ func (c *AvailableConditionController) sync(key string) error {
// if a particular transport was specified, use that otherwise build one
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
restConfig := &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: true,
},
DialHolder: c.proxyTransportDial,
}

if c.proxyCurrentCertKeyContent != nil {
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()

restConfig.TLSClientConfig.CertData = proxyClientCert
restConfig.TLSClientConfig.KeyData = proxyClientKey
}
if c.dialContext != nil {
restConfig.Dial = c.dialContext
transportConfig.TLS.CertData = proxyClientCert
transportConfig.TLS.KeyData = proxyClientKey
}
// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
restTransport, err := c.tlsCache.get(restConfig)
restTransport, err := transport.New(transportConfig)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f26cf84

Please sign in to comment.