Skip to content

Operator changes for CaaS implementation #2177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ import (
"strconv"
"time"

"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/logging"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/proxy"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"go.uber.org/zap"
)

Expand All @@ -35,20 +40,45 @@ const (
_requestSampleInterval = 1 * time.Second
)

var (
proxyLogger = logging.GetLogger()
)

func Exit(err error, wrapStrs ...string) {
for _, str := range wrapStrs {
err = errors.Wrap(err, str)
}

if err != nil && !errors.IsNoTelemetry(err) {
telemetry.Error(err)
}

if err != nil && !errors.IsNoPrint(err) {
proxyLogger.Error(err)
}

telemetry.Close()

os.Exit(1)
}

func main() {
var (
port int
metricsPort int
userContainerPort int
maxConcurrency int
maxQueueLength int
clusterConfigPath string
)

flag.IntVar(&port, "port", 8000, "port where the proxy will be served")
flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the proxy will be served")
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
flag.IntVar(&port, "port", 8888, "port where the proxy is served")
flag.IntVar(&metricsPort, "metrics-port", 15000, "metrics port for prometheus")
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to")
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")

flag.Parse()

log := logging.GetLogger()
Expand All @@ -58,12 +88,44 @@ func main() {

switch {
case maxConcurrency == 0:
log.Fatal("--max-concurrency flag is required")
log.Fatal("-max-concurrency flag is required")
case maxQueueLength == 0:
maxQueueLength = maxConcurrency * 10
log.Fatal("-max-queue-length flag is required")
case clusterConfigPath == "":
log.Fatal("-cluster-config flag is required")
}

clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath)
if err != nil {
Exit(err)
}

awsClient, err := aws.NewForRegion(clusterConfig.Region)
if err != nil {
Exit(err)
}

_, userID, err := awsClient.CheckCredentials()
if err != nil {
Exit(err)
}

err = telemetry.Init(telemetry.Config{
Enabled: clusterConfig.Telemetry,
UserID: userID,
Properties: map[string]string{
"kind": userconfig.RealtimeAPIKind.String(),
"image_type": "proxy",
},
Environment: "api",
LogErrors: true,
BackoffMode: telemetry.BackoffDuplicateMessages,
})
if err != nil {
Exit(err)
}

target := "http://127.0.0.1:" + strconv.Itoa(port)
target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort)
httpProxy := proxy.NewReverseProxy(target, maxQueueLength, maxQueueLength)

