Skip to content

Commit

Permalink
Merge pull request kubernetes#115278 from SergeyKanzhelev/automated-c…
Browse files Browse the repository at this point in the history
…herry-pick-of-#115143-upstream-release-1.26

Automated cherry pick of kubernetes#115143: add custom dialer optimized for probes
  • Loading branch information
k8s-ci-robot authored Jan 30, 2023
2 parents c688261 + d3b0f3f commit dfda43d
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 2 deletions.
280 changes: 280 additions & 0 deletions pkg/kubelet/prober/scale_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
"context"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
utilpointer "k8s.io/utils/pointer"
)

// TCP sockets goes through a TIME-WAIT state (default 60 sec) before being freed,
// causing conntrack entries and ephemeral ports to be hold for 60 seconds
// despite the probe may have finished in less than 1 second.
// If the rate of probes is higher than the rate the OS recycles the ports used,
// it can consume a considerable number of ephemeral ports or conntrack entries.
// These tests verify that after certain period the probes keep working, if the probes
// don't close the sockets faster, they will start to fail.
// The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
// containers each and runs one probe for each of these containers (all the probes comes
// from the same process, same as in the Kubelet, and targets the same IP:port to verify
// that the ephemeral port is not exhausted.

// The default port range on a normal Linux system has 28321 free ephemeral ports per
// tuple srcIP,srcPort:dstIP:dstPort:Proto: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
// 1 pods x 600 containers/pod x 1 probes/container x 1 req/sec = 600 req/sec
// 600 req/sec x 59 sec = 35400
// The test should run out of ephemeral ports in less than one minute and start failing connections
// Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322

func TestTCPPortExhaustion(t *testing.T) {
// This test creates a considereable number of connections in a short time
// and flakes on constrained environments, thus it is skipped by default.
// The test is left for manual verification or experimentation with new
// changes on the probes.
t.Skip("skipping TCP port exhaustion tests")

const (
numTestPods = 1
numContainers = 600
)

if testing.Short() {
t.Skip("skipping TCP port exhaustion in short mode")
}

tests := []struct {
name string
http bool // it can be tcp or http
}{
{"TCP", false},
{"HTTP", true},
}
for _, tt := range tests {
t.Run(fmt.Sprintf(tt.name), func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),
nil, // runner
&record.FakeRecorder{},
).(*manager)
defer cleanup(t, m)

now := time.Now()
fakePods := make([]*fakePod, numTestPods)
for i := 0; i < numTestPods; i++ {
fake, err := newFakePod(tt.http)
if err != nil {
t.Fatalf("unexpected error creating fake pod: %v", err)
}
defer fake.stop()
handler := fake.probeHandler()
fakePods[i] = fake

pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(fmt.Sprintf("pod%d", i)),
Name: fmt.Sprintf("pod%d", i),
Namespace: "test",
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Phase: v1.PodPhase(v1.PodReady),
PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
},
}
for j := 0; j < numContainers; j++ {
// use only liveness probes for simplicity, initial state is success for them
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: fmt.Sprintf("container%d", j),
LivenessProbe: newProbe(handler),
})
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: fmt.Sprintf("container%d", j),
ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: metav1.Now(),
},
},
Started: utilpointer.Bool(true),
})
}
podManager.AddPod(&pod)
m.statusManager.SetPodStatus(&pod, pod.Status)
m.AddPod(&pod)
}
t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))

ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
defer cancel()
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for {
var result results.Update
var probeType string
select {
case result = <-m.startupManager.Updates():
probeType = "startup"
case result = <-m.livenessManager.Updates():
probeType = "liveness"
case result = <-m.readinessManager.Updates():
probeType = "readiness"
case <-ctx.Done():
return
}
switch result.Result.String() {
// The test will fail if any of the probes fails
case "Failure":
t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
case "UNKNOWN": // startup probes
t.Logf("UNKNOWN state for %v", result)
default:
}
}
}()
wg.Wait()

// log the number of connections received in each pod for debugging test failures.
for _, pod := range fakePods {
n := pod.connections()
t.Logf("Number of connections %d", n)
}

})
}

}

