Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Indexing Optimization and Refresh on Async Strategies #467

Merged
merged 24 commits into from
Mar 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
149a7d0
Added use_enhance_index_settings_while_resetting and disable_refresh_…
ericproulx Jan 12, 2017
3f0ae69
Use config `disable_refresh_async` in async strategies
ericproulx Jan 12, 2017
6b76da4
Use config `use_enhance_index_settings_while_resetting` in reset
ericproulx Jan 12, 2017
597d6e9
Fixed refresh_interval settings after import
ericproulx Jan 13, 2017
be15f69
Added specs
ericproulx Jan 13, 2017
d1ef901
Small refactor with `refresh: false` when `use_enhance_index_settings…
ericproulx Jan 13, 2017
7cbfaff
Added condition for refresh with `disable_refresh_async`
ericproulx Jan 13, 2017
04b6e26
Fixed rubocop offenses + Fix setting back refresh_interval if no sett…
ericproulx Jan 13, 2017
67c6b7b
Fixed rubocop offenses
ericproulx Jan 13, 2017
f58fa44
Fixed warning + not proud copy/paste
ericproulx Jan 13, 2017
af3b8b3
Fixed resque spec typo
ericproulx Jan 13, 2017
d7e447e
Added disabled_refresh_async spec
ericproulx Jan 13, 2017
3342428
Refactor + Added spec
ericproulx Jan 16, 2017
d258744
Refactor extract_index_settings
ericproulx Jan 16, 2017
6746abf
Fix offenses
ericproulx Jan 16, 2017
0580a10
Freeze rubocop to 0.46.0
ericproulx Jan 16, 2017
b4e2bd9
Moved rubocop 0.46.0 to gemspec
ericproulx Jan 16, 2017
9dae6a9
Fixed rubocop 0.47.0 offenses. Removed 0.46.0 lock
ericproulx Jan 17, 2017
4d29534
Separated no_replicas and refresh_interval.
ericproulx Jan 19, 2017
6c19286
Removed rubocop 0.46 from gemfile
ericproulx Jan 19, 2017
7cabb49
Trigger
ericproulx Jan 22, 2017
a8659a3
Trigger
ericproulx Jan 22, 2017
19b4180
Freeze elasticsearch gem to 5.0.0 because of the delete_by_query chan…
ericproulx Jan 22, 2017
f3f8042
Merge remote-tracking branch 'upstream/master'
ericproulx Mar 29, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/chewy/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@ class Config

# Where Chewy expects to find index definitions
# within a Rails app folder.
:indices_path
:indices_path,

# Set index refresh_interval setting to -1 before reset and put the original value after.
# If setting not present, put back to default 1s
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
:reset_disable_refresh_interval,

# Set number_of_replicas to 0 before reset and put the original value after
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
:reset_no_replicas,

# Refresh or not when import async (sidekiq, resque, activejob)
:disable_refresh_async
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, why did you add this?
I'm thinking about some options passed to the block: Chewy.strategy(:sidekiq, refresh: false) { do_stuff } Instead of the global option. However it would not affect anything, so I'm ok to accept it if you don't want to change your mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both ways could work. We could set the strategy globally for the async strategies or we could simply pass some options to the block to have a different behavior for some cases. What do you think ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this makes sense. I'm just thinking of it being slightly excess. But yeah, we can keep both ways. Tell me when you think this PR is ready, I'll review it again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ready to review sir 👍


def self.delegated
public_instance_methods - superclass.public_instance_methods - Singleton.public_instance_methods
Expand All @@ -53,6 +65,9 @@ def initialize
@root_strategy = :base
@request_strategy = :atomic
@use_after_commit_callbacks = true
@reset_disable_refresh_interval = false
@reset_no_replicas = false
@disable_refresh_async = false
@indices_path = 'app/chewy'
end

Expand Down
53 changes: 28 additions & 25 deletions lib/chewy/fields/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ def object_field?
end

def mappings_hash
mapping = if children.present?
{ (multi_field? ? :fields : :properties) => children.map(&:mappings_hash).inject(:merge) }
else
{}
end
mapping =
if children.present?
{ (multi_field? ? :fields : :properties) => children.map(&:mappings_hash).inject(:merge) }
else
{}
end
mapping.reverse_merge!(options)
mapping.reverse_merge!(type: (children.present? ? 'object' : 'string'))
{ name => mapping }
Expand All @@ -33,30 +34,32 @@ def mappings_hash
def compose(object, *parent_objects)
objects = ([object] + parent_objects.flatten).uniq

result = if value && value.is_a?(Proc)
if value.arity.zero?
object.instance_exec(&value)
elsif value.arity < 0
value.call(*object)
else
value.call(*objects.first(value.arity))
end
elsif object.is_a?(Hash)
if object.key?(name)
object[name]
result =
if value && value.is_a?(Proc)
if value.arity.zero?
object.instance_exec(&value)
elsif value.arity < 0
value.call(*object)
else
value.call(*objects.first(value.arity))
end
elsif object.is_a?(Hash)
if object.key?(name)
object[name]
else
object[name.to_s]
end
else
object[name.to_s]
object.send(name)
end
else
object.send(name)
end

if children.present? && !multi_field?
result = if result.respond_to?(:to_ary)
result.to_ary.map { |item| compose_children(item, *objects) }
else
compose_children(result, *objects)
end
result =
if result.respond_to?(:to_ary)
result.to_ary.map { |item| compose_children(item, *objects) }
else
compose_children(result, *objects)
end
end

{ name => result }
Expand Down
36 changes: 35 additions & 1 deletion lib/chewy/index/actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ def #{method} options = {}
def reset!(suffix = nil, journal: false)
if suffix.present? && (indexes = self.indexes).present?
create! suffix, alias: false
result = import suffix: suffix, journal: journal

optimize_index_settings suffix
result = import suffix: suffix, journal: journal, refresh: !Chewy.reset_disable_refresh_interval
original_index_settings suffix

client.indices.update_aliases body: { actions: [
*indexes.map do |index|
{ remove: { index: index, alias: index_name } }
Expand All @@ -177,6 +181,36 @@ def reset!(suffix = nil, journal: false)
import journal: journal
end
end

private

def optimize_index_settings(suffix)
settings = {}
settings[:refresh_interval] = -1 if Chewy.reset_disable_refresh_interval
settings[:number_of_replicas] = 0 if Chewy.reset_no_replicas
update_settings suffix: suffix, settings: settings if settings.any?
end

def original_index_settings(suffix)
settings = {}
if Chewy.reset_disable_refresh_interval
settings.merge! index_settings(:refresh_interval)
settings[:refresh_interval] = '1s' if settings.empty?
end
settings.merge! index_settings(:number_of_replicas) if Chewy.reset_no_replicas
update_settings suffix: suffix, settings: settings if settings.any?
end

def update_settings(*args)
options = args.extract_options!
name = build_index_name suffix: options[:suffix]
client.indices.put_settings index: name, body: { index: options[:settings] }
end

def index_settings(setting_name)
return {} unless settings_hash.key?(:settings) || settings_hash[:settings].key?(:index)
settings_hash[:settings][:index].slice(setting_name)
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Worker < ::ActiveJob::Base
queue_as :chewy

def perform(type, ids, options = {})
options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async
type.constantize.import!(ids, options)
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Worker
@queue = :chewy

def self.perform(type, ids, options = {})
options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async
type.constantize.import!(ids, options)
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Worker
include ::Sidekiq::Worker

def perform(type, ids, options = {})
options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async
type.constantize.import!(ids, options)
end
end
Expand Down
3 changes: 3 additions & 0 deletions spec/chewy/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
its(:request_strategy) { should == :atomic }
its(:use_after_commit_callbacks) { should == true }
its(:indices_path) { should == 'app/chewy' }
its(:reset_disable_refresh_interval) { should == false }
its(:reset_no_replicas) { should == false }
its(:disable_refresh_async) { should == false }

describe '#transport_logger=' do
let(:logger) { Logger.new('/dev/null') }
Expand Down
96 changes: 96 additions & 0 deletions spec/chewy/index/actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,102 @@
end
end

context 'reset_disable_refresh_interval' do
let(:suffix) { Time.now.to_i }
let(:name) { CitiesIndex.build_index_name(suffix: suffix) }
let(:before_import_body) do
{
index: { refresh_interval: -1 }
}
end
let(:after_import_body) do
{
index: { refresh_interval: '1s' }
}
end

before { CitiesIndex.reset!('2013') }
before { allow(Chewy).to receive(:reset_disable_refresh_interval).and_return(reset_disable_refresh_interval) }

context 'activated' do
let(:reset_disable_refresh_interval) { true }
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

context 'refresh_interval already defined' do
before do
stub_index(:cities) do
settings index: { refresh_interval: '2s' }
define_type City
end
end

let(:after_import_body) do
{
index: { refresh_interval: '2s' }
}
end

specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
end

context 'not activated' do
let(:reset_disable_refresh_interval) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
end

context 'reset_no_replicas' do
let(:suffix) { Time.now.to_i }
let(:name) { CitiesIndex.build_index_name(suffix: suffix) }
let(:before_import_body) do
{
index: { number_of_replicas: 0 }
}
end
let(:after_import_body) do
{
index: { number_of_replicas: 0 }
}
end

before { CitiesIndex.reset!('2013') }
before { allow(Chewy).to receive(:reset_no_replicas).and_return(reset_no_replicas) }

context 'activated' do
let(:reset_no_replicas) { true }
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end

context 'not activated' do
let(:reset_no_replicas) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
end

context 'journaling' do
before do
begin
Expand Down
13 changes: 13 additions & 0 deletions spec/chewy/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ class DummyCityIndex2 < Chewy::Index
end.to raise_error(NameError)
end
end

context 'type methods should be deprecated and can\'t redefine existing ones' do
before do
stub_index(:places) do
def self.city; end
define_type :city
define_type :country
end
end

specify { expect(PlacesIndex.city).to be_nil }
specify { expect(PlacesIndex::Country).to be < Chewy::Type }
end
end

describe '.type_hash' do
Expand Down
6 changes: 6 additions & 0 deletions spec/chewy/strategy/active_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,11 @@
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601')
Chewy::Strategy::ActiveJob::Worker.new.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end

specify do
allow(Chewy).to receive(:disable_refresh_async).and_return(true)
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601', refresh: false)
Chewy::Strategy::ActiveJob::Worker.new.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end
end
end
6 changes: 6 additions & 0 deletions spec/chewy/strategy/resque_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,11 @@
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601')
Chewy::Strategy::Resque::Worker.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end

specify do
allow(Chewy).to receive(:disable_refresh_async).and_return(true)
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601', refresh: false)
Chewy::Strategy::Resque::Worker.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end
end
end
6 changes: 6 additions & 0 deletions spec/chewy/strategy/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,11 @@
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601')
Chewy::Strategy::Sidekiq::Worker.new.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end

specify do
allow(Chewy).to receive(:disable_refresh_async).and_return(true)
expect(CitiesIndex::City).to receive(:import!).with([city.id, other_city.id], suffix: '201601', refresh: false)
Chewy::Strategy::Sidekiq::Worker.new.perform('CitiesIndex::City', [city.id, other_city.id], suffix: '201601')
end
end
end