Skip to content

Commit

Permalink
Revert [#787](#787) progressbar feature to avoid performance degradation in parallel import
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Rabotyaga committed May 24, 2021
1 parent 261dc9e commit 6133dd4
Show file tree
Hide file tree
Showing 16 changed files with 47 additions and 159 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

### Changes

### Bugs Fixed

## 7.2.2 (2021-05-24)

### Changes

* [#800](https://github.com/toptal/chewy/pull/800): Revert [#787](https://github.com/toptal/chewy/pull/787) progressbar feature to avoid performance degradation in parallel import ([@rabotyaga][])

* [#795](https://github.com/toptal/chewy/issues/795): **(Breaking)** Change the Chewy::Search::Parameters::Order implementation to use Array ([@jiajiawang][]):
* To allow multiple sorting options that may have the same key name. For example script based sorting whose key will always be `_script`.
* Behaviour change of chained `order` calls.
Expand All @@ -19,8 +27,6 @@
* `assert_elasticsearch_query` helper for Minitest - to compare request and expected query (returns `true`/`false`)
* `build_query` matcher for Rspec - to compare request and expected query (returns `true`/`false`)

### Bugs Fixed

## 7.2.1 (2021-05-11)

### New Features
Expand Down
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Chewy is compatible with MRI 2.6-3.0¹.
| ------------- | ---------------------------------- |
| 7.2.x | 7.x |
| 7.1.x | 7.x |
| 7.0.0 | 6.8, 7.x |
| 7.0.x | 6.8, 7.x |
| 6.0.0 | 5.x, 6.x |
| 5.x | 5.x, limited support for 1.x & 2.x |

Expand Down Expand Up @@ -1032,12 +1032,6 @@ rake chewy:reset[users,cities] # resets UsersIndex and CitiesIndex
rake chewy:reset[-users,cities] # resets every index in the application except specified ones
```

#### Progressbar for `chewy:reset` tasks

You can optionally output the `progressbar` for `chewy:reset` and `chewy:parallel:reset` during import.

Progressbar is hidden by default. Set `ENV['PROGRESS']` to `true` to display it.

#### `chewy:upgrade`

Performs reset exactly the same way as `chewy:reset` does, but only when the index specification (setting or mapping) was changed.
Expand Down
3 changes: 1 addition & 2 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_development_dependency 'unparser'

spec.add_dependency 'activesupport', '>= 5.2'
spec.add_dependency 'elasticsearch', '>= 6.3.0'
spec.add_dependency 'elasticsearch', '>= 7.12.0'
spec.add_dependency 'elasticsearch-dsl'
spec.add_dependency 'ruby-progressbar'
end
21 changes: 8 additions & 13 deletions lib/chewy/index/actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def purge!(suffix = nil)
# @param journal [true, false] journaling is switched off for import during reset by default
# @param import_options [Hash] options, passed to the import call
# @return [true, false] false in case of errors
def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false, **import_options)
def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
result = if suffix.present?
start_time = Time.now
indexes = self.indexes - [index_name]
Expand All @@ -159,13 +159,17 @@ def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false
result = import import_options.merge(
suffix: suffix,
journal: journal,
refresh: !Chewy.reset_disable_refresh_interval,
progressbar: progressbar
refresh: !Chewy.reset_disable_refresh_interval
)
original_index_settings suffixed_name

delete if indexes.blank?
update_aliases(indexes, general_name, suffixed_name)
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
client.indices.delete index: indexes if indexes.present?

self.journal.apply(start_time, **import_options) if apply_journal
Expand Down Expand Up @@ -233,15 +237,6 @@ def sync(parallel: nil)

private

def update_aliases(indexes, general_name, suffixed_name)
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
end

def optimize_index_settings(index_name)
settings = {}
settings[:refresh_interval] = -1 if Chewy.reset_disable_refresh_interval
Expand Down
10 changes: 0 additions & 10 deletions lib/chewy/index/adapter/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,6 @@ def load(ids, **options)
end
end

def import_count(*args)
collection = if args.first.empty? && @target.respond_to?(import_all_method)
@target.send(import_all_method)
else
args.flatten(1).compact
end

collection.count
end

private

def import_objects(objects, options)
Expand Down
12 changes: 0 additions & 12 deletions lib/chewy/index/adapter/orm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,6 @@ def load(ids, **options)
ids.map { |id| loaded_objects[id.to_s] }
end

def import_count(*args)
collection = if args.first.empty?
default_scope
elsif args.first.is_a?(relation_class)
args.first
else
args.flatten.compact
end

collection.count
end

private

def import_objects(collection, options)
Expand Down
42 changes: 15 additions & 27 deletions lib/chewy/index/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@
require 'chewy/index/import/bulk_builder'
require 'chewy/index/import/bulk_request'
require 'chewy/index/import/routine'
require 'chewy/index/import/thread_safe_progress_bar'

module Chewy
class Index
module Import
extend ActiveSupport::Concern

IMPORT_WORKER = lambda do |index, options, total, progress_bar, ids, iteration|
IMPORT_WORKER = lambda do |index, options, total, ids, iteration|
::Process.setproctitle("chewy [#{index}]: import data (#{iteration + 1}/#{total})")
routine = Routine.new(index, **options)
index.adapter.import(*ids, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
{errors: routine.errors, import: routine.stats, leftovers: routine.leftovers}
end
Expand Down Expand Up @@ -152,10 +150,8 @@ def empty_objects_or_scope?(objects_or_scope)

def import_linear(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
adapter.import(*objects, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
routine.perform_bulk(routine.leftovers)
payload[:import] = routine.stats
Expand All @@ -170,39 +166,31 @@ def import_parallel(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)

results = ::Parallel.map_with_index(batches, routine.parallel_options) do |ids, index|
progress_bar.wait_until_ready
ActiveRecord::Base.connection_pool.with_connection do
IMPORT_WORKER.call(self, routine.options, total, progress_bar, ids, index)
end
end

results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&IMPORT_WORKER.curry[self, routine.options, batches.size]
)
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
errors, import, leftovers = process_parallel_import_results(results)

execute_leftovers(leftovers, routine, self, errors)
if leftovers.present?
batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end

payload[:import] = import
payload[:errors] = payload_errors(errors) if errors.present?
payload[:errors]
end
end

def execute_leftovers(leftovers, routine, self_object, errors)
return unless leftovers.present?

batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self_object, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end

def process_parallel_import_results(results)
results.each_with_object([[], {}, []]) do |r, (e, i, l)|
e.concat(r[:errors])
Expand Down
4 changes: 2 additions & 2 deletions lib/chewy/index/import/routine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def initialize(index, **options)
@parallel_options = @options.delete(:parallel)
if @parallel_options && !@parallel_options.is_a?(Hash)
@parallel_options = if @parallel_options.is_a?(Integer)
{in_threads: @parallel_options}
{in_processes: @parallel_options}
else
{in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}
{}
end
end
@errors = []
Expand Down
40 changes: 0 additions & 40 deletions lib/chewy/index/import/thread_safe_progress_bar.rb

This file was deleted.

2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def human_duration(seconds)

def reset_one(index, output, parallel: false)
output.puts "Resetting #{index}"
index.reset!((Time.now.to_f * 1000).round, parallel: parallel, progressbar: ENV['PROGRESS'] == 'true')
index.reset!((Time.now.to_f * 1000).round, parallel: parallel)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/chewy/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Chewy
VERSION = '7.2.1'.freeze
VERSION = '7.2.2'.freeze
end
12 changes: 6 additions & 6 deletions spec/chewy/index/actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -525,7 +525,7 @@
.to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex)
.to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
.to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -541,7 +541,7 @@
let(:reset_disable_refresh_interval) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -568,7 +568,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -577,7 +577,7 @@
let(:reset_no_replicas) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand Down Expand Up @@ -667,7 +667,7 @@
specify do
expect(CitiesIndex)
.to receive(:import)
.with(suffix: 'suffix', progressbar: false, parallel: true, journal: false, refresh: true)
.with(suffix: 'suffix', parallel: true, journal: false, refresh: true)
.once.and_return(true)
expect(CitiesIndex.reset!('suffix', parallel: true)).to eq(true)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/chewy/index/import/routine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@

describe '#parallel_options' do
specify { expect(described_class.new(CitiesIndex).parallel_options).to be_nil }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_threads: 3) }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_processes: 3) }
specify do
expect(described_class.new(CitiesIndex, parallel: {in_threads: 2}).parallel_options).to eq(in_threads: 2)
end
Expand Down
Loading

0 comments on commit 6133dd4

Please sign in to comment.