requestCounterStats := &proxy.RequestStats{}
Expand Down Expand Up @@ -101,7 +163,7 @@ func main() {

servers := map[string]*http.Server{
"proxy": {
Addr: ":" + strconv.Itoa(userContainerPort),
Addr: ":" + strconv.Itoa(port),
Handler: proxy.Handler(breaker, httpProxy),
},
"metrics": {
Expand All @@ -123,7 +185,7 @@ func main() {

select {
case err := <-errCh:
log.Fatal("failed to start proxy server", zap.Error(err))
Exit(errors.Wrap(err, "failed to start proxy server"))
case <-sigint:
// We received an interrupt signal, shut down.
log.Info("Received TERM signal, handling a graceful shutdown...")
Expand All @@ -133,8 +195,10 @@ func main() {
if err := server.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Warn("HTTP server Shutdown Error", zap.Error(err))
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
}
}
log.Info("Shutdown complete, exiting...")
telemetry.Close()
}
}
6 changes: 3 additions & 3 deletions dev/registry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ elif [ "$cmd" = "create" ]; then
# usage: registry.sh update-single IMAGE
elif [ "$cmd" = "update-single" ]; then
image=$sub_cmd
if [ "$image" = "operator" ] || [ "$image" = "request-monitor" ]; then
if [ "$image" = "operator" ] || [ "$image" = "proxy" ]; then
cache_builder $image
fi
build_and_push $image
Expand All @@ -245,8 +245,8 @@ elif [ "$cmd" = "update" ]; then
if [[ " ${images_to_build[@]} " =~ " operator " ]]; then
cache_builder operator
fi
if [[ " ${images_to_build[@]} " =~ " request-monitor " ]]; then
cache_builder request-monitor
if [[ " ${images_to_build[@]} " =~ " proxy " ]]; then
cache_builder proxy
fi
if [[ " ${images_to_build[@]} " =~ " async-gateway " ]]; then
cache_builder async-gateway
Expand Down
10 changes: 5 additions & 5 deletions manager/manifests/prometheus-monitoring.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ spec:
matchExpressions:
- key: "monitoring.cortex.dev"
operator: "In"
values: [ "istio", "request-monitor", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ]
values: [ "istio", "proxy", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ]
serviceMonitorSelector:
matchExpressions:
- key: "monitoring.cortex.dev"
Expand Down Expand Up @@ -168,9 +168,9 @@ spec:
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: request-monitor-stats
name: proxy-stats
labels:
monitoring.cortex.dev: "request-monitor"
monitoring.cortex.dev: "proxy"
spec:
selector:
matchLabels:
Expand All @@ -179,7 +179,7 @@ spec:
- { key: prometheus-ignore, operator: DoesNotExist }
namespaceSelector:
any: true
jobLabel: request-monitor-stats
jobLabel: proxy-stats
podMetricsEndpoints:
- path: /metrics
scheme: http
Expand All @@ -188,7 +188,7 @@ spec:
relabelings:
- action: keep
sourceLabels: [ __meta_kubernetes_pod_container_name ]
regex: "request-monitor"
regex: "proxy"
- sourceLabels: [ __meta_kubernetes_pod_label_apiName ]
action: replace
targetLabel: api_name
Expand Down
19 changes: 12 additions & 7 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ var (
CortexVersion = "master" // CORTEX_VERSION
CortexVersionMinor = "master" // CORTEX_VERSION_MINOR

ProxyListeningPort = int64(8888)
ProxyListeningPortStr = "8888"
DefaultMaxReplicaQueueLength = int64(1024)
DefaultMaxReplicaConcurrency = int64(1024)
DefaultTargetReplicaConcurrency = float64(8)
NeuronCoresPerInf = int64(4)
DefaultMaxQueueLength = int64(1024)
DefaultMaxConcurrency = int64(16)

DefaultUserPodPortStr = "8080"
DefaultUserPodPortInt32 = int32(8080)

ProxyListeningPortStr = "8888"
ProxyListeningPortInt32 = int32(8888)

MetricsPortStr = "15000"
MetricsPortInt32 = int32(15000)

AuthHeader = "X-Cortex-Authorization"

Expand All @@ -38,7 +43,7 @@ var (
AsyncWorkloadsExpirationDays = int64(7)

ReservedContainerNames = []string{
"neuron-rtd",
"proxy",
}
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/crds/controllers/batch/batchjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/random"
"github.com/cortexlabs/cortex/pkg/types/spec"
"github.com/cortexlabs/cortex/pkg/types/status"
Expand All @@ -43,7 +44,7 @@ func uploadTestAPISpec(apiName string, apiID string) error {
Kind: userconfig.BatchAPIKind,
},
Pod: &userconfig.Pod{
// TODO use a real image
Port: pointer.Int32(8080),
Containers: []*userconfig.Container{
{
Name: "api",
Expand Down
4 changes: 4 additions & 0 deletions pkg/lib/strings/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func PluralEs(str string, count interface{}) string {
return PluralCustom(str, str+"es", count)
}

func PluralIs(count interface{}) string {
return PluralCustom("is", "are", count)
}

func PluralCustom(singular string, plural string, count interface{}) string {
countInt, _ := cast.InterfaceToInt64(count)
if countInt == 1 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/lib/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl
return nil
}

rawRecommendation := *avgInFlight / autoscalingSpec.TargetReplicaConcurrency
rawRecommendation := *avgInFlight / *autoscalingSpec.TargetInFlight
recommendation := int32(math.Ceil(rawRecommendation))

if rawRecommendation < float64(currentReplicas) && rawRecommendation > float64(currentReplicas)*(1-autoscalingSpec.DownscaleTolerance) {
Expand Down Expand Up @@ -199,7 +199,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl
apiLogger.Debugw(fmt.Sprintf("%s autoscaler tick", apiName),
"autoscaling", map[string]interface{}{
"avg_in_flight": *avgInFlight,
"target_replica_concurrency": autoscalingSpec.TargetReplicaConcurrency,
"target_in_flight": *autoscalingSpec.TargetInFlight,
"raw_recommendation": rawRecommendation,
"current_replicas": currentReplicas,
"downscale_tolerance": autoscalingSpec.DownscaleTolerance,
Expand Down
7 changes: 4 additions & 3 deletions pkg/operator/resources/asyncapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package asyncapi

import (
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/types/spec"
Expand Down Expand Up @@ -128,8 +129,8 @@ func gatewayServiceSpec(api spec.API) kcore.Service {
return *k8s.Service(&k8s.ServiceSpec{
Name: workloads.K8sName(api.Name),
PortName: "http",
Port: workloads.DefaultPortInt32,
TargetPort: workloads.DefaultPortInt32,
Port: consts.ProxyListeningPortInt32,
TargetPort: consts.ProxyListeningPortInt32,
Annotations: api.ToK8sAnnotations(),
Labels: map[string]string{
"apiName": api.Name,
Expand All @@ -152,7 +153,7 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService {
Destinations: []k8s.Destination{{
ServiceName: workloads.K8sName(api.Name),
Weight: 100,
Port: uint32(workloads.DefaultPortInt32),
Port: uint32(consts.ProxyListeningPortInt32),
}},
PrefixPath: api.Networking.Endpoint,
Rewrite: pointer.String("/"),
Expand Down
3 changes: 2 additions & 1 deletion pkg/operator/resources/job/batchapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path"

"github.com/cortexlabs/cortex/pkg/config"
"github.com/cortexlabs/cortex/pkg/consts"
batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
Expand All @@ -40,7 +41,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService {
Destinations: []k8s.Destination{{
ServiceName: _operatorService,
Weight: 100,
Port: uint32(workloads.DefaultPortInt32),
Port: uint32(consts.ProxyListeningPortInt32),
}},
PrefixPath: api.Networking.Endpoint,
Rewrite: pointer.String(path.Join("batch", api.Name)),
Expand Down
3 changes: 2 additions & 1 deletion pkg/operator/resources/job/taskapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path"

"github.com/cortexlabs/cortex/pkg/config"
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
Expand All @@ -42,7 +43,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService {
Destinations: []k8s.Destination{{
ServiceName: _operatorService,
Weight: 100,
Port: uint32(workloads.DefaultPortInt32),
Port: uint32(consts.ProxyListeningPortInt32),
}},
PrefixPath: api.Networking.Endpoint,
Rewrite: pointer.String(path.Join("tasks", api.Name)),
Expand Down
8 changes: 6 additions & 2 deletions pkg/operator/resources/realtimeapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
}

func RefreshAPI(apiName string, force bool) (string, error) {
prevDeployment, err := config.K8s.GetDeployment(workloads.K8sName(apiName))
prevDeployment, prevService, prevVirtualService, err := getK8sResources(&userconfig.API{
Resource: userconfig.Resource{
Name: apiName,
},
})
if err != nil {
return "", err
} else if prevDeployment == nil {
Expand Down Expand Up @@ -153,7 +157,7 @@ func RefreshAPI(apiName string, force bool) (string, error) {
return "", errors.Wrap(err, "upload handler spec")
}

if err := applyK8sDeployment(api, prevDeployment); err != nil {
if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil {
return "", err
}

Expand Down
Loading