Skip to content

Commit

Permalink
Use a shared async gateway between all async apis (#2380)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalbollu authored Jul 30, 2021
1 parent 81845ae commit 2203a92
Show file tree
Hide file tree
Showing 19 changed files with 237 additions and 381 deletions.
9 changes: 3 additions & 6 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ var _clusterHealthCmd = &cobra.Command{
{"prometheus", console.BoolColor(clusterHealth.Prometheus), clusterWarnings.Prometheus},
{"autoscaler", console.BoolColor(clusterHealth.Autoscaler), ""},
{"activator", console.BoolColor(clusterHealth.Activator), ""},
{"async gateway", console.BoolColor(clusterHealth.AsyncGateway), ""},
{"grafana", console.BoolColor(clusterHealth.Grafana), ""},
{"controller manager", console.BoolColor(clusterHealth.ControllerManager), ""},
{"apis gateway", console.BoolColor(clusterHealth.APIsGateway), ""},
Expand Down Expand Up @@ -1057,7 +1058,7 @@ func printInfoNodes(infoResponse *schema.InfoResponse) {
numAPIInstances := len(infoResponse.WorkerNodeInfos)

var totalReplicas int
var doesClusterHaveGPUs, doesClusterHaveInfs, doesClusterHaveAsyncGateways, doesClusterHaveEnqueuers bool
var doesClusterHaveGPUs, doesClusterHaveInfs, doesClusterHaveEnqueuers bool
for _, nodeInfo := range infoResponse.WorkerNodeInfos {
totalReplicas += nodeInfo.NumReplicas
if nodeInfo.ComputeUserCapacity.GPU > 0 {
Expand All @@ -1066,9 +1067,6 @@ func printInfoNodes(infoResponse *schema.InfoResponse) {
if nodeInfo.ComputeUserCapacity.Inf > 0 {
doesClusterHaveInfs = true
}
if nodeInfo.NumAsyncGatewayReplicas > 0 {
doesClusterHaveAsyncGateways = true
}
if nodeInfo.NumEnqueuerReplicas > 0 {
doesClusterHaveEnqueuers = true
}
Expand All @@ -1089,7 +1087,6 @@ func printInfoNodes(infoResponse *schema.InfoResponse) {
{Title: "instance type"},
{Title: "lifecycle"},
{Title: "replicas"},
{Title: "async gateway replicas", Hidden: !doesClusterHaveAsyncGateways},
{Title: "batch enqueuer replicas", Hidden: !doesClusterHaveEnqueuers},
{Title: "CPU (requested / total allocatable)"},
{Title: "memory (requested / total allocatable)"},
Expand All @@ -1108,7 +1105,7 @@ func printInfoNodes(infoResponse *schema.InfoResponse) {
memStr := nodeInfo.ComputeUserRequested.Mem.String() + " / " + nodeInfo.ComputeUserCapacity.Mem.String()
gpuStr := s.Int64(nodeInfo.ComputeUserRequested.GPU) + " / " + s.Int64(nodeInfo.ComputeUserCapacity.GPU)
infStr := s.Int64(nodeInfo.ComputeUserRequested.Inf) + " / " + s.Int64(nodeInfo.ComputeUserCapacity.Inf)
rows = append(rows, []interface{}{nodeInfo.InstanceType, lifecycle, nodeInfo.NumReplicas, nodeInfo.NumAsyncGatewayReplicas, nodeInfo.NumEnqueuerReplicas, cpuStr, memStr, gpuStr, infStr})
rows = append(rows, []interface{}{nodeInfo.InstanceType, lifecycle, nodeInfo.NumReplicas, nodeInfo.NumEnqueuerReplicas, cpuStr, memStr, gpuStr, infStr})
}

t := table.Table{
Expand Down
38 changes: 14 additions & 24 deletions cmd/async-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"flag"
"net/http"
"os"
"strings"

gateway "github.com/cortexlabs/cortex/pkg/async-gateway"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/logging"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -37,38 +37,28 @@ const (
_defaultPort = "8080"
)

// usage: ./gateway -bucket <bucket> -region <region> -port <port> -queue queue <apiName>
// usage: ./gateway -bucket <bucket> -region <region> -port <port>
func main() {
log := logging.GetLogger()
defer func() {
_ = log.Sync()
}()

var (
clusterConfigPath = flag.String("cluster-config", "", "cluster config path")
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
queueURL = flag.String("queue", "", "SQS queue URL")
bucket = flag.String("bucket", "", "bucket")
clusterUID = flag.String("cluster-uid", "", "cluster uid")
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
)
flag.Parse()

switch {
case *queueURL == "":
log.Fatal("missing required option: -queue")
case *clusterConfigPath == "":
log.Fatal("missing required option: -cluster-config")
case *bucket == "":
log.Fatal("missing required option: -bucket")
case *clusterUID == "":
log.Fatal("missing required option: -cluster-uid")
}

apiName := flag.Arg(0)
if apiName == "" {
log.Fatal("apiName argument was not provided")
}

clusterConfig, err := clusterconfig.NewForFile(*clusterConfigPath)
if err != nil {
exit(log, err)
}

awsClient, err := aws.NewForRegion(clusterConfig.Region)
awsClient, err := aws.New()
if err != nil {
exit(log, err)
}
Expand All @@ -78,8 +68,9 @@ func main() {
exit(log, err)
}

telemetryEnabled := strings.ToLower(os.Getenv("CORTEX_TELEMETRY_DISABLE")) != "true"
err = telemetry.Init(telemetry.Config{
Enabled: clusterConfig.Telemetry,
Enabled: telemetryEnabled,
UserID: userID,
Properties: map[string]string{
"kind": userconfig.AsyncAPIKind.String(),
Expand All @@ -95,10 +86,9 @@ func main() {
defer telemetry.Close()

sess := awsClient.Session()
s3Storage := gateway.NewS3(sess, clusterConfig.Bucket)
sqsQueue := gateway.NewSQS(*queueURL, sess)
s3Storage := gateway.NewS3(sess, *bucket)

svc := gateway.NewService(clusterConfig.ClusterUID, apiName, sqsQueue, s3Storage, log)
svc := gateway.NewService(*clusterUID, s3Storage, log, *sess)
ep := gateway.NewEndpoint(svc, log)

router := mux.NewRouter()
Expand Down
6 changes: 2 additions & 4 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ func main() {
apiKind := deployment.Labels["apiKind"]
switch apiKind {
case userconfig.AsyncAPIKind.String():
if deployment.Labels["cortex.dev/async"] == "api" {
if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil {
operatorLogger.Fatal(errors.Wrap(err, "init"))
}
if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil {
operatorLogger.Fatal(errors.Wrap(err, "init"))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions manager/generate_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ def generate_eks(
"ami": get_ami(ami_map, "t3.medium"),
"name": "cx-operator",
"instanceType": "t3.medium",
"minSize": 1,
"minSize": 2,
"maxSize": 25,
"desiredCapacity": 1,
"desiredCapacity": 2,
"volumeType": "gp3",
"volumeSize": 20,
"volumeIOPS": 3000,
Expand Down
4 changes: 4 additions & 0 deletions manager/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ function cluster_up() {
python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/cluster-autoscaler.yaml.j2 | kubectl apply -f - >/dev/null
echo ""

echo -n "○ configuring async gateway "
python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/async-gateway.yaml.j2 | kubectl apply -f - >/dev/null
echo ""

echo -n "○ configuring logging "
python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/fluent-bit.yaml.j2 | kubectl apply -f - >/dev/null
envsubst < manifests/event-exporter.yaml | kubectl apply -f - >/dev/null
Expand Down
109 changes: 109 additions & 0 deletions manager/manifests/async-gateway.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2021 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ServiceAccount
metadata:
name: async-gateway
namespace: default
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: async-gateway
namespace: default
spec:
selector:
matchLabels:
app: async-gateway
strategy:
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
type: RollingUpdate
template:
metadata:
name: async-gateway
labels:
app: async-gateway
spec:
serviceAccountName: async-gateway
containers:
- name: gateway
image: {{ config["image_async_gateway"] }}
imagePullPolicy: Always
args:
- --port
- "8888"
- --cluster-uid
- "{{ config["cluster_uid"] }}"
- --bucket
- "{{ config["bucket"] }}"
envFrom:
- configMapRef:
name: env-vars
ports:
- containerPort: 8888
readinessProbe:
httpGet:
path: /healthz
port: 8888
scheme: HTTP
livenessProbe:
httpGet:
path: /healthz
port: 8888
scheme: HTTP
resources:
requests:
cpu: 400m
memory: 512Mi
limits:
cpu: 400m
---
apiVersion: v1
kind: Service
metadata:
name: async-gateway
spec:
type: ClusterIP
selector:
app: async-gateway
ports:
- port: 8888
---
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: async-gateway
spec:
maxReplicas: 20
minReplicas: 1
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: async-gateway
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 90
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 90
2 changes: 1 addition & 1 deletion manager/manifests/cluster-autoscaler.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ spec:
cpu: 300m
requests:
cpu: 100m
memory: 200Mi
memory: 400Mi
command:
- ./cluster-autoscaler
- --v=4
Expand Down
2 changes: 1 addition & 1 deletion manager/manifests/istio.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ spec:
resources:
requests:
cpu: 400m
memory: 128Mi
memory: 512Mi
limits:
cpu: 1500m
memory: 1024Mi
Expand Down
1 change: 0 additions & 1 deletion manager/manifests/prometheus-monitoring.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ spec:
selector:
matchLabels:
apiKind: AsyncAPI
cortex.dev/async: api
matchExpressions:
- { key: prometheus-ignore, operator: DoesNotExist }
namespaceSelector:
Expand Down
30 changes: 26 additions & 4 deletions pkg/async-gateway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"

"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/types/async"
Expand Down Expand Up @@ -50,14 +51,28 @@ func (e *Endpoint) CreateWorkload(w http.ResponseWriter, r *http.Request) {
return
}

apiName := r.Header.Get(consts.CortexAPINameHeader)
if requestID == "" {
respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexAPINameHeader))
return
}
r.Header.Del(consts.CortexAPINameHeader)

queueURL := r.Header.Get(consts.CortexQueueURLHeader)
if queueURL == "" {
respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexQueueURLHeader))
return
}
r.Header.Del(consts.CortexQueueURLHeader)

body := r.Body
defer func() {
_ = r.Body.Close()
}()

log := e.logger.With(zap.String("id", requestID))
log := e.logger.With(zap.String("id", requestID), zap.String("apiName", apiName))

id, err := e.service.CreateWorkload(requestID, body, r.Header)
id, err := e.service.CreateWorkload(requestID, apiName, queueURL, body, r.Header)
if err != nil {
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
logErrorWithTelemetry(log, errors.Wrap(err, "failed to create workload"))
Expand All @@ -79,9 +94,16 @@ func (e *Endpoint) GetWorkload(w http.ResponseWriter, r *http.Request) {
return
}

log := e.logger.With(zap.String("id", id))
apiName := r.Header.Get(consts.CortexAPINameHeader)
if apiName == "" {
respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexAPINameHeader))
return
}
r.Header.Del(consts.CortexAPINameHeader)

log := e.logger.With(zap.String("id", id), zap.String("apiName", apiName))

res, err := e.service.GetWorkload(id)
res, err := e.service.GetWorkload(id, apiName)
if err != nil {
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
logErrorWithTelemetry(log, errors.Wrap(err, "failed to get workload"))
Expand Down
Loading

0 comments on commit 2203a92

Please sign in to comment.