Skip to content

Commit

Permalink
Implement PoC for LazySidekiq strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
sl4vr committed Dec 10, 2021
1 parent 192e71b commit 6f149b1
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/chewy/index/observe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def self.extend_object(base)
base.define_method :run_chewy_callbacks do
chewy_callbacks.each { |callback| callback.call(self) }
end

base.define_method :update_chewy_indices do
Chewy.strategy.current.update_chewy_indices(self)
end
end

def update_index(type_name, *args, &block)
Expand All @@ -93,10 +97,10 @@ def update_index(type_name, *args, &block)
# Set Chewy callbacks along with destroy callbacks here
# because here we have actual Chewy.use_after_commit_callbacks
if Chewy.use_after_commit_callbacks
after_commit(:run_chewy_callbacks, on: %i[create update])
after_commit(:update_chewy_indices, on: %i[create update])
after_commit(on: :destroy, **callback_options, &update_proc)
else
after_save(:run_chewy_callbacks)
after_save(:update_chewy_indices)
after_destroy(**callback_options, &update_proc)
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
begin
require 'sidekiq'
require 'chewy/strategy/sidekiq'
require 'chewy/strategy/lazy_sidekiq'
rescue LoadError
nil
end
Expand Down
10 changes: 10 additions & 0 deletions lib/chewy/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ def update(type, _objects, _options = {})
# strategies stack
#
def leave; end

# This method called when some model record is created or updated.
# Normally it will just evaluate all the Chewy callbacks and pass results
# to current strategy's update method.
# However it's possible to override it to achieve delayed evaluation of
# callbacks, e.g. using sidekiq.
#
def update_chewy_indices(object)
object.run_chewy_callbacks
end
end
end
end
52 changes: 52 additions & 0 deletions lib/chewy/strategy/lazy_sidekiq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
module Chewy
class Strategy
# The strategy works the same way as sidekiq, but performs
# async evaluation of all index callbacks on model create and update
# driven by sidekiq
#
# Chewy.strategy(:lazy_sidekiq) do
# User.all.map(&:save) # Does nothing here
# Post.all.map(&:save) # And here
# # It imports all the changed users and posts right here
# end
#
class LazySidekiq < Sidekiq
class LazyWorker
include ::Sidekiq::Worker

def perform(type, id)
type.constantize.find_by_id(id)&.run_chewy_callbacks
end
end

def initialize
@stash = []
end

def update(type, objects, _options = {})
ids = type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)
return if ids.empty?

::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'class' => Chewy::Strategy::Sidekiq::Worker,
'args' => [type.name, ids]
)
end

def leave
@stash.each do |model_name, id|
::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'class' => Chewy::Strategy::LazySidekiq::LazyWorker,
'args' => [model_name, id]
)
end
end

def update_chewy_indices(object)
@stash << [object.class.name, object.id]
end
end
end
end
192 changes: 192 additions & 0 deletions spec/chewy/strategy/lazy_sidekiq_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
require 'spec_helper'

if defined?(::Sidekiq)
require 'sidekiq/testing'

describe Chewy::Strategy::LazySidekiq do
around do |example|
sidekiq_settings = Chewy.settings[:sidekiq]
Chewy.settings[:sidekiq] = {queue: 'low'}
Chewy.strategy(:lazy_sidekiq) { example.run }
Chewy.settings[:sidekiq] = sidekiq_settings
end
before { ::Sidekiq::Worker.clear_all }

context 'tests from sidekiq_spec + some more' do
before do
stub_model(:city) do
update_index('cities') { self }
end

stub_index(:cities) do
index_scope City
end
end

let(:city) { City.create!(name: 'hello') }
let(:other_city) { City.create!(name: 'world') }

specify do
expect { [city, other_city].map(&:save!) }
.not_to update_index(CitiesIndex, strategy: :lazy_sidekiq)
end

specify do
expect(::Sidekiq::Client).to receive(:push)
.with(hash_including('class' => Chewy::Strategy::LazySidekiq::LazyWorker, 'queue' => 'low'))
.and_call_original
.twice
expect(::Sidekiq::Client).to receive(:push)
.with(hash_including('class' => Chewy::Strategy::Sidekiq::Worker, 'queue' => 'low'))
.and_call_original
.twice
::Sidekiq::Testing.inline! do
expect { [city, other_city].map(&:save!) }
.to update_index(CitiesIndex, strategy: :lazy_sidekiq)
.and_reindex(city, other_city).only
end
end

specify do
expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], suffix: '201601')
Chewy::Strategy::Sidekiq::Worker.new.perform('CitiesIndex', [city.id, other_city.id], suffix: '201601')
end

