Skip to content

Add readiness probe to realtime cortex proxy #2176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 59 additions & 40 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"flag"
"io/ioutil"
"net/http"
"os"
"os/signal"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/logging"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/proxy"
"github.com/cortexlabs/cortex/pkg/proxy/probe"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"go.uber.org/zap"
Expand All @@ -40,45 +42,24 @@ const (
_requestSampleInterval = 1 * time.Second
)

var (
proxyLogger = logging.GetLogger()
)

func Exit(err error, wrapStrs ...string) {
for _, str := range wrapStrs {
err = errors.Wrap(err, str)
}

if err != nil && !errors.IsNoTelemetry(err) {
telemetry.Error(err)
}

if err != nil && !errors.IsNoPrint(err) {
proxyLogger.Error(err)
}

telemetry.Close()

os.Exit(1)
}

func main() {
var (
port int
metricsPort int
adminPort int
userContainerPort int
maxConcurrency int
maxQueueLength int
probeDefPath string
clusterConfigPath string
)

flag.IntVar(&port, "port", 8888, "port where the proxy is served")
flag.IntVar(&metricsPort, "metrics-port", 15000, "metrics port for prometheus")
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to")
flag.IntVar(&port, "port", 8000, "port where the proxy server will be exposed")
flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server (for metrics and probes) will be exposed")
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")

flag.StringVar(&probeDefPath, "probe", "", "path to the desired probe json definition")
flag.Parse()

log := logging.GetLogger()
Expand All @@ -88,26 +69,26 @@ func main() {

switch {
case maxConcurrency == 0:
log.Fatal("-max-concurrency flag is required")
log.Fatal("--max-concurrency flag is required")
case maxQueueLength == 0:
log.Fatal("-max-queue-length flag is required")
log.Fatal("--max-queue-length flag is required")
case clusterConfigPath == "":
log.Fatal("-cluster-config flag is required")
log.Fatal("--cluster-config flag is required")
}

clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath)
if err != nil {
Exit(err)
exit(log, err)
}

awsClient, err := aws.NewForRegion(clusterConfig.Region)
if err != nil {
Exit(err)
exit(log, err)
}

_, userID, err := awsClient.CheckCredentials()
if err != nil {
Exit(err)
exit(log, err)
}

err = telemetry.Init(telemetry.Config{
Expand All @@ -122,7 +103,7 @@ func main() {
BackoffMode: telemetry.BackoffDuplicateMessages,
})
if err != nil {
Exit(err)
exit(log, err)
}

target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort)
Expand All @@ -139,6 +120,23 @@ func main() {

promStats := proxy.NewPrometheusStatsReporter()

var readinessProbe *probe.Probe
if probeDefPath != "" {
jsonProbe, err := ioutil.ReadFile(probeDefPath)
if err != nil {
log.Fatal(err)
}

probeDef, err := probe.DecodeJSON(string(jsonProbe))
if err != nil {
log.Fatal(err)
}

readinessProbe = probe.NewProbe(probeDef, log)
} else {
readinessProbe = probe.NewDefaultProbe(target, log)
}

go func() {
reportTicker := time.NewTicker(_reportInterval)
defer reportTicker.Stop()
Expand All @@ -161,14 +159,18 @@ func main() {
}
}()

adminHandler := http.NewServeMux()
adminHandler.Handle("/metrics", promStats)
adminHandler.Handle("/healthz", probe.Handler(readinessProbe))

servers := map[string]*http.Server{
"proxy": {
Addr: ":" + strconv.Itoa(port),
Addr: ":" + strconv.Itoa(userContainerPort),
Handler: proxy.Handler(breaker, httpProxy),
},
"metrics": {
Addr: ":" + strconv.Itoa(metricsPort),
Handler: promStats,
"admin": {
Addr: ":" + strconv.Itoa(adminPort),
Handler: adminHandler,
},
}

Expand All @@ -184,8 +186,8 @@ func main() {
signal.Notify(sigint, os.Interrupt)

select {
case err := <-errCh:
Exit(errors.Wrap(err, "failed to start proxy server"))
case err = <-errCh:
exit(log, errors.Wrap(err, "failed to start proxy server"))
case <-sigint:
// We received an interrupt signal, shut down.
log.Info("Received TERM signal, handling a graceful shutdown...")
Expand All @@ -202,3 +204,20 @@ func main() {
telemetry.Close()
}
}

func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
for _, str := range wrapStrs {
err = errors.Wrap(err, str)
}

if err != nil && !errors.IsNoTelemetry(err) {
telemetry.Error(err)
}

if err != nil && !errors.IsNoPrint(err) {
log.Error(err)
}

telemetry.Close()
os.Exit(1)
}
11 changes: 4 additions & 7 deletions pkg/proxy/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ limitations under the License.
package proxy

const (
_userAgentKey = "User-Agent"
// UserAgentKey is the user agent header key
UserAgentKey = "User-Agent"

// KubeProbeUserAgentPrefix is the user agent header prefix used in k8s probes
// Since K8s 1.8, prober requests have
// User-Agent = "kube-probe/{major-version}.{minor-version}".
_kubeProbeUserAgentPrefix = "kube-probe/"

// KubeletProbeHeaderName is the header name to augment the probes, because
// Istio with mTLS rewrites probes, but their probes pass a different
// user-agent.
_kubeletProbeHeaderName = "K-Kubelet-Probe"
KubeProbeUserAgentPrefix = "kube-probe/"
)
3 changes: 1 addition & 2 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc {
}

func isKubeletProbe(r *http.Request) bool {
return strings.HasPrefix(r.Header.Get(_userAgentKey), _kubeProbeUserAgentPrefix) ||
r.Header.Get(_kubeletProbeHeaderName) != ""
return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUserAgentPrefix)
}
46 changes: 46 additions & 0 deletions pkg/proxy/probe/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package probe

import (
"encoding/json"
"errors"

kcore "k8s.io/api/core/v1"
)

// DecodeJSON takes a json serialised *kcore.Probe and returns a Probe or an error.
func DecodeJSON(jsonProbe string) (*kcore.Probe, error) {
pb := &kcore.Probe{}
if err := json.Unmarshal([]byte(jsonProbe), pb); err != nil {
return nil, err
}
return pb, nil
}

// EncodeJSON takes *kcore.Probe object and returns marshalled Probe JSON string and an error.
func EncodeJSON(pb *kcore.Probe) (string, error) {
if pb == nil {
return "", errors.New("cannot encode nil probe")
}

probeJSON, err := json.Marshal(pb)
if err != nil {
return "", err
}
return string(probeJSON), nil
}
90 changes: 90 additions & 0 deletions pkg/proxy/probe/encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package probe_test

import (
"encoding/json"
"testing"

"github.com/cortexlabs/cortex/pkg/proxy/probe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kcore "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

func TestDecodeProbeSuccess(t *testing.T) {
t.Parallel()

expectedProbe := &kcore.Probe{
PeriodSeconds: 1,
TimeoutSeconds: 2,
SuccessThreshold: 1,
FailureThreshold: 1,
Handler: kcore.Handler{
TCPSocket: &kcore.TCPSocketAction{
Host: "127.0.0.1",
Port: intstr.FromString("8080"),
},
},
}
probeBytes, err := json.Marshal(expectedProbe)
require.NoError(t, err)

gotProbe, err := probe.DecodeJSON(string(probeBytes))
require.NoError(t, err)

require.Equal(t, expectedProbe, gotProbe)
}

func TestDecodeProbeFailure(t *testing.T) {
t.Parallel()

probeBytes, err := json.Marshal("blah")
require.NoError(t, err)

_, err = probe.DecodeJSON(string(probeBytes))
require.Error(t, err)
}

func TestEncodeProbe(t *testing.T) {
t.Parallel()

pb := &kcore.Probe{
SuccessThreshold: 1,
Handler: kcore.Handler{
TCPSocket: &kcore.TCPSocketAction{
Host: "127.0.0.1",
Port: intstr.FromString("8080"),
},
},
}

jsonProbe, err := probe.EncodeJSON(pb)
require.NoError(t, err)

wantProbe := `{"tcpSocket":{"port":"8080","host":"127.0.0.1"},"successThreshold":1}`
require.Equal(t, wantProbe, jsonProbe)
}

func TestEncodeNilProbe(t *testing.T) {
t.Parallel()

jsonProbe, err := probe.EncodeJSON(nil)
assert.Error(t, err)
assert.Empty(t, jsonProbe)
}
33 changes: 33 additions & 0 deletions pkg/proxy/probe/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package probe

import "net/http"

func Handler(pb *Probe) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
healthy := pb.ProbeContainer()
if !healthy {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("unhealthy"))
return
}

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("healthy"))
}
}
Loading