Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snssqs: fix consumer starvation #3478

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

qustavo
Copy link

@qustavo qustavo commented Jul 5, 2024

Closes #3479

@qustavo qustavo requested review from a team as code owners July 5, 2024 14:28
@qustavo qustavo force-pushed the aws-concurrency-beyond-batch branch from d7ff99a to 6e6601d Compare July 5, 2024 14:29
The current implementation uses separate goroutines to process each
message retrieved by `sqsClient.ReceiveMessageWithContext`, which is
executed when `concurrencyMode` is set to `parallel`.
The `consumeSubscription` function waits for all spawned goroutines to
complete before receiving the next batch. However, this design has a
critical flaw: it starves the consumer if new messages are published
during the processing of an existing batch. These newly published
messages won't be processed until all goroutines from the original batch
have finished their work, effectively blocking the consumption of the
new messages.

This PR changes that by removing the waiting mechanism.

Signed-off-by: Gustavo Chain <me@qustavo.cc>
@qustavo qustavo force-pushed the aws-concurrency-beyond-batch branch from 6e6601d to 3248974 Compare July 5, 2024 14:44
@ItalyPaleAle
Copy link
Contributor

Thanks for this PR. I can see why the change is useful and agree the current implementation has some issues.

However, by removing all waiters, what I am concerned with now is that Dapr could be "overloaded" if too many messages are received at once, as it would create new goroutines without any limit.

Perhaps a better solution could be to limit the number of concurrent goroutines that are created? Some other components (like Azure Service Bus) have similar configuration options where you can set a max number of active messages. This way there's also some "natural backpressure" implemented.

@qustavo
Copy link
Author

qustavo commented Jul 5, 2024

Thanks for this PR. I can see why the change is useful and agree the current implementation has some issues.

However, by removing all waiters, what I am concerned with now is that Dapr could be "overloaded" if too many messages are received at once, as it would create new goroutines without any limit.

Perhaps a better solution could be to limit the number of concurrent goroutines that are created? Some other components (like Azure Service Bus) have similar configuration options where you can set a max number of active messages. This way there's also some "natural backpressure" implemented.

Backpressure is definitively valuable, and from what I can see, implementations like Redis are already controlling the number of goroutines: https://github.com/dapr/components-contrib/blob/main/pubsub/redis/redis.go#L89.

However, this sounds like a global concern that could be implemented at the pubsub level and not as a driver feature. I think that a global concurrency control allow operators to switch from pubsub implementation keeping the same backpressure settings.
Does that makes sense?

@ItalyPaleAle
Copy link
Contributor

However, this sounds like a global concern that could be implemented at the pubsub level and not as a driver feature. I think that a global concurrency control allow operators to switch from pubsub implementation keeping the same backpressure settings.

That does make sense, but it would require some major changes to a lot of components. Additionally integrating that would not be straightforward, since Dapr currently delegates concurrency control for this scenario to individual components.

If for now you could implement a maxConcurrentMessages (or maxParallelism) option, that would be nice and we can then fix this bug!

@qustavo
Copy link
Author

qustavo commented Jul 5, 2024

That does make sense, but it would require some major changes to a lot of components. Additionally integrating that would not be straightforward, since Dapr currently delegates concurrency control for this scenario to individual components.

You are right. Is this something you think its worth working on?

If for now you could implement a maxConcurrentMessages (or maxParallelism) option, that would be nice and we can then fix this bug!

