Skip to content

Commit

Permalink
[feature] delayed_sidekiq strategy (#869)
Browse files Browse the repository at this point in the history
  • Loading branch information
skcc321 authored Mar 31, 2023
1 parent 7ee4005 commit 8ea2cfe
Show file tree
Hide file tree
Showing 10 changed files with 539 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][])

### Changes

### Bugs Fixed
Expand Down
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,80 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu
Chewy.settings[:sidekiq] = {queue: :low}
```

#### `:delayed_sidekiq`

It accumulates ids of records to be reindexed during the latency window in redis and then does the reindexing of all accumulated records at once.
The strategy is very useful in case of frequently mutated records.
It supports the `update_fields` option, so it will try to select just enough data from the DB.

There are four options that can be defined in the index:
```ruby
class CitiesIndex...
strategy_config delayed_sidekiq: {
latency: 3,
margin: 2,
ttl: 60 * 60 * 24,
reindex_wrapper: ->(&reindex) {
ActiveRecord::Base.connected_to(role: :reading) { reindex.call }
}
# latency - will prevent scheduling identical jobs
# margin - main purpose is to cover db replication lag by the margin
# ttl - a chunk expiration time (in seconds)
# reindex_wrapper - lambda that accepts a block and wraps the reindex process, e.g. in an AR connection block.
}

...
end
```

Also you can define defaults in the `initializers/chewy.rb`
```ruby
Chewy.settings = {
strategy_config: {
delayed_sidekiq: {
latency: 3,
margin: 2,
ttl: 60 * 60 * 24,
reindex_wrapper: ->(&reindex) {
ActiveRecord::Base.connected_to(role: :reading) { reindex.call }
}
}
}
}

