Skip to content

Commit

Permalink
- List -> Cache
Browse files Browse the repository at this point in the history
- Add configuration for THROTTLE_TIMEOUT
- Address proposed naming changes
- Remove RedisSscan in favor of `.sscan_each` from redis-rb
- aed -> heartbeat
- cthulhu -> resurrect
  • Loading branch information
sensorsasha committed Apr 24, 2023
1 parent 0d39c72 commit 403d428
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 113 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@ Sidekiq::Ultimate::Resurrector::Count.read(:job_id => "2647c4fe13acc692326bd4c2"
=> 1
```

### Empty Queues List Refresh Interval
### Empty Queues Cache Refresh Interval

```ruby
Sidekiq::Ultimate.setup! do |config|
config.empty_queues_refresh_interval_sec = 42
config.empty_queues_cache_refresh_interval_sec = 42
end
```

Specifies how often the list of empty queues should be refreshed.
Specifies how often the cache of empty queues should be refreshed.
In a nutshell, this sets the maximum possible delay between the moment a job is pushed to a previously empty queue and the earliest moment when that new job can be picked up.

**Note:** every worker maintains its own local list of empty queues.
**Note:** every sidekiq process maintains its own local cache of empty queues.
Setting this interval to a low value will increase the number of Redis calls needed to check for empty queues, increasing the total load on Redis.

This setting helps manage the tradeoff between performance penalties and latency needed for reliable fetch.
Expand Down
36 changes: 26 additions & 10 deletions lib/sidekiq/ultimate/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,43 @@ class Configuration
# @return [Boolean]
attr_accessor :enable_resurrection_counter

# It specifies how often the list of empty queues should be refreshed.
# It specifies how often the cache of empty queues should be refreshed.
# In a nutshell, it specifies the maximum possible delay between the moment a job is pushed to a previously empty
# queue and the moment when that new job is picked up.
# Note that every worker needs to maintain its own local list of empty queues. Setting this interval to a low
# values will increase the number of redis calls and will increase the load on redis.
# @return [Integer] interval in seconds to refresh the list of empty queues
attr_reader :empty_queues_refresh_interval_sec
# Note that every sidekiq process needs to maintain its own local cache of empty queues. Setting this interval
# to a low value will increase the number of redis calls and will increase the load on redis.
# @return [Integer] interval in seconds to refresh the cache of empty queues
attr_reader :empty_queues_cache_refresh_interval_sec

DEFAULT_EMPTY_QUEUES_REFRESH_INTERVAL_SEC = 30
DEFAULT_EMPTY_QUEUES_CACHE_REFRESH_INTERVAL_SEC = 30

# If a fetch attempt from a queue was throttled, the queue is put on the exhausted list for this amount of time
# to avoid being throttled again on the same queue
# @return [Float] timeout in seconds
attr_writer :throttled_fetch_timeout_sec

DEFAULT_THROTTLED_FETCH_TIMEOUT_SEC = 15

def initialize
@empty_queues_refresh_interval_sec = DEFAULT_EMPTY_QUEUES_REFRESH_INTERVAL_SEC
@empty_queues_cache_refresh_interval_sec = DEFAULT_EMPTY_QUEUES_CACHE_REFRESH_INTERVAL_SEC
@throttled_fetch_timeout_sec = DEFAULT_THROTTLED_FETCH_TIMEOUT_SEC
super
end

def empty_queues_refresh_interval_sec=(value)
def empty_queues_cache_refresh_interval_sec=(value)
unless value.is_a?(Numeric)
raise ArgumentError, "Invalid 'empty_queues_refresh_interval_sec' value: #{value}. Must be Numeric"
raise ArgumentError, "Invalid 'empty_queues_cache_refresh_interval_sec' value: #{value}. Must be Numeric"
end

@empty_queues_refresh_interval_sec = value
@empty_queues_cache_refresh_interval_sec = value
end

# Resolves the configured throttled-fetch timeout to a number of seconds.
# When the stored value responds to `call`, it is invoked first and its result is used,
# so the setting may be supplied either as a plain value or as a callable.
# @return [Float] timeout in seconds
def throttled_fetch_timeout_sec
  timeout = @throttled_fetch_timeout_sec
  timeout = timeout.call if timeout.respond_to?(:call)
  timeout.to_f
end
end
end
Expand Down
71 changes: 36 additions & 35 deletions lib/sidekiq/ultimate/empty_queues.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

require "sidekiq/ultimate/configuration"
require "sidekiq/ultimate/use_exists_question_mark"
require "sidekiq/ultimate/redis_sscan"
require "sidekiq/ultimate/empty_queues/refresh_timer_task"

module Sidekiq
module Ultimate
# Maintains a list of empty queues. It has a global list and a local list.
# The global list is stored in redis and updated periodically. The local list is updated either by using the fresh
# list fetched for global list update or by using existing global list.
# Only one process can update the global list at a time.
# Maintains a cache of empty queues. It has a global cache and a local cache.
# The global cache is stored in redis and updated periodically. The local cache is updated either by using the fresh
# cache fetched after global cache update or by using existing global cache.
# Only one process can update the global cache at a time.
class EmptyQueues
include Singleton

Expand All @@ -30,59 +29,60 @@ def initialize
super
end

# Sets up automatic empty queues list updater.
# Sets up automatic empty queues cache updater.
# It will call #refresh! every
# `Sidekiq::Ultimate::Configuration.instance.empty_queues_refresh_interval_sec` seconds
# `Sidekiq::Ultimate::Configuration.instance.empty_queues_cache_refresh_interval_sec` seconds
def self.setup!
refresher = nil
refresher_timer_task = nil

Sidekiq.on(:startup) do
refresher&.shutdown
refresher = RefreshTimerTask.setup!(self)
refresher_timer_task&.shutdown
refresher_timer_task = RefreshTimerTask.setup!(self)
end

Sidekiq.on(:shutdown) { refresher&.shutdown }
Sidekiq.on(:shutdown) { refresher_timer_task&.shutdown }
end

# Attempts to update the global cache of empty queues by first acquiring a global lock
# If the lock is acquired, it brute force generates an accurate list of currently empty queues and then writes this
# updated list to the global cache
# The local queue cache is always updated as a result of this operation, either by using the recently generated
# If the lock is acquired, it brute force generates an accurate list of currently empty queues and
# then writes the updated list to the global cache
# The local queue cache is always updated as a result of this operation, either by using the recently generated
# list or fetching the most recent list from the global cache
#
# @return [Boolean] true if local list was updated
# @return [Boolean] true if local cache was updated
def refresh!
return false unless local_lock.try_lock

begin
refresh_global_list! || refresh_local_list!
refresh_global_cache! || refresh_local_cache
ensure
local_lock.unlock
end
rescue => e
Sidekiq.logger.error { "Empty queues list update failed: #{e}" }
Sidekiq.logger.error { "Empty queues cache update failed: #{e}" }
raise
end

private

# Automatically updates local list if global list was updated
# @return [Boolean] true if list was updated
def refresh_global_list!
Sidekiq.logger.debug { "Refreshing global list" }
# Automatically updates local cache if global cache was updated
# @return [Boolean] true if cache was updated
def refresh_global_cache!
Sidekiq.logger.debug { "Refreshing global cache" }

global_lock do
Sidekiq.redis do |redis|
empty_queues = fetch_empty_queues(redis)
empty_queues = generate_empty_queues(redis)

set_global_list!(redis, empty_queues)
set_local_list!(empty_queues)
update_global_cache(redis, empty_queues)
update_local_cache(empty_queues)
end
end
end

def fetch_empty_queues(redis)
queues = Sidekiq::Ultimate::RedisSscan.read(redis, "queues")
def generate_empty_queues(redis)
# Cursor is not atomic, so there may be duplicates because of concurrent update operations
queues = Sidekiq.redis { |r| r.sscan_each("queues").to_a.uniq }

queues_statuses =
redis.pipelined do |p|
Expand All @@ -96,24 +96,25 @@ def fetch_empty_queues(redis)
queues.zip(queues_statuses).reject { |(_, exists)| exists }.map(&:first)
end

def refresh_local_list!
Sidekiq.logger.debug { "Refreshing local list" }
def refresh_local_cache
Sidekiq.logger.debug { "Refreshing local cache" }

list = Sidekiq.redis { |redis| Sidekiq::Ultimate::RedisSscan.read(redis, KEY) }
set_local_list!(list)
# Cursor is not atomic, so there may be duplicates because of concurrent update operations
list = Sidekiq.redis { |redis| redis.sscan_each(KEY).to_a.uniq }
update_local_cache(list)
end

def set_global_list!(redis, list)
Sidekiq.logger.debug { "Setting global list: #{list}" }
def update_global_cache(redis, list)
Sidekiq.logger.debug { "Setting global cache: #{list}" }

redis.multi do |multi|
multi.del(KEY)
multi.sadd(KEY, list) if list.any?
end
end

def set_local_list!(list) # rubocop:disable Naming/AccessorMethodName
Sidekiq.logger.debug { "Setting local list: #{list}" }
def update_local_cache(list)
Sidekiq.logger.debug { "Setting local cache: #{list}" }

@queues = list
end
Expand All @@ -138,7 +139,7 @@ def skip_update?(redis)
results = redis.pipelined { |pipeline| [pipeline.time, pipeline.get(LAST_RUN_KEY)] }
last_run_distance = results[0][0] - results[1].to_i

last_run_distance < Sidekiq::Ultimate::Configuration.instance.empty_queues_refresh_interval_sec
last_run_distance < Sidekiq::Ultimate::Configuration.instance.empty_queues_cache_refresh_interval_sec
end

def namespaced_lock_key
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/ultimate/empty_queues/refresh_timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class RefreshTimerTask

class << self
def setup!(empty_queues_class)
interval = Sidekiq::Ultimate::Configuration.instance.empty_queues_refresh_interval_sec
interval = Sidekiq::Ultimate::Configuration.instance.empty_queues_cache_refresh_interval_sec
task = TASK_CLASS.new({
:run_now => true,
:execution_interval => Sidekiq::Ultimate::IntervalWithJitter.call(interval)
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq/ultimate/fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "sidekiq/ultimate/resurrector"
require "sidekiq/ultimate/unit_of_work"
require "sidekiq/ultimate/empty_queues"
require "sidekiq/ultimate/configuration"

module Sidekiq
module Ultimate
Expand All @@ -15,9 +16,6 @@ class Fetch
# Delay between fetch retries in case of no job received.
TIMEOUT = 2

# Delay between queue poll attempts if it's last job was throttled.
THROTTLE_TIMEOUT = 15

def initialize(options)
@exhausted_by_throttling = ExpirableSet.new
@empty_queues = Sidekiq::Ultimate::EmptyQueues.instance
Expand All @@ -37,7 +35,9 @@ def retrieve_work
if work&.throttled?
work.requeue_throttled

@exhausted_by_throttling.add(work.queue_name, :ttl => THROTTLE_TIMEOUT)
@exhausted_by_throttling.add(
work.queue_name, :ttl => Sidekiq::Ultimate::Configuration.instance.throttled_fetch_timeout_sec
)

return nil
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/ultimate/interval_with_jitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class << self
# Returns execution interval with jitter.
# Jitter is +- RANDOM_OFFSET_RATIO from the original value.
def call(interval)
jitter_factor = 1 + rand(-RANDOM_OFFSET_RATIO..RANDOM_OFFSET_RATIO)
jitter_factor = 1 + rand(-RANDOM_OFFSET_RATIO..RANDOM_OFFSET_RATIO)
jitter_factor * interval
end
end
Expand Down
23 changes: 0 additions & 23 deletions lib/sidekiq/ultimate/redis_sscan.rb

This file was deleted.

38 changes: 19 additions & 19 deletions lib/sidekiq/ultimate/resurrector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ module Resurrector
DEFIBRILLATE_INTERVAL = 5
private_constant :DEFIBRILLATE_INTERVAL

CtulhuTimerTask = Class.new(Concurrent::TimerTask)
AedTimerTask = Class.new(Concurrent::TimerTask)
ResurrectorTimerTask = Class.new(Concurrent::TimerTask)
HeartbeatTimerTask = Class.new(Concurrent::TimerTask)

class << self
def setup!
register_aed!
call_cthulhu!
register_process_heartbeat
register_resurrector
end

# go over all sidekiq processes (identities) that were shut down recently, get all their queues and
Expand All @@ -52,40 +52,40 @@ def current_process_identity

private

def call_cthulhu!
cthulhu = nil
def register_resurrector
resurrector_timer_task = nil

Sidekiq.on(:startup) do
cthulhu&.shutdown
resurrector_timer_task&.shutdown

cthulhu = CtulhuTimerTask.new({
resurrector_timer_task = ResurrectorTimerTask.new({
:run_now => true,
:execution_interval => Sidekiq::Ultimate::IntervalWithJitter.call(CommonConstants::RESURRECTOR_INTERVAL)
}) { resurrect! }
cthulhu.execute
resurrector_timer_task.execute
end

Sidekiq.on(:shutdown) { cthulhu&.shutdown }
Sidekiq.on(:shutdown) { resurrector_timer_task&.shutdown }
end

def register_aed!
aed = nil
def register_process_heartbeat
heartbeat_timer_task = nil

This comment has been minimized.

Copy link
@ixti

ixti Oct 18, 2023

Contributor

boring...


Sidekiq.on(:heartbeat) do
aed&.shutdown
heartbeat_timer_task&.shutdown

aed = AedTimerTask.new({
heartbeat_timer_task = HeartbeatTimerTask.new({
:run_now => true,
:execution_interval => Sidekiq::Ultimate::IntervalWithJitter.call(DEFIBRILLATE_INTERVAL)
}) { defibrillate! }
aed.execute
}) { save_watched_queues }
heartbeat_timer_task.execute
end

Sidekiq.on(:shutdown) { aed&.shutdown }
Sidekiq.on(:shutdown) { heartbeat_timer_task&.shutdown }
end

# put current list of queues into resurrection candidates
def defibrillate!
def save_watched_queues
Sidekiq.redis do |redis|
log(:debug) { "Defibrillating" }

Expand All @@ -94,7 +94,7 @@ def defibrillate!
end
end

# list of processes that disappeared after latest #defibrillate!
# list of processes that disappeared after latest #save_watched_queues
def casualties
Sidekiq.redis do |redis|
sidekiq_processes = redis.hkeys(CommonConstants::MAIN_KEY)
Expand Down
Loading

0 comments on commit 403d428

Please sign in to comment.