Skip to content

Commit 4976481

Browse files
authored
Operator changes for CaaS implementation (#2177)
1 parent 2ef2353 commit 4976481

File tree

26 files changed

+415
-273
lines changed

26 files changed

+415
-273
lines changed

cmd/proxy/main.go

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ import (
2525
"strconv"
2626
"time"
2727

28+
"github.com/cortexlabs/cortex/pkg/lib/aws"
29+
"github.com/cortexlabs/cortex/pkg/lib/errors"
2830
"github.com/cortexlabs/cortex/pkg/lib/logging"
31+
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
2932
"github.com/cortexlabs/cortex/pkg/proxy"
33+
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
34+
"github.com/cortexlabs/cortex/pkg/types/userconfig"
3035
"go.uber.org/zap"
3136
)
3237

@@ -35,20 +40,45 @@ const (
3540
_requestSampleInterval = 1 * time.Second
3641
)
3742

43+
var (
44+
proxyLogger = logging.GetLogger()
45+
)
46+
47+
func Exit(err error, wrapStrs ...string) {
48+
for _, str := range wrapStrs {
49+
err = errors.Wrap(err, str)
50+
}
51+
52+
if err != nil && !errors.IsNoTelemetry(err) {
53+
telemetry.Error(err)
54+
}
55+
56+
if err != nil && !errors.IsNoPrint(err) {
57+
proxyLogger.Error(err)
58+
}
59+
60+
telemetry.Close()
61+
62+
os.Exit(1)
63+
}
64+
3865
func main() {
3966
var (
4067
port int
4168
metricsPort int
4269
userContainerPort int
4370
maxConcurrency int
4471
maxQueueLength int
72+
clusterConfigPath string
4573
)
4674

47-
flag.IntVar(&port, "port", 8000, "port where the proxy will be served")
48-
flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the proxy will be served")
49-
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
75+
flag.IntVar(&port, "port", 8888, "port where the proxy is served")
76+
flag.IntVar(&metricsPort, "metrics-port", 15000, "metrics port for prometheus")
77+
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to")
5078
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
5179
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
80+
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
81+
5282
flag.Parse()
5383

5484
log := logging.GetLogger()
@@ -58,12 +88,44 @@ func main() {
5888

5989
switch {
6090
case maxConcurrency == 0:
61-
log.Fatal("--max-concurrency flag is required")
91+
log.Fatal("-max-concurrency flag is required")
6292
case maxQueueLength == 0:
63-
maxQueueLength = maxConcurrency * 10
93+
log.Fatal("-max-queue-length flag is required")
94+
case clusterConfigPath == "":
95+
log.Fatal("-cluster-config flag is required")
96+
}
97+
98+
clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath)
99+
if err != nil {
100+
Exit(err)
101+
}
102+
103+
awsClient, err := aws.NewForRegion(clusterConfig.Region)
104+
if err != nil {
105+
Exit(err)
106+
}
107+
108+
_, userID, err := awsClient.CheckCredentials()
109+
if err != nil {
110+
Exit(err)
111+
}
112+
113+
err = telemetry.Init(telemetry.Config{
114+
Enabled: clusterConfig.Telemetry,
115+
UserID: userID,
116+
Properties: map[string]string{
117+
"kind": userconfig.RealtimeAPIKind.String(),
118+
"image_type": "proxy",
119+
},
120+
Environment: "api",
121+
LogErrors: true,
122+
BackoffMode: telemetry.BackoffDuplicateMessages,
123+
})
124+
if err != nil {
125+
Exit(err)
64126
}
65127

66-
target := "http://127.0.0.1:" + strconv.Itoa(port)
128+
target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort)
67129
httpProxy := proxy.NewReverseProxy(target, maxQueueLength, maxQueueLength)
68130

69131
requestCounterStats := &proxy.RequestStats{}
@@ -101,7 +163,7 @@ func main() {
101163

102164
servers := map[string]*http.Server{
103165
"proxy": {
104-
Addr: ":" + strconv.Itoa(userContainerPort),
166+
Addr: ":" + strconv.Itoa(port),
105167
Handler: proxy.Handler(breaker, httpProxy),
106168
},
107169
"metrics": {
@@ -123,7 +185,7 @@ func main() {
123185

124186
select {
125187
case err := <-errCh:
126-
log.Fatal("failed to start proxy server", zap.Error(err))
188+
Exit(errors.Wrap(err, "failed to start proxy server"))
127189
case <-sigint:
128190
// We received an interrupt signal, shut down.
129191
log.Info("Received TERM signal, handling a graceful shutdown...")
@@ -133,8 +195,10 @@ func main() {
133195
if err := server.Shutdown(context.Background()); err != nil {
134196
// Error from closing listeners, or context timeout:
135197
log.Warn("HTTP server Shutdown Error", zap.Error(err))
198+
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
136199
}
137200
}
138201
log.Info("Shutdown complete, exiting...")
202+
telemetry.Close()
139203
}
140204
}

dev/registry.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ elif [ "$cmd" = "create" ]; then
222222
# usage: registry.sh update-single IMAGE
223223
elif [ "$cmd" = "update-single" ]; then
224224
image=$sub_cmd
225-
if [ "$image" = "operator" ] || [ "$image" = "request-monitor" ]; then
225+
if [ "$image" = "operator" ] || [ "$image" = "proxy" ]; then
226226
cache_builder $image
227227
fi
228228
build_and_push $image
@@ -245,8 +245,8 @@ elif [ "$cmd" = "update" ]; then
245245
if [[ " ${images_to_build[@]} " =~ " operator " ]]; then
246246
cache_builder operator
247247
fi
248-
if [[ " ${images_to_build[@]} " =~ " request-monitor " ]]; then
249-
cache_builder request-monitor
248+
if [[ " ${images_to_build[@]} " =~ " proxy " ]]; then
249+
cache_builder proxy
250250
fi
251251
if [[ " ${images_to_build[@]} " =~ " async-gateway " ]]; then
252252
cache_builder async-gateway

manager/manifests/prometheus-monitoring.yaml.j2

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ spec:
3434
matchExpressions:
3535
- key: "monitoring.cortex.dev"
3636
operator: "In"
37-
values: [ "istio", "request-monitor", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ]
37+
values: [ "istio", "proxy", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ]
3838
serviceMonitorSelector:
3939
matchExpressions:
4040
- key: "monitoring.cortex.dev"
@@ -168,9 +168,9 @@ spec:
168168
apiVersion: monitoring.coreos.com/v1
169169
kind: PodMonitor
170170
metadata:
171-
name: request-monitor-stats
171+
name: proxy-stats
172172
labels:
173-
monitoring.cortex.dev: "request-monitor"
173+
monitoring.cortex.dev: "proxy"
174174
spec:
175175
selector:
176176
matchLabels:
@@ -179,7 +179,7 @@ spec:
179179
- { key: prometheus-ignore, operator: DoesNotExist }
180180
namespaceSelector:
181181
any: true
182-
jobLabel: request-monitor-stats
182+
jobLabel: proxy-stats
183183
podMetricsEndpoints:
184184
- path: /metrics
185185
scheme: http
@@ -188,7 +188,7 @@ spec:
188188
relabelings:
189189
- action: keep
190190
sourceLabels: [ __meta_kubernetes_pod_container_name ]
191-
regex: "request-monitor"
191+
regex: "proxy"
192192
- sourceLabels: [ __meta_kubernetes_pod_label_apiName ]
193193
action: replace
194194
targetLabel: api_name

pkg/consts/consts.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@ var (
2424
CortexVersion = "master" // CORTEX_VERSION
2525
CortexVersionMinor = "master" // CORTEX_VERSION_MINOR
2626

27-
ProxyListeningPort = int64(8888)
28-
ProxyListeningPortStr = "8888"
29-
DefaultMaxReplicaQueueLength = int64(1024)
30-
DefaultMaxReplicaConcurrency = int64(1024)
31-
DefaultTargetReplicaConcurrency = float64(8)
32-
NeuronCoresPerInf = int64(4)
27+
DefaultMaxQueueLength = int64(1024)
28+
DefaultMaxConcurrency = int64(16)
29+
30+
DefaultUserPodPortStr = "8080"
31+
DefaultUserPodPortInt32 = int32(8080)
32+
33+
ProxyListeningPortStr = "8888"
34+
ProxyListeningPortInt32 = int32(8888)
35+
36+
MetricsPortStr = "15000"
37+
MetricsPortInt32 = int32(15000)
3338

3439
AuthHeader = "X-Cortex-Authorization"
3540

@@ -38,7 +43,7 @@ var (
3843
AsyncWorkloadsExpirationDays = int64(7)
3944

4045
ReservedContainerNames = []string{
41-
"neuron-rtd",
46+
"proxy",
4247
}
4348
)
4449

pkg/crds/controllers/batch/batchjob_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1"
25+
"github.com/cortexlabs/cortex/pkg/lib/pointer"
2526
"github.com/cortexlabs/cortex/pkg/lib/random"
2627
"github.com/cortexlabs/cortex/pkg/types/spec"
2728
"github.com/cortexlabs/cortex/pkg/types/status"
@@ -43,7 +44,7 @@ func uploadTestAPISpec(apiName string, apiID string) error {
4344
Kind: userconfig.BatchAPIKind,
4445
},
4546
Pod: &userconfig.Pod{
46-
// TODO use a real image
47+
Port: pointer.Int32(8080),
4748
Containers: []*userconfig.Container{
4849
{
4950
Name: "api",

pkg/lib/strings/operations.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ func PluralEs(str string, count interface{}) string {
220220
return PluralCustom(str, str+"es", count)
221221
}
222222

223+
func PluralIs(count interface{}) string {
224+
return PluralCustom("is", "are", count)
225+
}
226+
223227
func PluralCustom(singular string, plural string, count interface{}) string {
224228
countInt, _ := cast.InterfaceToInt64(count)
225229
if countInt == 1 {

pkg/operator/lib/autoscaler/autoscaler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl
133133
return nil
134134
}
135135

136-
rawRecommendation := *avgInFlight / autoscalingSpec.TargetReplicaConcurrency
136+
rawRecommendation := *avgInFlight / *autoscalingSpec.TargetInFlight
137137
recommendation := int32(math.Ceil(rawRecommendation))
138138

139139
if rawRecommendation < float64(currentReplicas) && rawRecommendation > float64(currentReplicas)*(1-autoscalingSpec.DownscaleTolerance) {
@@ -199,7 +199,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl
199199
apiLogger.Debugw(fmt.Sprintf("%s autoscaler tick", apiName),
200200
"autoscaling", map[string]interface{}{
201201
"avg_in_flight": *avgInFlight,
202-
"target_replica_concurrency": autoscalingSpec.TargetReplicaConcurrency,
202+
"target_in_flight": *autoscalingSpec.TargetInFlight,
203203
"raw_recommendation": rawRecommendation,
204204
"current_replicas": currentReplicas,
205205
"downscale_tolerance": autoscalingSpec.DownscaleTolerance,

pkg/operator/resources/asyncapi/k8s_specs.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package asyncapi
1818

1919
import (
20+
"github.com/cortexlabs/cortex/pkg/consts"
2021
"github.com/cortexlabs/cortex/pkg/lib/k8s"
2122
"github.com/cortexlabs/cortex/pkg/lib/pointer"
2223
"github.com/cortexlabs/cortex/pkg/types/spec"
@@ -128,8 +129,8 @@ func gatewayServiceSpec(api spec.API) kcore.Service {
128129
return *k8s.Service(&k8s.ServiceSpec{
129130
Name: workloads.K8sName(api.Name),
130131
PortName: "http",
131-
Port: workloads.DefaultPortInt32,
132-
TargetPort: workloads.DefaultPortInt32,
132+
Port: consts.ProxyListeningPortInt32,
133+
TargetPort: consts.ProxyListeningPortInt32,
133134
Annotations: api.ToK8sAnnotations(),
134135
Labels: map[string]string{
135136
"apiName": api.Name,
@@ -152,7 +153,7 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService {
152153
Destinations: []k8s.Destination{{
153154
ServiceName: workloads.K8sName(api.Name),
154155
Weight: 100,
155-
Port: uint32(workloads.DefaultPortInt32),
156+
Port: uint32(consts.ProxyListeningPortInt32),
156157
}},
157158
PrefixPath: api.Networking.Endpoint,
158159
Rewrite: pointer.String("/"),

pkg/operator/resources/job/batchapi/k8s_specs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"path"
2222

2323
"github.com/cortexlabs/cortex/pkg/config"
24+
"github.com/cortexlabs/cortex/pkg/consts"
2425
batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1"
2526
"github.com/cortexlabs/cortex/pkg/lib/k8s"
2627
"github.com/cortexlabs/cortex/pkg/lib/parallel"
@@ -40,7 +41,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService {
4041
Destinations: []k8s.Destination{{
4142
ServiceName: _operatorService,
4243
Weight: 100,
43-
Port: uint32(workloads.DefaultPortInt32),
44+
Port: uint32(consts.ProxyListeningPortInt32),
4445
}},
4546
PrefixPath: api.Networking.Endpoint,
4647
Rewrite: pointer.String(path.Join("batch", api.Name)),

pkg/operator/resources/job/taskapi/k8s_specs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"path"
2121

2222
"github.com/cortexlabs/cortex/pkg/config"
23+
"github.com/cortexlabs/cortex/pkg/consts"
2324
"github.com/cortexlabs/cortex/pkg/lib/k8s"
2425
"github.com/cortexlabs/cortex/pkg/lib/parallel"
2526
"github.com/cortexlabs/cortex/pkg/lib/pointer"
@@ -42,7 +43,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService {
4243
Destinations: []k8s.Destination{{
4344
ServiceName: _operatorService,
4445
Weight: 100,
45-
Port: uint32(workloads.DefaultPortInt32),
46+
Port: uint32(consts.ProxyListeningPortInt32),
4647
}},
4748
PrefixPath: api.Networking.Endpoint,
4849
Rewrite: pointer.String(path.Join("tasks", api.Name)),

pkg/operator/resources/realtimeapi/api.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
116116
}
117117

118118
func RefreshAPI(apiName string, force bool) (string, error) {
119-
prevDeployment, err := config.K8s.GetDeployment(workloads.K8sName(apiName))
119+
prevDeployment, prevService, prevVirtualService, err := getK8sResources(&userconfig.API{
120+
Resource: userconfig.Resource{
121+
Name: apiName,
122+
},
123+
})
120124
if err != nil {
121125
return "", err
122126
} else if prevDeployment == nil {
@@ -153,7 +157,7 @@ func RefreshAPI(apiName string, force bool) (string, error) {
153157
return "", errors.Wrap(err, "upload handler spec")
154158
}
155159

156-
if err := applyK8sDeployment(api, prevDeployment); err != nil {
160+
if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil {
157161
return "", err
158162
}
159163

0 commit comments

Comments
 (0)