Add TCP probe to target port if no readiness probe is found in the API spec (#2379)
RobertLucian authored Aug 3, 2021
1 parent 2203a92 commit 83eb9f6
Showing 3 changed files with 39 additions and 15 deletions.
cmd/proxy/main.go (30 changes: 18 additions & 12 deletions)
@@ -19,6 +19,7 @@ package main
 import (
 	"context"
 	"flag"
+	"fmt"
 	"net"
 	"net/http"
 	"os"
@@ -48,6 +49,7 @@ func main() {
 		userContainerPort int
 		maxConcurrency    int
 		maxQueueLength    int
+		hasTCPProbe       bool
 		clusterConfigPath string
 	)

@@ -56,6 +58,7 @@ func main() {
 	flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
 	flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
 	flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
+	flag.BoolVar(&hasTCPProbe, "has-tcp-probe", false, "tcp probe to the user-provided container port")
 	flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
 	flag.Parse()

@@ -142,7 +145,7 @@ func main() {

 	adminHandler := http.NewServeMux()
 	adminHandler.Handle("/metrics", promStats)
-	adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, log))
+	adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, hasTCPProbe, log))

 	servers := map[string]*http.Server{
 		"proxy": {
@@ -201,19 +204,22 @@ func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
 	os.Exit(1)
 }

-func readinessTCPHandler(port int, logger *zap.SugaredLogger) http.HandlerFunc {
+func readinessTCPHandler(port int, enableTCPProbe bool, logger *zap.SugaredLogger) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
-		timeout := time.Duration(1) * time.Second
-		address := net.JoinHostPort("localhost", strconv.FormatInt(int64(port), 10))
-
-		conn, err := net.DialTimeout("tcp", address, timeout)
-		if err != nil {
-			logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
-			w.WriteHeader(http.StatusInternalServerError)
-			_, _ = w.Write([]byte("unhealthy"))
-			return
+		if enableTCPProbe {
+			ctx := r.Context()
+			address := net.JoinHostPort("localhost", fmt.Sprintf("%d", port))
+
+			var d net.Dialer
+			conn, err := d.DialContext(ctx, "tcp", address)
+			if err != nil {
+				logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
+				w.WriteHeader(http.StatusInternalServerError)
+				_, _ = w.Write([]byte("unhealthy"))
+				return
+			}
+			_ = conn.Close()
 		}
-		_ = conn.Close()

 		w.WriteHeader(http.StatusOK)
 		_, _ = w.Write([]byte("healthy"))
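For context, here is a minimal, self-contained sketch (not the repository's code) of how a handler like the one above behaves: with the TCP probe disabled, /healthz always reports healthy; with it enabled, it dials the target port and reports unhealthy when nothing is listening. The zap logger and errors wrapping are omitted, and the port number is arbitrary.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/http/httptest"
)

// Simplified stand-in for readinessTCPHandler: when enableTCPProbe is false,
// /healthz unconditionally reports healthy; when true, it dials the given
// port over TCP and reports unhealthy if the dial fails.
func readinessHandler(port int, enableTCPProbe bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if enableTCPProbe {
			var d net.Dialer
			addr := net.JoinHostPort("localhost", fmt.Sprintf("%d", port))
			conn, err := d.DialContext(r.Context(), "tcp", addr)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				_, _ = w.Write([]byte("unhealthy"))
				return
			}
			_ = conn.Close()
		}
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("healthy"))
	}
}

func main() {
	// Assuming nothing listens on this (arbitrary) port, the enabled probe fails.
	rec := httptest.NewRecorder()
	readinessHandler(65123, true)(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	log.Println("probe enabled, no listener:", rec.Code) // expect 500

	// With the probe disabled, /healthz reports healthy regardless of the port.
	rec = httptest.NewRecorder()
	readinessHandler(65123, false)(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	log.Println("probe disabled:", rec.Code) // expect 200
}
```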
pkg/workloads/helpers.go (16 changes: 15 additions & 1 deletion)
@@ -83,7 +83,6 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
 		if container == nil {
 			continue
 		}
-
 		if container.ReadinessProbe != nil {
 			probes[container.Name] = *GetProbeSpec(container.ReadinessProbe)
 		}
@@ -92,6 +91,21 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
 	return probes
 }

+func HasReadinessProbesTargetingPort(containers []*userconfig.Container, targetPort int32) bool {
+	for _, container := range containers {
+		if container == nil || container.ReadinessProbe == nil {
+			continue
+		}
+
+		probe := container.ReadinessProbe
+		if (probe.TCPSocket != nil && probe.TCPSocket.Port == targetPort) ||
+			probe.HTTPGet != nil && probe.HTTPGet.Port == targetPort {
+			return true
+		}
+	}
+	return false
+}
+
 func BaseClusterEnvVars() []kcore.EnvFromSource {
 	envVars := []kcore.EnvFromSource{
 		{
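As a rough illustration of how the new helper is meant to be used, here is a self-contained sketch with stand-in types; the real userconfig structs in the repository have more fields, and only the logic of HasReadinessProbesTargetingPort is mirrored here.

```go
package main

import "fmt"

// Stand-in types mirroring only the fields the helper reads; the actual
// userconfig package defines richer Container and Probe types.
type tcpSocketAction struct{ Port int32 }
type httpGetAction struct{ Port int32 }

type probe struct {
	TCPSocket *tcpSocketAction
	HTTPGet   *httpGetAction
}

type container struct {
	Name           string
	ReadinessProbe *probe
}

// Same logic as HasReadinessProbesTargetingPort: true if any container's
// readiness probe (TCP or HTTP) is aimed at targetPort.
func hasReadinessProbesTargetingPort(containers []*container, targetPort int32) bool {
	for _, c := range containers {
		if c == nil || c.ReadinessProbe == nil {
			continue
		}
		p := c.ReadinessProbe
		if (p.TCPSocket != nil && p.TCPSocket.Port == targetPort) ||
			(p.HTTPGet != nil && p.HTTPGet.Port == targetPort) {
			return true
		}
	}
	return false
}

func main() {
	containers := []*container{
		{Name: "api", ReadinessProbe: &probe{HTTPGet: &httpGetAction{Port: 8080}}},
		{Name: "sidecar"}, // no readiness probe
	}
	fmt.Println(hasReadinessProbesTargetingPort(containers, 8080)) // true: the proxy skips its own TCP probe
	fmt.Println(hasReadinessProbesTargetingPort(containers, 9000)) // false: the proxy falls back to a TCP probe
}
```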
pkg/workloads/k8s.go (8 changes: 6 additions & 2 deletions)
@@ -172,6 +172,8 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co
 }

 func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
+	proxyHasTCPProbe := !HasReadinessProbesTargetingPort(api.Pod.Containers, *api.Pod.Port)
+
 	return kcore.Container{
 		Name:  ProxyContainerName,
 		Image: config.ClusterConfig.ImageProxy,
@@ -189,6 +191,8 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
 			s.Int32(int32(api.Pod.MaxConcurrency)),
 			"--max-queue-length",
 			s.Int32(int32(api.Pod.MaxQueueLength)),
+			"--has-tcp-probe",
+			s.Bool(proxyHasTCPProbe),
 		},
 		Ports: []kcore.ContainerPort{
 			{Name: consts.AdminPortName, ContainerPort: consts.AdminPortInt32},
@@ -213,10 +217,10 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
 				},
 			},
 			InitialDelaySeconds: 1,
-			TimeoutSeconds:      1,
+			TimeoutSeconds:      3,
 			PeriodSeconds:       10,
 			SuccessThreshold:    1,
-			FailureThreshold:    1,
+			FailureThreshold:    3,
 		},
 	}, ClusterConfigVolume()
 }
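The last hunk also relaxes the proxy container's readiness probe: the timeout goes from 1s to 3s and the failure threshold from 1 to 3, so a single slow or dropped /healthz response no longer marks the pod unready. A back-of-the-envelope sketch of the worst-case detection time under the new settings, assuming standard Kubernetes probe semantics:

```go
package main

import "fmt"

func main() {
	// Readiness-probe settings for the proxy container after this commit.
	const (
		timeoutSeconds   = 3  // was 1
		periodSeconds    = 10 // unchanged
		failureThreshold = 3  // was 1
	)

	// Rough worst case: probes fire every periodSeconds, each may hang for up
	// to timeoutSeconds, and failureThreshold consecutive failures are needed
	// before Kubernetes marks the container unready.
	worstCase := (failureThreshold-1)*periodSeconds + timeoutSeconds
	fmt.Printf("container marked unready after roughly %d seconds of consecutive failures\n", worstCase)
	// Under the old settings (timeout 1s, threshold 1) a single failed probe
	// was enough, so transient hiccups could flip readiness immediately.
}
```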
