Add concurrency to dequeuer #2376

Merged · 10 commits · Jul 30, 2021
4 changes: 4 additions & 0 deletions cmd/dequeuer/main.go
@@ -50,6 +50,7 @@ func main() {
statsdAddress string
apiKind string
adminPort int
workers int
)
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
flag.StringVar(&clusterUID, "cluster-uid", "", "cluster unique identifier")
@@ -61,6 +62,7 @@ func main() {
flag.StringVar(&statsdAddress, "statsd-address", "", "address to push statsd metrics")
flag.IntVar(&userContainerPort, "user-port", 8080, "target port to which the dequeued messages will be sent")
flag.IntVar(&adminPort, "admin-port", 0, "port where the admin server (for the probes) will be exposed")
flag.IntVar(&workers, "workers", 1, "number of workers pulling from the queue")

flag.Parse()

@@ -166,6 +168,7 @@ func main() {
Region: clusterConfig.Region,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: workers,
}

case userconfig.AsyncAPIKind.String():
Expand All @@ -186,6 +189,7 @@ func main() {
Region: clusterConfig.Region,
QueueURL: queueURL,
StopIfNoMessages: false,
Workers: workers,
}

// report prometheus metrics for async api kinds
16 changes: 12 additions & 4 deletions docs/workloads/async/autoscaling.md
@@ -4,6 +4,14 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.

## Autoscaling replicas

### Relevant pod configuration

In addition to the autoscaling configuration options (described below), there is one field in the pod configuration which is relevant to replica autoscaling:

**`max_concurrency`** (default: 1): The maximum number of requests that will be concurrently sent into the container by Cortex. If your web server is designed to handle multiple concurrent requests, increasing `max_concurrency` will increase the throughput of a replica (and result in fewer total replicas for a given load).
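
For instance, if your container runs a Go web server, it can handle concurrent requests out of the box, since `net/http` serves each request on its own goroutine. Here is a minimal sketch of such a server (assuming only that Cortex exports the target port as `$CORTEX_PORT`, as described in the configuration docs):

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

func main() {
	// net/http serves each request on its own goroutine, so a single
	// replica can safely process up to max_concurrency requests at once.
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "processed")
	})

	port := os.Getenv("CORTEX_PORT") // exported by Cortex (default: 8080)
	if port == "" {
		port = "8080"
	}
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		panic(err)
	}
}
```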

<br>

### Autoscaling configuration

**`min_replicas`** (default: 1): The lower bound on how many replicas can be running for an API. Scale-to-zero is supported.
@@ -14,13 +22,13 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.

<br>

**`target_in_flight`** (default: 1): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.
**`target_in_flight`** (default: `max_concurrency` in the pod configuration): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.

The autoscaler uses this formula to determine the number of desired replicas:

`desired replicas = total in-flight requests / target_in_flight`

For example, setting `target_in_flight` to 1 (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.
For example, setting `target_in_flight` to `max_concurrency` (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.
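
As a rough sketch of this core ratio (the real autoscaler also applies the `window` and stabilization settings described below; rounding up to whole replicas is an assumption here):

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas applies the formula above: total in-flight requests
// divided by target_in_flight, rounded up to a whole replica.
func desiredReplicas(totalInFlight, targetInFlight float64) int {
	return int(math.Ceil(totalInFlight / targetInFlight))
}

func main() {
	fmt.Println(desiredReplicas(8, 2))   // 4 replicas
	fmt.Println(desiredReplicas(8, 1.6)) // 5 replicas (25% overprovisioned, see below)
}
```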

<br>

@@ -58,9 +66,9 @@ Cortex spins up and down instances based on the aggregate resource requests of a

## Overprovisioning

The default value for `target_in_flight` is 1, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a lower value. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).
The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a value below your replica's expected concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

For example, if you wanted to overprovision by 25%, you could set `target_in_flight` to 0.8. If your API has an average of 4 concurrent requests, the autoscaler would maintain 5 live replicas (4/0.8 = 5).
For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

## Autoscaling responsiveness

3 changes: 2 additions & 1 deletion docs/workloads/async/configuration.md
@@ -5,6 +5,7 @@
kind: AsyncAPI # must be "AsyncAPI" for async APIs (required)
pod: # pod configuration (required)
port: <int> # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT)
max_concurrency: <int> # maximum number of requests that will be concurrently sent into the container (default: 1, max allowed: 100)
containers: # configurations for the containers to run (at least one container must be provided)
- name: <string> # name of the container (required)
image: <string> # docker image to use for the container (required)
@@ -45,7 +46,7 @@
min_replicas: <int> # minimum number of replicas (default: 1; min value: 0)
max_replicas: <int> # maximum number of replicas (default: 100)
init_replicas: <int> # initial number of replicas (default: <min_replicas>)
target_in_flight: <float> # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: 1)
target_in_flight: <float> # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: <max_concurrency>)
window: <duration> # duration over which to average the API's in-flight requests per replica (default: 60s)
downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 1m)
2 changes: 1 addition & 1 deletion docs/workloads/realtime/autoscaling.md
@@ -72,7 +72,7 @@ Cortex spins up and down instances based on the aggregate resource requests of a

The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a value below your replica's expected concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

For example, if you've determined that each replica in your API can handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).
For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

## Autoscaling responsiveness

6 changes: 2 additions & 4 deletions pkg/dequeuer/batch_handler.go
@@ -158,7 +158,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
return nil
}

endTime := time.Now().Sub(startTime)
endTime := time.Since(startTime)

err = h.recordSuccess()
if err != nil {
@@ -175,7 +175,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {
shouldRunOnJobComplete := false
h.log.Info("received job_complete message")
for true {
for {
queueAttributes, err := GetQueueAttributes(h.aws, h.config.QueueURL)
if err != nil {
return err
@@ -223,8 +223,6 @@ func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {

time.Sleep(h.jobCompleteMessageDelay)
}

return nil
}

func isOnJobCompleteMessage(message *sqs.Message) bool {
35 changes: 31 additions & 4 deletions pkg/dequeuer/dequeuer.go
@@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/math"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"go.uber.org/zap"
)
@@ -40,6 +41,7 @@
Region string
QueueURL string
StopIfNoMessages bool
Workers int
}

type SQSDequeuer struct {
@@ -96,12 +98,37 @@ func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error) {
}

func (d *SQSDequeuer) Start(messageHandler MessageHandler, readinessProbeFunc func() bool) error {
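// always run at least one worker, even if the configured count is zero or negative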
numWorkers := math.MaxInt(d.config.Workers, 1)

d.log.Infof("Starting %d workers", numWorkers)
errCh := make(chan error)
doneChs := make([]chan struct{}, numWorkers)
for i := 0; i < numWorkers; i++ {
doneChs[i] = make(chan struct{})
go func(i int) {
errCh <- d.worker(messageHandler, readinessProbeFunc, doneChs[i])
}(i)
}

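// return on the first worker error, or signal every worker to stop on shutdown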
select {
case err := <-errCh:
return err
case <-d.done:
for _, doneCh := range doneChs {
doneCh <- struct{}{}
}
}

return nil
}

func (d SQSDequeuer) worker(messageHandler MessageHandler, readinessProbeFunc func() bool, workerDone chan struct{}) error {
noMessagesInPreviousIteration := false

loop:
for {
select {
case <-d.done:
case <-workerDone:
break loop
default:
if !readinessProbeFunc() {
@@ -134,8 +161,8 @@

noMessagesInPreviousIteration = false
receiptHandle := *message.ReceiptHandle
done := d.StartMessageRenewer(receiptHandle)
err = d.handleMessage(message, messageHandler, done)
renewerDone := d.StartMessageRenewer(receiptHandle)
err = d.handleMessage(message, messageHandler, renewerDone)
if err != nil {
d.log.Error(err)
telemetry.Error(err)
@@ -196,7 +223,7 @@ func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{} {
startTime := time.Now()
go func() {
defer ticker.Stop()
for true {
for {
select {
case <-done:
return
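As an aside, the fan-out pattern introduced in `Start` (N workers sharing one queue, a common error channel, and per-worker done channels) can be sketched standalone as follows. This is an illustrative sketch, not the PR's exact code; `pullMessage` is a hypothetical stand-in for the SQS receive-and-handle loop:

```go
package main

import (
	"fmt"
	"time"
)

// worker loops until its done channel fires, returning the first error
// it encounters from pullMessage.
func worker(done <-chan struct{}, pullMessage func() error) error {
	for {
		select {
		case <-done:
			return nil
		default:
			if err := pullMessage(); err != nil {
				return err
			}
		}
	}
}

func main() {
	const numWorkers = 3
	errCh := make(chan error)
	doneChs := make([]chan struct{}, numWorkers)

	for i := 0; i < numWorkers; i++ {
		doneChs[i] = make(chan struct{})
		go func(i int) {
			errCh <- worker(doneChs[i], func() error {
				time.Sleep(10 * time.Millisecond) // simulate handling one message
				return nil
			})
		}(i)
	}

	// Shut down after a moment, mirroring the dequeuer's done signal.
	go func() {
		time.Sleep(100 * time.Millisecond)
		for _, doneCh := range doneChs {
			doneCh <- struct{}{}
		}
	}()

	fmt.Println(<-errCh) // <nil>: the first worker exited cleanly
}
```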
75 changes: 75 additions & 0 deletions pkg/dequeuer/dequeuer_test.go
@@ -31,6 +31,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/random"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
@@ -179,6 +180,7 @@ func TestSQSDequeuer_ReceiveMessage(t *testing.T) {
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: 1,
}, awsClient, logger,
)
require.NoError(t, err)
@@ -205,6 +207,7 @@ func TestSQSDequeuer_StartMessageRenewer(t *testing.T) {
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: 1,
}, awsClient, logger,
)
require.NoError(t, err)
@@ -253,6 +256,7 @@ func TestSQSDequeuerTerminationOnEmptyQueue(t *testing.T) {
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: 1,
}, awsClient, logger,
)
require.NoError(t, err)
@@ -303,6 +307,7 @@ func TestSQSDequeuer_Shutdown(t *testing.T) {
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: 1,
}, awsClient, logger,
)
require.NoError(t, err)
@@ -345,6 +350,7 @@ func TestSQSDequeuer_Start_HandlerError(t *testing.T) {
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: 1,
}, awsClient, logger,
)
require.NoError(t, err)
@@ -383,3 +389,72 @@
return msg != nil
}, 5*time.Second, time.Second)
}

func TestSQSDequeuer_MultipleWorkers(t *testing.T) {
t.Parallel()

awsClient := testAWSClient(t)
queueURL := createQueue(t, awsClient)

numMessages := 3
expectedMsgs := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
message := fmt.Sprintf("%d", i)
expectedMsgs[i] = message
_, err := awsClient.SQS().SendMessage(&sqs.SendMessageInput{
MessageBody: aws.String(message),
MessageDeduplicationId: aws.String(message),
MessageGroupId: aws.String(message),
QueueUrl: aws.String(queueURL),
})
require.NoError(t, err)
}

logger := newLogger(t)
defer func() { _ = logger.Sync() }()

dq, err := NewSQSDequeuer(
SQSDequeuerConfig{
Region: _localStackDefaultRegion,
QueueURL: queueURL,
StopIfNoMessages: true,
Workers: numMessages,
}, awsClient, logger,
)
require.NoError(t, err)

dq.waitTimeSeconds = aws.Int64(0)
dq.notFoundSleepTime = 0

msgCh := make(chan string, numMessages)
handler := NewMessageHandlerFunc(
func(message *sqs.Message) error {
msgCh <- *message.Body
return nil
},
)

errCh := make(chan error)
go func() {
errCh <- dq.Start(handler, func() bool { return true })
}()

receivedMessages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
receivedMessages[i] = <-msgCh
}
dq.Shutdown()

// timeout test after 10 seconds
time.AfterFunc(10*time.Second, func() {
close(msgCh)
errCh <- errors.New("test timed out")
})

require.Len(t, receivedMessages, numMessages)

set := strset.FromSlice(receivedMessages)
require.True(t, set.Has(expectedMsgs...))

require.NoError(t, <-errCh)
}
6 changes: 4 additions & 2 deletions pkg/dequeuer/probes.go
@@ -35,10 +35,12 @@ func ProbesFromFile(probesPath string, logger *zap.SugaredLogger) ([]*probe.Prob
return nil, err
}

var probesSlice []*probe.Probe
probesSlice := make([]*probe.Probe, len(probesMap))
var i int
for _, p := range probesMap {
auxProbe := p
probesSlice = append(probesSlice, probe.NewProbe(&auxProbe, logger))
probesSlice[i] = probe.NewProbe(&auxProbe, logger)
i++
}
return probesSlice, nil
}
15 changes: 14 additions & 1 deletion pkg/types/spec/validations.go
@@ -189,6 +189,19 @@ func podValidation(kind userconfig.Kind) *cr.StructFieldValidation {
)
}

if kind == userconfig.AsyncAPIKind {
validation.StructValidation.StructFieldValidations = append(validation.StructValidation.StructFieldValidations,
&cr.StructFieldValidation{
StructField: "MaxConcurrency",
Int64Validation: &cr.Int64Validation{
Default: consts.DefaultMaxConcurrency,
GreaterThan: pointer.Int64(0),
LessThanOrEqualTo: pointer.Int64(100),
},
},
)
}

return validation
}

@@ -818,7 +831,7 @@ func validateAutoscaling(api *userconfig.API) error {

if api.Kind == userconfig.AsyncAPIKind {
if autoscaling.TargetInFlight == nil {
autoscaling.TargetInFlight = pointer.Float64(1)
autoscaling.TargetInFlight = pointer.Float64(float64(pod.MaxConcurrency))
}
}

8 changes: 8 additions & 0 deletions pkg/types/userconfig/api.go
@@ -164,6 +164,10 @@ func (api *API) ToK8sAnnotations() map[string]string {
annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Pod.MaxQueueLength)
}

if api.Pod != nil && api.Kind == AsyncAPIKind {
annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Pod.MaxConcurrency)
}

if api.Networking != nil {
annotations[EndpointAnnotationKey] = *api.Networking.Endpoint
}
@@ -339,6 +343,10 @@ func (pod *Pod) UserStr(kind Kind) string {
sb.WriteString(fmt.Sprintf("%s: %s\n", MaxQueueLengthKey, s.Int64(pod.MaxQueueLength)))
}

if kind == AsyncAPIKind {
sb.WriteString(fmt.Sprintf("%s: %s\n", MaxConcurrencyKey, s.Int64(pod.MaxConcurrency)))
}

sb.WriteString(fmt.Sprintf("%s:\n", ContainersKey))
for _, container := range pod.Containers {
containerUserStr := s.Indent(container.UserStr(), " ")