Skip to content

Commit 57729ea

Browse files
author
Kubernetes Submit Queue
authored
Merge pull request kubernetes#63832 from liggitt/automated-cherry-pick-of-#63492-upstream-release-1.9
Automatic merge from submit-queue. Automated cherry pick of kubernetes#63492: Always track kubelet -> API connections. Cherry pick of kubernetes#63492 on release-1.9. kubernetes#63492: Always track kubelet -> API connections.
2 parents f4cf484 + d73b198 commit 57729ea

File tree

6 files changed

+72
-46
lines changed

6 files changed

+72
-46
lines changed

cmd/kubelet/app/server.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
331331
var kubeClient clientset.Interface
332332
var eventClient v1core.EventsGetter
333333
var heartbeatClient v1core.CoreV1Interface
334+
var closeAllConns func()
334335
var externalKubeClient clientset.Interface
335336

336337
clientConfig, err := CreateAPIServerClientConfig(s)
@@ -342,11 +343,12 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
342343
if err != nil {
343344
return err
344345
}
345-
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
346-
// to request new certs, we will be unable to continue normal operation
347-
if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true); err != nil {
348-
return err
349-
}
346+
}
347+
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
348+
// to request new certs, we will be unable to continue normal operation
349+
closeAllConns, err = kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true)
350+
if err != nil {
351+
return err
350352
}
351353

352354
kubeClient, err = clientset.NewForConfig(clientConfig)
@@ -392,6 +394,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
392394
kubeDeps.ExternalKubeClient = externalKubeClient
393395
if heartbeatClient != nil {
394396
kubeDeps.HeartbeatClient = heartbeatClient
397+
kubeDeps.OnHeartbeatFailure = closeAllConns
395398
}
396399
if eventClient != nil {
397400
kubeDeps.EventClient = eventClient

pkg/kubelet/certificate/transport.go

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,37 +38,24 @@ import (
3838
//
3939
// The config must not already provide an explicit transport.
4040
//
41+
// The returned function allows forcefully closing all active connections.
42+
//
4143
// The returned transport periodically checks the manager to determine if the
4244
// certificate has changed. If it has, the transport shuts down all existing client
4345
// connections, forcing the client to re-handshake with the server and use the
4446
// new certificate.
4547
//
4648
// stopCh should be used to indicate when the transport is unused and doesn't need
4749
// to continue checking the manager.
48-
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
50+
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) (func(), error) {
4951
return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitIfExpired)
5052
}
5153

5254
// updateTransport is an internal method that exposes how often this method checks that the
5355
// client cert has changed. Intended for testing.
54-
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
55-
if clientConfig.Transport != nil {
56-
return fmt.Errorf("there is already a transport configured")
57-
}
58-
tlsConfig, err := restclient.TLSConfigFor(clientConfig)
59-
if err != nil {
60-
return fmt.Errorf("unable to configure TLS for the rest client: %v", err)
61-
}
62-
if tlsConfig == nil {
63-
tlsConfig = &tls.Config{}
64-
}
65-
tlsConfig.Certificates = nil
66-
tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
67-
cert := clientCertificateManager.Current()
68-
if cert == nil {
69-
return &tls.Certificate{Certificate: nil}, nil
70-
}
71-
return cert, nil
56+
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) (func(), error) {
57+
if clientConfig.Transport != nil || clientConfig.Dial != nil {
58+
return nil, fmt.Errorf("there is already a transport or dialer configured")
7259
}
7360

7461
// Custom dialer that will track all connections it creates.
@@ -77,29 +64,48 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
7764
conns: make(map[*closableConn]struct{}),
7865
}
7966

