Skip to content

Commit a123f54

Browse files
author
Miguel Varela Ramos
authored
Add readiness probe to realtime cortex proxy (#2176)
1 parent 4976481 commit a123f54

File tree

9 files changed

+605
-49
lines changed

9 files changed

+605
-49
lines changed

cmd/proxy/main.go

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package main
1919
import (
2020
"context"
2121
"flag"
22+
"io/ioutil"
2223
"net/http"
2324
"os"
2425
"os/signal"
@@ -30,6 +31,7 @@ import (
3031
"github.com/cortexlabs/cortex/pkg/lib/logging"
3132
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
3233
"github.com/cortexlabs/cortex/pkg/proxy"
34+
"github.com/cortexlabs/cortex/pkg/proxy/probe"
3335
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
3436
"github.com/cortexlabs/cortex/pkg/types/userconfig"
3537
"go.uber.org/zap"
@@ -40,45 +42,24 @@ const (
4042
_requestSampleInterval = 1 * time.Second
4143
)
4244

43-
var (
44-
proxyLogger = logging.GetLogger()
45-
)
46-
47-
func Exit(err error, wrapStrs ...string) {
48-
for _, str := range wrapStrs {
49-
err = errors.Wrap(err, str)
50-
}
51-
52-
if err != nil && !errors.IsNoTelemetry(err) {
53-
telemetry.Error(err)
54-
}
55-
56-
if err != nil && !errors.IsNoPrint(err) {
57-
proxyLogger.Error(err)
58-
}
59-
60-
telemetry.Close()
61-
62-
os.Exit(1)
63-
}
64-
6545
func main() {
6646
var (
6747
port int
68-
metricsPort int
48+
adminPort int
6949
userContainerPort int
7050
maxConcurrency int
7151
maxQueueLength int
52+
probeDefPath string
7253
clusterConfigPath string
7354
)
7455

75-
flag.IntVar(&port, "port", 8888, "port where the proxy is served")
76-
flag.IntVar(&metricsPort, "metrics-port", 15000, "metrics port for prometheus")
77-
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to")
56+
flag.IntVar(&port, "port", 8000, "port where the proxy server will be exposed")
57+
flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server (for metrics and probes) will be exposed")
58+
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
7859
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
7960
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
8061
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
81-
62+
flag.StringVar(&probeDefPath, "probe", "", "path to the desired probe json definition")
8263
flag.Parse()
8364

8465
log := logging.GetLogger()
@@ -88,26 +69,26 @@ func main() {
8869

8970
switch {
9071
case maxConcurrency == 0:
91-
log.Fatal("-max-concurrency flag is required")
72+
log.Fatal("--max-concurrency flag is required")
9273
case maxQueueLength == 0:
93-
log.Fatal("-max-queue-length flag is required")
74+
log.Fatal("--max-queue-length flag is required")
9475
case clusterConfigPath == "":
95-
log.Fatal("-cluster-config flag is required")
76+
log.Fatal("--cluster-config flag is required")
9677
}
9778

9879
clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath)
9980
if err != nil {
100-
Exit(err)
81+
exit(log, err)
10182
}
10283

10384
awsClient, err := aws.NewForRegion(clusterConfig.Region)
10485
if err != nil {
105-
Exit(err)
86+
exit(log, err)
10687
}
10788

10889
_, userID, err := awsClient.CheckCredentials()
10990
if err != nil {
110-
Exit(err)
91+
exit(log, err)
11192
}
11293

11394
err = telemetry.Init(telemetry.Config{
@@ -122,7 +103,7 @@ func main() {
122103
BackoffMode: telemetry.BackoffDuplicateMessages,
123104
})
124105
if err != nil {
125-
Exit(err)
106+
exit(log, err)
126107
}
127108

128109
target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort)
@@ -139,6 +120,23 @@ func main() {
139120

140121
promStats := proxy.NewPrometheusStatsReporter()
141122

123+
var readinessProbe *probe.Probe
124+
if probeDefPath != "" {
125+
jsonProbe, err := ioutil.ReadFile(probeDefPath)
126+
if err != nil {
127+
log.Fatal(err)
128+
}
129+
130+
probeDef, err := probe.DecodeJSON(string(jsonProbe))
131+
if err != nil {
132+
log.Fatal(err)
133+
}
134+
135+
readinessProbe = probe.NewProbe(probeDef, log)
136+
} else {
137+
readinessProbe = probe.NewDefaultProbe(target, log)
138+
}
139+
142140
go func() {
143141
reportTicker := time.NewTicker(_reportInterval)
144142
defer reportTicker.Stop()
@@ -161,14 +159,18 @@ func main() {
161159
}
162160
}()
163161

162+
adminHandler := http.NewServeMux()
163+
adminHandler.Handle("/metrics", promStats)
164+
adminHandler.Handle("/healthz", probe.Handler(readinessProbe))
165+
164166
servers := map[string]*http.Server{
165167
"proxy": {
166-
Addr: ":" + strconv.Itoa(port),
168+
Addr: ":" + strconv.Itoa(userContainerPort),
167169
Handler: proxy.Handler(breaker, httpProxy),
168170
},
169-
"metrics": {
170-
Addr: ":" + strconv.Itoa(metricsPort),
171-
Handler: promStats,
171+
"admin": {
172+
Addr: ":" + strconv.Itoa(adminPort),
173+
Handler: adminHandler,
172174
},
173175
}
174176

@@ -184,8 +186,8 @@ func main() {
184186
signal.Notify(sigint, os.Interrupt)
185187

186188
select {
187-
case err := <-errCh:
188-
Exit(errors.Wrap(err, "failed to start proxy server"))
189+
case err = <-errCh:
190+
exit(log, errors.Wrap(err, "failed to start proxy server"))
189191
case <-sigint:
190192
// We received an interrupt signal, shut down.
191193
log.Info("Received TERM signal, handling a graceful shutdown...")
@@ -202,3 +204,20 @@ func main() {
202204
telemetry.Close()
203205
}
204206
}
207+
208+
func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
209+
for _, str := range wrapStrs {
210+
err = errors.Wrap(err, str)
211+
}
212+
213+
if err != nil && !errors.IsNoTelemetry(err) {
214+
telemetry.Error(err)
215+
}
216+
217+
if err != nil && !errors.IsNoPrint(err) {
218+
log.Error(err)
219+
}
220+
221+
telemetry.Close()
222+
os.Exit(1)
223+
}

pkg/proxy/consts.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@ limitations under the License.
1717
package proxy
1818

1919
const (
20-
_userAgentKey = "User-Agent"
20+
// UserAgentKey is the user agent header key
21+
UserAgentKey = "User-Agent"
2122

23+
// KubeProbeUserAgentPrefix is the user agent header prefix used in k8s probes
2224
// Since K8s 1.8, prober requests have
2325
// User-Agent = "kube-probe/{major-version}.{minor-version}".
24-
_kubeProbeUserAgentPrefix = "kube-probe/"
25-
26-
// KubeletProbeHeaderName is the header name to augment the probes, because
27-
// Istio with mTLS rewrites probes, but their probes pass a different
28-
// user-agent.
29-
_kubeletProbeHeaderName = "K-Kubelet-Probe"
26+
KubeProbeUserAgentPrefix = "kube-probe/"
3027
)

pkg/proxy/handler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,5 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc {
4343
}
4444

4545
func isKubeletProbe(r *http.Request) bool {
46-
return strings.HasPrefix(r.Header.Get(_userAgentKey), _kubeProbeUserAgentPrefix) ||
47-
r.Header.Get(_kubeletProbeHeaderName) != ""
46+
return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUserAgentPrefix)
4847
}

pkg/proxy/probe/encoding.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package probe
18+
19+
import (
20+
"encoding/json"
21+
"errors"
22+
23+
kcore "k8s.io/api/core/v1"
24+
)
25+
26+
// DecodeJSON takes a json serialised *kcore.Probe and returns a Probe or an error.
27+
func DecodeJSON(jsonProbe string) (*kcore.Probe, error) {
28+
pb := &kcore.Probe{}
29+
if err := json.Unmarshal([]byte(jsonProbe), pb); err != nil {
30+
return nil, err
31+
}
32+
return pb, nil
33+
}
34+
35+
// EncodeJSON takes *kcore.Probe object and returns marshalled Probe JSON string and an error.
36+
func EncodeJSON(pb *kcore.Probe) (string, error) {
37+
if pb == nil {
38+
return "", errors.New("cannot encode nil probe")
39+
}
40+
41+
probeJSON, err := json.Marshal(pb)
42+
if err != nil {
43+
return "", err
44+
}
45+
return string(probeJSON), nil
46+
}

pkg/proxy/probe/encoding_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package probe_test
18+
19+
import (
20+
"encoding/json"
21+
"testing"
22+
23+
"github.com/cortexlabs/cortex/pkg/proxy/probe"
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
kcore "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/util/intstr"
28+
)
29+
30+
func TestDecodeProbeSuccess(t *testing.T) {
31+
t.Parallel()
32+
33+
expectedProbe := &kcore.Probe{
34+
PeriodSeconds: 1,
35+
TimeoutSeconds: 2,
36+
SuccessThreshold: 1,
37+
FailureThreshold: 1,
38+
Handler: kcore.Handler{
39+
TCPSocket: &kcore.TCPSocketAction{
40+
Host: "127.0.0.1",
41+
Port: intstr.FromString("8080"),
42+
},
43+
},
44+
}
45+
probeBytes, err := json.Marshal(expectedProbe)
46+
require.NoError(t, err)
47+
48+
gotProbe, err := probe.DecodeJSON(string(probeBytes))
49+
require.NoError(t, err)
50+
51+
require.Equal(t, expectedProbe, gotProbe)
52+
}
53+
54+
func TestDecodeProbeFailure(t *testing.T) {
55+
t.Parallel()
56+
57+
probeBytes, err := json.Marshal("blah")
58+
require.NoError(t, err)
59+
60+
_, err = probe.DecodeJSON(string(probeBytes))
61+
require.Error(t, err)
62+
}
63+
64+
func TestEncodeProbe(t *testing.T) {
65+
t.Parallel()
66+
67+
pb := &kcore.Probe{
68+
SuccessThreshold: 1,
69+
Handler: kcore.Handler{
70+
TCPSocket: &kcore.TCPSocketAction{
71+
Host: "127.0.0.1",
72+
Port: intstr.FromString("8080"),
73+
},
74+
},
75+
}
76+
77+
jsonProbe, err := probe.EncodeJSON(pb)
78+
require.NoError(t, err)
79+
80+
wantProbe := `{"tcpSocket":{"port":"8080","host":"127.0.0.1"},"successThreshold":1}`
81+
require.Equal(t, wantProbe, jsonProbe)
82+
}
83+
84+
func TestEncodeNilProbe(t *testing.T) {
85+
t.Parallel()
86+
87+
jsonProbe, err := probe.EncodeJSON(nil)
88+
assert.Error(t, err)
89+
assert.Empty(t, jsonProbe)
90+
}

pkg/proxy/probe/handler.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package probe
18+
19+
import "net/http"
20+
21+
func Handler(pb *Probe) http.HandlerFunc {
22+
return func(w http.ResponseWriter, r *http.Request) {
23+
healthy := pb.ProbeContainer()
24+
if !healthy {
25+
w.WriteHeader(http.StatusInternalServerError)
26+
_, _ = w.Write([]byte("unhealthy"))
27+
return
28+
}
29+
30+
w.WriteHeader(http.StatusOK)
31+
_, _ = w.Write([]byte("healthy"))
32+
}
33+
}

0 commit comments

Comments
 (0)