specify do
allow(Chewy).to receive(:disable_refresh_async).and_return(true)
expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], suffix: '201601', refresh: false)
Chewy::Strategy::Sidekiq::Worker.new.perform('CitiesIndex', [city.id, other_city.id], suffix: '201601')
end

specify do
expect(CitiesIndex).to receive(:import!).with([city.id], {})
expect(::Sidekiq::Client).to receive(:push)
.with(hash_including('class' => Chewy::Strategy::Sidekiq::Worker, 'queue' => 'low'))
.and_call_original
allow(City).to receive(:find_by_id).with(city.id).and_return(city)
expect(city).to receive(:run_chewy_callbacks).and_call_original
::Sidekiq::Testing.inline! do
Chewy::Strategy::LazySidekiq::LazyWorker.new.perform('City', city.id)
end
end
end

context 'integration' do
around { |example| ::Sidekiq::Testing.inline! { example.run } }

let(:update_condition) { true }

context 'state dependent' do
before do
stub_model(:city) do
update_index(-> { 'cities' }, :self)
update_index('countries') { changes['country_id'] || previous_changes['country_id'] || country }
end

stub_model(:country) do
update_index('cities', if: -> { update_condition_state }) { cities }
update_index(-> { 'countries' }, :self)
attr_accessor :update_condition_state
end

City.belongs_to :country
Country.has_many :cities

stub_index(:cities) do
index_scope City
end

stub_index(:countries) do
index_scope Country
end
end

context do
let!(:country1) { Country.create!(id: 1) }
let!(:country2) { Country.create!(id: 2) }
let!(:city) { City.create!(id: 1, country: country1) }

it 'does not update index of removed entity because model state on the moment of save cannot be fetched' do
expect { city.update!(country: nil) }.not_to update_index('countries', strategy: :lazy_sidekiq)
end
it 'does not update index of removed entity because model state on the moment of save cannot be fetched' do
expect { city.update!(country: country2) }.to update_index('countries', strategy: :lazy_sidekiq).and_reindex(country2).only
end
end

context do
let!(:country) do
cities = Array.new(2) { |i| City.create!(id: i) }
Country.create!(id: 1, cities: cities, update_condition_state: update_condition)
end

it 'does not update index because state of attribute cannot be fetched' do
expect { country.save! }.not_to update_index('cities', strategy: :lazy_sidekiq)
end
end
end

context do
before do
stub_model(:city) do
update_index(-> { 'cities' }, :self)
update_index('countries') { country }
end

stub_model(:country) do
update_index('cities', if: -> { update_condition_state }) { cities }
update_index(-> { 'countries' }, :self)
end

City.belongs_to :country
Country.has_many :cities

stub_index(:cities) do
index_scope City
end

stub_index(:countries) do
index_scope Country
end

allow_any_instance_of(Country).to receive(:update_condition_state).and_return(update_condition)
end

context do
let!(:country1) { Country.create!(id: 1) }
let!(:country2) { Country.create!(id: 2) }
let!(:city) { City.create!(id: 1, country: country1) }

specify { expect { city.save! }.to update_index('cities', strategy: :lazy_sidekiq).and_reindex(city).only }
specify { expect { city.save! }.to update_index('countries', strategy: :lazy_sidekiq).and_reindex(country1).only }

specify { expect { city.update!(country: nil) }.to update_index('cities', strategy: :lazy_sidekiq).and_reindex(city).only }

specify { expect { city.update!(country: country2) }.to update_index('cities', strategy: :lazy_sidekiq).and_reindex(city).only }

# See spec/chewy/strategy/lazy_sidekiq_spec.rb:109
skip { expect { city.update!(country: nil) }.to update_index('countries', strategy: :lazy_sidekiq).and_reindex(country1).only }
# See spec/chewy/strategy/lazy_sidekiq_spec.rb:112
skip do
expect { city.update!(country: country2) }
.to update_index('countries', strategy: :lazy_sidekiq).and_reindex(country1, country2).only
end
end

context do
let!(:country) do
cities = Array.new(2) { |i| City.create!(id: i) }
Country.create!(id: 1, cities: cities)
end
specify { expect { country.save! }.to update_index('cities', strategy: :lazy_sidekiq).and_reindex(country.cities).only }
specify { expect { country.save! }.to update_index('countries', strategy: :lazy_sidekiq).and_reindex(country).only }

context 'conditional update' do
let(:update_condition) { false }
specify { expect { country.save! }.not_to update_index('cities', strategy: :lazy_sidekiq) }
end
end
end
end
end
end

0 comments on commit 6f149b1

Please sign in to comment.