Sure, I'm happy to add the feature. Initially I thought about concurrencyLimit which matches the concurrencyMode prefix, but I don't have a string opinion.
Also, some questions:

  • what do you think is a sensible default?
  • should we allow the operator to uncap the concurrency limit (let's say you want to control the limit in the caller's code (outside dapr) ?

@ItalyPaleAle
Copy link
Contributor

Initially I thought about concurrencyLimit which matches the concurrencyMode prefix, but I don't have a string opinion.

concurrencyLimit is a good value!

what do you think is a sensible default?

I just looked at the Azure Service Bus component and looks like the default is actually 0, i.e. unlimited. I'm not a fan of unlimited, but for consistency it could make sense. However, I'll accept other suggestions too!

should we allow the operator to uncap the concurrency limit (let's say you want to control the limit in the caller's code (outside dapr) ?

My gut initially said "no", but then I saw that other components do allow un-capping the limit (like Service Bus), so perhaps it does make sense. (although I would personally not recommend controlling the limit on the caller (i.e. the publisher) side)

@qustavo
Copy link
Author

qustavo commented Jul 6, 2024

concurrencyLimit is a good value!

Ok, let'a use that one then.

I just looked at the Azure Service Bus component and looks like the default is actually 0, i.e. unlimited. I'm not a fan of unlimited, but for consistency it could make sense. However, I'll accept other suggestions too!

While Azure and --app-max-concurrency defaults to unlimited, Redis defaults to "10". However, since there is a wide range of use cases, I think default to unlimited makes more sense.

My gut initially said "no", but then I saw that other components do allow un-capping the limit (like Service Bus), so perhaps it does make sense. (although I would personally not recommend controlling the limit on the caller (i.e. the publisher) side)

Right, by caller's code I mean the Subscriber's handler. An analogous example will be Go's http server, which does not impose any limitation on the number of handlers and it's up to the user to set such a limit.

In summary, I'd document this as follows:

Field Required Details Example
concurrencyLimit N Defines the maximum number of concurrent workers handling messages. This value is ignored when concurrentMode is set to "single". To avoid limiting the number of concurrent workers set this to "0". Default: "0" "100"

@qustavo qustavo force-pushed the aws-concurrency-beyond-batch branch 2 times, most recently from e846809 to 74537ab Compare July 6, 2024 02:15
// This is the back pressure mechanism.
// It will block until another goroutine frees a slot.
if sem != nil {
sem <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this line (that blocks) should be outside of the lambda. Currently, the goroutine is still spawned for each message, but then the goroutine is blocked. Perhaps it'd be wiser to not create the goroutine at all, and block the loop, since too many goroutines (even blocked) can "clog" the entire Dapr application (and cause a lot of pressure on the GC)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great point, didn't see that. I've just fixed it.

qustavo and others added 2 commits July 8, 2024 12:38
The `ConcurrencyLimit` param controls the number of concurrent process
that handles messages.

Signed-off-by: Gustavo Chain <me@qustavo.cc>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
@qustavo qustavo force-pushed the aws-concurrency-beyond-batch branch from 8f559d0 to 9acedbf Compare July 8, 2024 16:38
@berndverst
Copy link
Member

@qustavo can you open an issue in this repo and also explain the problem there then link to it in this PR? We don't usually accept PRs from non-maintainers/approvers without an issue being open first (and I don't mean issues that have been opened at the same time as the PR)

Please note that the 1.14 release is already underway and we have already cut the components-contrib release branch for 1.14. So this will not make it into the 1.14 release most likely. We can assess the need for this fix to improvement to make it into the 1.14 release within the issue (to be created).

@qustavo
Copy link
Author

qustavo commented Jul 8, 2024

@qustavo can you open an issue in this repo and also explain the problem there then link to it in this PR? We don't usually accept PRs from non-maintainers/approvers without an issue being open first (and I don't mean issues that have been opened at the same time as the PR)

Sure, will create issue

@berndverst
Copy link
Member

I'm ok with merging now - but in the absence of an issue this definitely cannot be considered to be included in 1.14.

@berndverst
Copy link
Member

/ok-to-test

@dapr-bot
Copy link
Collaborator

dapr-bot commented Jul 8, 2024

Components certification test

🔗 Link to Action run

Commit ref: 9acedbf

❌ Some certification tests failed

These tests failed:

  • bindings.azure.eventhubs
  • bindings.azure.servicebusqueues
  • pubsub.aws.snssqs
  • pubsub.mqtt3
  • secretstores.hashicorp.vault
  • state.azure.cosmosdb

@dapr-bot
Copy link
Collaborator

dapr-bot commented Jul 8, 2024

Components conformance test

🔗 Link to Action run

Commit ref: 9acedbf

❌ Some conformance tests failed

These tests failed:

  • bindings.azure.eventgrid
  • pubsub.aws.snssqs.terraform

@dapr-bot
Copy link
Collaborator

dapr-bot commented Jul 8, 2024

Complete Build Matrix

The build status is currently not updated here. Please visit the action run below directly.

🔗 Link to Action run

Commit ref: 9acedbf

@berndverst
Copy link
Member

Unfortunately the SNS integration tests have been broken. I think this needs to be fixed by @sicoyle @artursouza or someone else with more AWS integration test experience (and the credentials to manage the test infrastructure).

While I'm sure nothing is inherently wrong with this PR, we just cannot be certain due to the broken tests (which were not broken by this PR).

@berndverst berndverst added the do-not-merge PR is not ready for merging label Jul 8, 2024
@qustavo
Copy link
Author

qustavo commented Jul 19, 2024

@berndverst AFAICS the issue is with pubsub.mqtt3 certification. How is this related to SNS/SQS?

@berndverst berndverst removed the do-not-merge PR is not ready for merging label Aug 28, 2024
@berndverst
Copy link
Member

/ok-to-test

@dapr-bot
Copy link
Collaborator

Complete Build Matrix

The build status is currently not updated here. Please visit the action run below directly.

🔗 Link to Action run

Commit ref: 882ac66

@dapr-bot
Copy link
Collaborator

dapr-bot commented Aug 28, 2024

Components conformance test

🔗 Link to Action run

Commit ref: 882ac66

❌ Some conformance tests failed

These tests failed:

  • bindings.azure.eventgrid
  • bindings.influx
  • bindings.kafka-confluent
  • bindings.kafka-wurstmeister
  • bindings.kubemq
  • bindings.mqtt3-emqx
  • bindings.mqtt3-mosquitto
  • bindings.mqtt3-vernemq
  • bindings.postgresql.docker
  • bindings.rabbitmq
  • bindings.redis.v6
  • bindings.redis.v7
  • configuration.postgresql.docker
  • configuration.redis.v6
  • configuration.redis.v7
  • lock.redis.v6
  • lock.redis.v7
  • pubsub.aws.snssqs.terraform
  • pubsub.jetstream
  • pubsub.kafka-confluent
  • pubsub.kafka-wurstmeister
  • pubsub.kubemq
  • pubsub.mqtt3-emqx
  • pubsub.mqtt3-vernemq
  • pubsub.pulsar
  • pubsub.rabbitmq
  • pubsub.redis.v6
  • pubsub.solace
  • secretstores.hashicorp.vault
  • state.cassandra
  • state.cockroachdb.v1
  • state.etcd.v1
  • state.etcd.v2
  • state.memcached
  • state.mysql.mariadb
  • state.mysql.mysql
  • state.oracledatabase
  • state.postgresql.v1.docker
  • state.postgresql.v2.docker
  • state.redis.v6
  • state.redis.v7
  • state.rethinkdb
  • state.sqlserver

@dapr-bot
Copy link
Collaborator

dapr-bot commented Aug 28, 2024

Components certification test

🔗 Link to Action run

Commit ref: 882ac66

❌ Some certification tests failed

These tests failed:

  • bindings.azure.eventhubs
  • bindings.zeebe.command
  • bindings.zeebe.jobworker
  • bindings.kafka
  • bindings.postgres
  • bindings.rabbitmq
  • bindings.redis
  • configuration.postgres
  • configuration.redis
  • pubsub.aws.snssqs
  • pubsub.kafka
  • pubsub.mqtt3
  • pubsub.pulsar
  • pubsub.rabbitmq
  • secretstores.hashicorp.vault
  • state.azure.cosmosdb
  • state.cassandra
  • state.cockroachdb.v1
  • state.memcached
  • state.mongodb
  • state.mysql
  • state.postgresql.v1
  • state.postgresql.v2
  • state.redis
  • state.sqlserver

@qustavo
Copy link
Author

qustavo commented Sep 4, 2024

any progress on this? This is currently stopping our sns/sqs deployment, please lmk if there's anything I can do to get it merged.

@yaron2
Copy link
Member

yaron2 commented Sep 10, 2024

any progress on this? This is currently stopping our sns/sqs deployment, please lmk if there's anything I can do to get it merged.

Taking a look today

@qustavo
Copy link
Author

qustavo commented Sep 25, 2024

any progress on this? This is currently stopping our sns/sqs deployment, please lmk if there's anything I can do to get it merged.

Taking a look today

any insight?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

snssqs: fix consumer starvation
5 participants