80-
lastCert := clientCertificateManager.Current()
81-
go wait.Until(func() {
82-
curr := clientCertificateManager.Current()
83-
if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) {
84-
if clientCertificateManager.ServerHealthy() {
85-
glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
86-
} else {
87-
glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
67+
tlsConfig, err := restclient.TLSConfigFor(clientConfig)
68+
if err != nil {
69+
return nil, fmt.Errorf("unable to configure TLS for the rest client: %v", err)
70+
}
71+
if tlsConfig == nil {
72+
tlsConfig = &tls.Config{}
73+
}
74+
75+
if clientCertificateManager != nil {
76+
tlsConfig.Certificates = nil
77+
tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
78+
cert := clientCertificateManager.Current()
79+
if cert == nil {
80+
return &tls.Certificate{Certificate: nil}, nil
8881
}
82+
return cert, nil
8983
}
90-
if curr == nil || lastCert == curr {
91-
// Cert hasn't been rotated.
92-
return
93-
}
94-
lastCert = curr
9584

96-
glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
97-
// The cert has been rotated. Close all existing connections to force the client
98-
// to reperform its TLS handshake with new cert.
99-
//
100-
// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
101-
t.closeAllConns()
102-
}, period, stopCh)
85+
lastCert := clientCertificateManager.Current()
86+
go wait.Until(func() {
87+
curr := clientCertificateManager.Current()
88+
if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) {
89+
if clientCertificateManager.ServerHealthy() {
90+
glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
91+
} else {
92+
glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
93+
}
94+
}
95+
if curr == nil || lastCert == curr {
96+
// Cert hasn't been rotated.
97+
return
98+
}
99+
lastCert = curr
100+
101+
glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
102+
// The cert has been rotated. Close all existing connections to force the client
103+
// to reperform its TLS handshake with new cert.
104+
//
105+
// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
106+
t.closeAllConns()
107+
}, period, stopCh)
108+
}
103109

104110
clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{
105111
Proxy: http.ProxyFromEnvironment,
@@ -117,7 +123,8 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
117123
clientConfig.CAData = nil
118124
clientConfig.CAFile = ""
119125
clientConfig.Insecure = false
120-
return nil
126+
127+
return t.closeAllConns, nil
121128
}
122129

123130
// connTracker is a dialer that tracks all open connections it creates.

pkg/kubelet/certificate/transport_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
187187
}
188188

189189
// Check for a new cert every 10 milliseconds
190-
if err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil {
190+
if _, err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil {
191191
t.Fatal(err)
192192
}
193193

pkg/kubelet/kubelet.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ type Dependencies struct {
257257
DockerClientConfig *dockershim.ClientConfig
258258
EventClient v1core.EventsGetter
259259
HeartbeatClient v1core.CoreV1Interface
260+
OnHeartbeatFailure func()
260261
KubeClient clientset.Interface
261262
ExternalKubeClient clientset.Interface
262263
Mounter mount.Interface
@@ -514,6 +515,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
514515
nodeName: nodeName,
515516
kubeClient: kubeDeps.KubeClient,
516517
heartbeatClient: kubeDeps.HeartbeatClient,
518+
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
517519
rootDirectory: rootDirectory,
518520
resyncInterval: kubeCfg.SyncFrequency.Duration,
519521
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
@@ -939,6 +941,9 @@ type Kubelet struct {
939941
iptClient utilipt.Interface
940942
rootDirectory string
941943

944+
// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
945+
onRepeatedHeartbeatFailure func()
946+
942947
// podWorkers handle syncing Pods in response to events.
943948
podWorkers PodWorkers
944949

pkg/kubelet/kubelet_node_status.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,9 @@ func (kl *Kubelet) syncNodeStatus() {
380380
func (kl *Kubelet) updateNodeStatus() error {
381381
for i := 0; i < nodeStatusUpdateRetry; i++ {
382382
if err := kl.tryUpdateNodeStatus(i); err != nil {
383+
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
384+
kl.onRepeatedHeartbeatFailure()
385+
}
383386
glog.Errorf("Error updating node status, will retry: %v", err)
384387
} else {
385388
return nil

pkg/kubelet/kubelet_node_status_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
533533

534534
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
535535
attempts := int64(0)
536+
failureCallbacks := int64(0)
536537

537538
// set up a listener that hangs connections
538539
ln, err := net.Listen("tcp", "127.0.0.1:0")
@@ -563,6 +564,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
563564
kubelet := testKubelet.kubelet
564565
kubelet.kubeClient = nil // ensure only the heartbeat client is used
565566
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
567+
kubelet.onRepeatedHeartbeatFailure = func() {
568+
atomic.AddInt64(&failureCallbacks, 1)
569+
}
566570
kubelet.containerManager = &localCM{
567571
ContainerManager: cm.NewStubContainerManager(),
568572
allocatable: v1.ResourceList{
@@ -582,6 +586,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
582586
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry {
583587
t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
584588
}
589+
// should have gotten multiple failure callbacks
590+
if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
591+
t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
592+
}
585593
}
586594

587595
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {

0 commit comments

Comments
 (0)