func newProbe(handler v1.ProbeHandler) *v1.Probe {
return &v1.Probe{
ProbeHandler: handler,
TimeoutSeconds: 1,
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
}
}

// newFakePod runs a server (TCP or HTTP) in a random port
func newFakePod(httpServer bool) (*fakePod, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to bind: %v", err)
}
f := &fakePod{ln: ln, http: httpServer}

// spawn an http server or a TCP server that counts the number of connections received
if httpServer {
var mu sync.Mutex
visitors := map[string]struct{}{}
go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
if _, ok := visitors[r.RemoteAddr]; !ok {
atomic.AddInt64(&f.numConnection, 1)
visitors[r.RemoteAddr] = struct{}{}
}
}))
} else {
go func() {
for {
conn, err := ln.Accept()
if err != nil {
// exit when the listener is closed
return
}
atomic.AddInt64(&f.numConnection, 1)
// handle request but not block
go func(c net.Conn) {
defer c.Close()
// read but swallow the errors since the probe doesn't send data
buffer := make([]byte, 1024)
c.Read(buffer)
// respond
conn.Write([]byte("Hi back!\n"))
}(conn)

}
}()
}
return f, nil

}

type fakePod struct {
ln net.Listener
numConnection int64
http bool
}

func (f *fakePod) probeHandler() v1.ProbeHandler {
port := f.ln.Addr().(*net.TCPAddr).Port
var handler v1.ProbeHandler
if f.http {
handler = v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
} else {
handler = v1.ProbeHandler{
TCPSocket: &v1.TCPSocketAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
}
return handler
}

func (f *fakePod) stop() {
f.ln.Close()
}

func (f *fakePod) connections() int {
return int(atomic.LoadInt64(&f.numConnection))
}
42 changes: 42 additions & 0 deletions pkg/probe/dialer_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build !windows
// +build !windows

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package probe

import (
"net"
"syscall"
)

// ProbeDialer returns a dialer optimized for probes to avoid lingering sockets on TIME-WAIT state.
// The dialer reduces the TIME-WAIT period to 1 seconds instead of the OS default of 60 seconds.
// Using 1 second instead of 0 because SO_LINGER socket option to 0 causes pending data to be
// discarded and the connection to be aborted with an RST rather than for the pending data to be
// transmitted and the connection closed cleanly with a FIN.
// Ref: https://issues.k8s.io/89898
func ProbeDialer() *net.Dialer {
dialer := &net.Dialer{
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1})
})
},
}
return dialer
}
42 changes: 42 additions & 0 deletions pkg/probe/dialer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build windows
// +build windows

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package probe

import (
"net"
"syscall"
)

// ProbeDialer returns a dialer optimized for probes to avoid lingering sockets on TIME-WAIT state.
// The dialer reduces the TIME-WAIT period to 1 seconds instead of the OS default of 60 seconds.
// Using 1 second instead of 0 because SO_LINGER socket option to 0 causes pending data to be
// discarded and the connection to be aborted with an RST rather than for the pending data to be
// transmitted and the connection closed cleanly with a FIN.
// Ref: https://issues.k8s.io/89898
func ProbeDialer() *net.Dialer {
dialer := &net.Dialer{
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1})
})
},
}
return dialer
}
4 changes: 4 additions & 0 deletions pkg/probe/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) Prober {
DisableKeepAlives: true,
Proxy: http.ProxyURL(nil),
DisableCompression: true, // removes Accept-Encoding header
// DialContext creates unencrypted TCP connections
// and is also used by the transport for HTTPS connection
DialContext: probe.ProbeDialer().DialContext,
})

return httpProber{transport, followNonLocalRedirects}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/probe/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Prober interface {

type tcpProber struct{}

// Probe returns a ProbeRunner capable of running an TCP check.
// Probe checks that a TCP connection to the address can be opened.
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}
Expand All @@ -48,7 +48,9 @@ func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.R
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
conn, err := net.DialTimeout("tcp", addr, timeout)
d := probe.ProbeDialer()
d.Timeout = timeout
conn, err := d.Dial("tcp", addr)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
Expand Down

0 comments on commit dfda43d

Please sign in to comment.