```
or in `config/chewy.yml`
```yaml
strategy_config:
delayed_sidekiq:
latency: 3
margin: 2
ttl: <%= 60 * 60 * 24 %>
# reindex_wrapper setting is not possible here!!! use the initializer instead
```
You can use the strategy identically to other strategies
```ruby
Chewy.strategy(:delayed_sidekiq) do
City.popular.map(&:do_some_update_action!)
end
```
The default queue name is `chewy`, you can customize it in settings: `sidekiq.queue`
```
Chewy.settings[:sidekiq] = {queue: :low}
```
Explicit call of the reindex using the `:delayed_sidekiq` strategy
```ruby
CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq)
```
Explicit call of the reindex using `:delayed_sidekiq` strategy with `:update_fields` support
```ruby
CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq)
```
#### `:active_job`
This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving.
Expand Down
1 change: 1 addition & 0 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength

spec.add_development_dependency 'database_cleaner'
spec.add_development_dependency 'elasticsearch-extensions'
spec.add_development_dependency 'mock_redis'
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec', '>= 3.7.0'
spec.add_development_dependency 'rspec-collection_matchers'
Expand Down
25 changes: 25 additions & 0 deletions lib/chewy/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class Index
pipeline raw_import refresh replication
].freeze

STRATEGY_OPTIONS = {
delayed_sidekiq: %i[latency margin ttl reindex_wrapper]
}.freeze

include Search
include Actions
include Aliases
Expand Down Expand Up @@ -221,6 +225,27 @@ def default_import_options(params)
params.assert_valid_keys(IMPORT_OPTIONS_KEYS)
self._default_import_options = _default_import_options.merge(params)
end

# Builds (once) and returns the per-index strategy configuration.
#
# The result is a struct with one member per key of STRATEGY_OPTIONS; each
# member is itself a struct holding the options listed for that strategy.
# Every option is taken from `params[strategy][option]` when that value is
# truthy, otherwise from `Chewy.configuration[:strategy_config][strategy][option]`.
# For `:delayed_sidekiq` a pass-through `reindex_wrapper` is installed when
# none was provided.
#
# The value is memoized in `@strategy_config`, so `params` only has an effect
# on the first call; later calls return the already-built struct.
#
# @param params [Hash] per-strategy overrides, e.g. `{delayed_sidekiq: {latency: 3}}`
# @return [Struct] configuration keyed by strategy name
# @raise [NotImplementedError] if STRATEGY_OPTIONS lists a strategy this method does not know
def strategy_config(params = {})
  return @strategy_config if @strategy_config

  registry = Struct.new(*STRATEGY_OPTIONS.keys).new

  STRATEGY_OPTIONS.each do |strategy_name, option_names|
    raise NotImplementedError, "Unsupported strategy: '#{strategy_name}'" unless strategy_name == :delayed_sidekiq

    settings = Struct.new(*option_names).new
    option_names.each do |option_name|
      # index-level value wins; otherwise fall back to the Chewy-wide configuration
      settings[option_name] =
        params.dig(strategy_name, option_name) ||
        Chewy.configuration.dig(:strategy_config, strategy_name, option_name)
    end
    settings[:reindex_wrapper] ||= ->(&reindex) { reindex.call } # default wrapper

    registry[strategy_name] = settings
  end

  @strategy_config = registry
end
end
end
end
31 changes: 29 additions & 2 deletions lib/chewy/index/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ module ClassMethods
# @option options [true, Integer, Hash] parallel enables parallel import processing with the Parallel gem, accepts the number of workers or any Parallel gem acceptable options
# @return [true, false] false in case of errors
ruby2_keywords def import(*args)
import_routine(*args).blank?
intercept_import_using_strategy(*args).blank?
end

# @!method import!(*collection, **options)
Expand All @@ -84,7 +84,8 @@ module ClassMethods
#
# @raise [Chewy::ImportFailed] in case of errors
ruby2_keywords def import!(*args)
errors = import_routine(*args)
errors = intercept_import_using_strategy(*args)

raise Chewy::ImportFailed.new(self, errors) if errors.present?

true
Expand Down Expand Up @@ -126,6 +127,32 @@ def compose(object, crutches = nil, fields: [])

private

# Entry point used by `import` / `import!`: when a `:strategy` option is
# given, the import is handed over to that strategy instead of being
# executed immediately; otherwise the call is forwarded untouched to
# `import_routine`.
#
# The return value follows the errors convention of `import_routine`:
# a blank value means success, a non-blank hash describes the failure.
#
# @param args [Array] ids (possibly nested arrays) followed by an optional options hash
# @return [Hash, Object] `{}` on success, an errors hash on failure, or whatever
#   `import_routine` returns when no strategy was requested
def intercept_import_using_strategy(*args)
  # work on a copy so the original arguments can still be forwarded as-is
  copied_args = args.deep_dup
  import_options = copied_args.extract_options!
  strategy_name = import_options.delete(:strategy)

  return import_routine(*args) if strategy_name.blank?

  target_ids = copied_args.flatten
  return {} if target_ids.blank?

  unless target_ids.all? { |id| id.respond_to?(:to_i) }
    return {argument: {"#{strategy_name} supports ids only!" => target_ids}}
  end

  unless :delayed_sidekiq == strategy_name
    return {argument: {"unsupported strategy: '#{strategy_name}'" => target_ids}}
  end

  begin
    Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, target_ids, import_options).postpone
    {} # success. errors handling convention
  rescue StandardError => e
    {scheduler: {e.message => target_ids}}
  end
end

def import_routine(*args)
return if !args.first.nil? && empty_objects_or_scope?(args.first)

Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'sidekiq'
require 'chewy/strategy/sidekiq'
require 'chewy/strategy/lazy_sidekiq'
require 'chewy/strategy/delayed_sidekiq'
rescue LoadError
nil
end
Expand Down
17 changes: 17 additions & 0 deletions lib/chewy/strategy/delayed_sidekiq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Chewy
  class Strategy
    # Strategy that, instead of reindexing right away, hands every stashed
    # [type, ids] pair to DelayedSidekiq::Scheduler when the strategy is left.
    class DelayedSidekiq < Sidekiq
      require_relative 'delayed_sidekiq/scheduler'

      # Postpones the reindex of everything collected in `@stash`;
      # entries whose id list is empty are skipped.
      def leave
        @stash.each do |index, pending_ids|
          DelayedSidekiq::Scheduler.new(index, pending_ids).postpone unless pending_ids.empty?
        end
      end
    end
  end
end
148 changes: 148 additions & 0 deletions lib/chewy/strategy/delayed_sidekiq/scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# frozen_string_literal: true

require_relative '../../index'

# The class is responsible for accumulating in redis the [type, ids]
# that were requested to be reindexed within a `latency`-second window.
# A reindex job is scheduled to run after `latency` seconds;
# that job reads the accumulated [type, ids] from redis
# and reindexes all of them at once.
module Chewy
class Strategy
class DelayedSidekiq
require_relative 'worker'

# Stores the ids (and the fields to update) of one reindex request in a
# Redis set that is shared by all requests of the same type falling into the
# same time chunk, and pushes a single DelayedSidekiq::Worker job per chunk.
class Scheduler
DEFAULT_TTL = 60 * 60 * 24 # in seconds
# window size in seconds used when the index does not configure `latency`
DEFAULT_LATENCY = 10
# seconds added on top of the chunk time when scheduling the worker
DEFAULT_MARGIN = 2
# Sidekiq queue used when `Chewy.settings[:sidekiq][:queue]` is not set
DEFAULT_QUEUE = 'chewy'
# common prefix of every Redis key built by this class
KEY_PREFIX = 'chewy:delayed_sidekiq'
# placeholder stored instead of a field list when no `update_fields` were given
FALLBACK_FIELDS = 'all'
# separates the ids part from the fields part of a serialized entry
FIELDS_IDS_SEPARATOR = ';'
# separates individual ids (and individual fields) inside a serialized entry
IDS_SEPARATOR = ','

# @param type [#name, #strategy_config] the index the ids belong to
# @param ids [Array] ids to be reindexed
# @param options [Hash] only `:update_fields` is read by this class
def initialize(type, ids, options = {})
@type = type
@ids = ids
@options = options
end

# the diagram:
#
# inputs:
# latency == 2
# reindex_time = Time.current
#
# note: ids are shown unserialized for brevity; the actual set members are
# strings built by #serialize_data (e.g. "1;all")
#
# Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis):
# --------------------------------------------------------------------------------------------------
# |
# process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1]
# Scheduler.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & schedule a DelayedSidekiq::Worker at 1679347869 (at + margin; margin == 3 in this example)
# | it will zpop chewy:delayed_sidekiq:CitiesIndex:timechunks up to 1679347866 score and reindex all ids with zpoped keys
# | chewy:delayed_sidekiq:CitiesIndex:1679347866
# |
# |
# process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2]
# Scheduler.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & do not schedule a new worker
# |
# |
# process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
# Scheduler.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & do not schedule a new worker
# |
# |
# process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
# Scheduler.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4]
# | chewy:delayed_sidekiq:CitiesIndex:timechunks = [
# | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}
# | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"}
# | ]
# | & schedule a DelayedSidekiq::Worker at 1679347871 (at + margin; margin == 3 in this example)
# | it will zpop chewy:delayed_sidekiq:CitiesIndex:timechunks up to 1679347868 score and reindex all ids with zpoped keys
# | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex),
# | chewy:delayed_sidekiq:CitiesIndex:1679347868
#
# Adds the serialized request to the current time chunk and, when that chunk
# is not yet registered in the timechunks sorted set, registers it and pushes
# a worker job to run at `at + margin`.
def postpone
::Sidekiq.redis do |redis|
# warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead
if redis.respond_to?(:sadd?)
redis.sadd?(timechunk_key, serialize_data)
else
redis.sadd(timechunk_key, serialize_data)
end

# (re)set the expiration of the chunk on every request added to it
redis.expire(timechunk_key, ttl)

# a chunk already present in the sorted set means a worker was already pushed for it
# NOTE(review): zrank and zadd are separate commands, so two concurrent callers
# may both push a worker for the same chunk - confirm the worker tolerates that
unless redis.zrank(timechunks_key, timechunk_key)
redis.zadd(timechunks_key, at, timechunk_key)
redis.expire(timechunks_key, ttl)

::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'at' => at + margin,
'class' => Chewy::Strategy::DelayedSidekiq::Worker,
'args' => [type_name, at]
)
end
end
end

private

attr_reader :type, :ids, :options

# this method returns predictable value that jumps by latency value
# another words each latency seconds it return the same value
#
# It takes the time `latency` seconds from now and rounds it down to a
# multiple of `latency`; the result is memoized per instance.
def at
@at ||= begin
schedule_at = latency.seconds.from_now.to_f

(schedule_at - (schedule_at % latency)).to_i
end
end

# Fields to update: `options[:update_fields]` when present, otherwise [FALLBACK_FIELDS].
def fields
options[:update_fields].presence || [FALLBACK_FIELDS]
end

# Key of the sorted set that lists the time chunk keys of this type, scored by `at`.
def timechunks_key
"#{KEY_PREFIX}:#{type_name}:timechunks"
end

# Key of the set that accumulates serialized requests of this type for the current `at`.
def timechunk_key
"#{KEY_PREFIX}:#{type_name}:#{at}"
end

# Serializes the request as "<ids joined by ','>;<fields joined by ','>",
# e.g. "1,2,3;name" or "1,2,3;all".
def serialize_data
[ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR)
end

# Name of the type; used in the Redis keys and as the first worker argument.
def type_name
type.name
end

# Configured `latency` of the type, or DEFAULT_LATENCY.
def latency
strategy_config.latency || DEFAULT_LATENCY
end

# Configured `margin` of the type, or DEFAULT_MARGIN.
def margin
strategy_config.margin || DEFAULT_MARGIN
end

# Configured `ttl` of the type, or DEFAULT_TTL.
def ttl
strategy_config.ttl || DEFAULT_TTL
end

# Queue from `Chewy.settings[:sidekiq][:queue]`, or DEFAULT_QUEUE.
def sidekiq_queue
Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE
end

# The `delayed_sidekiq` section of the type's strategy configuration.
def strategy_config
type.strategy_config.delayed_sidekiq
end
end
end
end
end
Loading

0 comments on commit 8ea2cfe

Please sign in to comment.