Concurrency and threshold throttling for Sidekiq.
Add this line to your application’s Gemfile:
gem "sidekiq-throttled"
And then execute:
$ bundle
Or install it yourself as:
$ gem install sidekiq-throttled
Add somewhere in your app’s bootstrap (e.g. config/initializers/sidekiq.rb if you are using Rails):

require "sidekiq/throttled"
Once you’ve done that you can include Sidekiq::Throttled::Job into your job classes and configure throttling:
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options :queue => :my_queue

  sidekiq_throttle(
    # Allow maximum 10 concurrent jobs of this class at a time.
    concurrency: { limit: 10 },
    # Allow maximum 1K jobs being processed within one hour window.
    threshold: { limit: 1_000, period: 1.hour }
  )

  def perform
    # ...
  end
end
Tip: Sidekiq::Throttled::Job is aliased as Sidekiq::Throttled::Worker, so if you’re using the Sidekiq::Worker naming convention, you can use the alias for consistency:
class MyWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # ...
end
The default requeue strategy, :enqueue, puts throttled jobs immediately back on the queue (subject to an optional cooldown_period). This is often an appropriate strategy, but in some situations it may cause the system to repeatedly dequeue and re-enqueue the same job, causing excessive Redis CPU usage.

An alternative requeue strategy, :schedule, is available that schedules the work for the future. This can be less appropriate for highly concurrent jobs, but may be a good way to reduce Redis load when the number of jobs to be processed is modest.

The alternative :schedule strategy may be configured globally with config.default_requeue_options = { with: :schedule }.
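For instance, a global default could be set in an initializer along these lines (a sketch; the exact placement in your app’s boot sequence may differ):

```ruby
# Hypothetical initializer sketch: make :schedule the default requeue
# strategy for every throttled job. Per-job requeue options still
# override this default.
require "sidekiq/throttled"

Sidekiq::Throttled.configure do |config|
  config.default_requeue_options = { with: :schedule }
end
```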
It may be configured on a per-job basis with:

sidekiq_throttle(
  # Allow 5 jobs to be processed within 1 minute, using the :schedule
  # requeue strategy.
  threshold: { limit: 5, period: 1.minute, requeue: { with: :schedule } }
)
It is also possible to requeue jobs to another queue:

sidekiq_throttle(
  # Allow 5 jobs to be processed within 1 minute, requeueing throttled
  # jobs to :other_queue via the :schedule strategy.
  threshold: { limit: 5, period: 1.minute, requeue: { to: :other_queue, with: :schedule } }
)
To add a Throttled tab to your Sidekiq web dashboard, require it during your application initialization.
require "sidekiq/throttled/web"
Sidekiq::Throttled.configure do |config|
  # Period in seconds to exclude queue from polling in case it returned
  # {config.cooldown_threshold} amount of throttled jobs in a row. Set
  # this value to `nil` to disable cooldown manager completely.
  # Default: 1.0
  config.cooldown_period = 1.0

  # Exclude queue from polling after it returned given amount of throttled
  # jobs in a row.
  # Default: 100 (cooldown after hundredth throttled job in a row)
  config.cooldown_threshold = 100
end
Warning: Cooldown Settings
If a queue contains a thousand jobs in a row that will be throttled, the cooldown will kick in 10 times in a row, meaning it will take 10 seconds before all those jobs are put back at the end of the queue and you actually start processing other jobs. You may want to adjust cooldown_threshold and cooldown_period, keeping in mind that this will also impact the load on your Redis server.
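The arithmetic behind that warning, with the default settings and the hypothetical burst of a thousand throttled jobs:

```ruby
# Back-of-the-envelope check of the warning above, using the defaults
# and a hypothetical burst of 1,000 consecutively throttled jobs.
throttled_jobs     = 1_000
cooldown_threshold = 100   # default
cooldown_period    = 1.0   # seconds, default

# One cooldown fires per `cooldown_threshold` throttled jobs in a row,
# and each cooldown pauses polling of the queue for `cooldown_period`.
cooldowns   = throttled_jobs / cooldown_threshold  # => 10
total_delay = cooldowns * cooldown_period          # => 10.0 seconds
```

Raising cooldown_threshold shortens this stall but increases Redis load during the burst; lowering cooldown_period has a similar trade-off.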
Sidekiq::Throttled relies on the following bundled middleware:

- Sidekiq::Throttled::Middlewares::Server

The middleware is automatically injected when you require sidekiq/throttled.
In rare cases, when this causes an issue, you can change middleware order manually:
Sidekiq.configure_server do |config|
  # ...

  config.server_middleware do |chain|
    chain.prepend(Sidekiq::Throttled::Middlewares::Server)
  end
end
You can specify an observer that will be called on throttling. To do so, pass an :observer option with a callable object:
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  MY_OBSERVER = lambda do |strategy, *args|
    # do something
  end

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    concurrency: { limit: 10 },
    threshold: { limit: 100, period: 1.hour },
    observer: MY_OBSERVER
  )

  def perform(*args)
    # ...
  end
end
The observer will receive strategy, *args arguments, where strategy is a Symbol (:concurrency or :threshold), and *args are the arguments that were passed to the job.
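As a concrete illustration of that call shape (the observer body here is hypothetical; only the `(strategy, *args)` signature comes from the description above):

```ruby
# Hypothetical observer that records each throttle event it sees.
THROTTLE_LOG = []

MY_OBSERVER = lambda do |strategy, *args|
  THROTTLE_LOG << { strategy: strategy, args: args }
end

# Simulating the invocation for a job with argument 42 that was
# throttled by its concurrency strategy:
MY_OBSERVER.call(:concurrency, 42)
THROTTLE_LOG.last # => { strategy: :concurrency, args: [42] }
```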
You can throttle jobs dynamically with the :key_suffix option:
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 10 concurrent jobs per user at a time.
    concurrency: { limit: 10, key_suffix: -> (user_id) { user_id } }
  )

  def perform(user_id)
    # ...
  end
end
You can also supply dynamic values for limits and periods by supplying a proc for these values. The proc will be evaluated at the time the job is fetched and will receive the same arguments that are passed to the job.
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 1000 concurrent jobs of this class at a time for VIPs and 10 for all other users.
    concurrency: {
      limit: ->(user_id) { User.vip?(user_id) ? 1_000 : 10 },
      key_suffix: ->(user_id) { User.vip?(user_id) ? "vip" : "std" }
    },
    # Allow 1000 jobs/hour to be processed for VIPs and 10/day for all others
    threshold: {
      limit: ->(user_id) { User.vip?(user_id) ? 1_000 : 10 },
      period: ->(user_id) { User.vip?(user_id) ? 1.hour : 1.day },
      key_suffix: ->(user_id) { User.vip?(user_id) ? "vip" : "std" }
    }
  )

  def perform(user_id)
    # ...
  end
end
You can also use several different keys to throttle one worker.
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 10 concurrent jobs per project at a time and maximum 2 jobs per user
    concurrency: [
      { limit: 10, key_suffix: -> (project_id, user_id) { project_id } },
      { limit: 2, key_suffix: -> (project_id, user_id) { user_id } }
    ]
    # For :threshold it works the same
  )

  def perform(project_id, user_id)
    # ...
  end
end
Important: Don’t forget to specify :key_suffix and make it return different values if you are using dynamic limit/period options. Otherwise, you risk getting into trouble.
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    concurrency: { limit: 10 },
    # Allow 500 jobs per minute, 5,000 per hour, and 50,000 per day:
    threshold: [
      { limit: 500, period: 1.minute, key_suffix: "minutely" },
      { limit: 5_000, period: 1.hour, key_suffix: "hourly" },
      { limit: 50_000, period: 1.day, key_suffix: "daily" },
    ]
  )

  def perform(project_id, user_id)
    # ...
  end
end
Note: key_suffix does not have to be a proc/lambda; it can just be a string value. This can come in handy to set throttle limits for different ranges of time.
Concurrency throttling is based on distributed locks. Those locks have a default time to live (TTL) of 15 minutes. If your job takes more than 15 minutes to finish, the lock will be released and you might end up with more jobs running concurrently than you expect.

This is done to avoid deadlocks: if for any reason (e.g. the Sidekiq process was OOM-killed) the cleanup middleware wasn’t executed, the locks would otherwise never be released.

If your job takes more than 15 minutes to complete, you can tune the concurrency lock TTL to fit your needs:
# Set concurrency strategy lock TTL to 1 hour.
sidekiq_throttle(concurrency: { limit: 20, ttl: 1.hour.to_i })
The default concurrency throttling algorithm immediately requeues throttled jobs. This can lead to a lot of wasted work picking up the same set of still-throttled jobs repeatedly. This churn also often starves lower-priority jobs/queues. The :schedule requeue strategy delays checking the runnability of throttled jobs until they are likely to be runnable. This future time is estimated based on the expected runtime of the job and the current number of throttled jobs. This eliminates, or greatly reduces, the negative impact on non-throttled job types and queues, and reduces the wasted work of constantly rechecking the same still-throttled jobs.
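To build intuition for the estimation, here is an illustrative calculation. This is not the library’s actual formula; the numbers and the drain model are assumptions chosen only to show why the delay scales with backlog size and expected job duration:

```ruby
# Illustrative sketch only (NOT the library's real estimator): a backlog
# of throttled jobs drains roughly `limit` jobs every `avg_job_duration`
# seconds, so a newly throttled job need not be rechecked before then.
avg_job_duration = 30    # assumed: seconds per job
limit            = 10    # assumed: concurrent slots
backlog          = 40    # assumed: throttled jobs ahead of this one

estimated_delay = (backlog.to_f / limit) * avg_job_duration  # => 120.0
```

With these assumptions, rechecking the job before roughly two minutes have passed would be wasted work.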
Config items:

- limit - max number of this job to run simultaneously
- avg_job_duration - expected runtime in seconds of this type of job. Pick a value on the high side of plausible; under heavy load, values less than the actual average will lead to sub-optimal delays in job processing.
- lost_job_threshold - duration in seconds of a job’s lease on its concurrency slot
- ttl - alias for lost_job_threshold
sidekiq_throttle(
  concurrency: {
    # only run 10 of this job at a time
    limit: 10,
    # these jobs finish in less than 30 seconds
    avg_job_duration: 30,
    # if it doesn't release its lease in 2 minutes, it's never going to
    lost_job_threshold: 120
  },
  requeue: { with: :schedule }
)
This library aims to support and is tested against the following Ruby versions:

- Ruby 3.2.x
- Ruby 3.3.x
- Ruby 3.4.x
If something doesn’t work on one of these versions, it’s a bug.
This library may inadvertently work (or seem to work) on other Ruby versions, however support will only be provided for the versions listed above.
If you would like this library to support another Ruby version or implementation, you may volunteer to be a maintainer. Being a maintainer entails making sure all tests run and pass on that implementation. When something breaks on your implementation, you will be responsible for providing patches in a timely fashion. If critical issues for a particular implementation exist at the time of a major release, support for that Ruby version may be dropped.
This library aims to support and work with the following Sidekiq versions:

- Sidekiq 8.0.x

And the following Sidekiq Pro versions:

- Sidekiq Pro 8.0.x
Fork sidekiq-throttled on GitHub
-
Make your changes
-
Ensure all tests pass (
bundle exec rake
) -
Send a pull request
-
If we like them we’ll merge them
-
If we’ve accepted a patch, feel free to ask for commit access!
The initial work on the project was initiated to address the needs of SensorTower.