From 6133dd437ac8cf6645bb3c37d5b1e5280c149ede Mon Sep 17 00:00:00 2001 From: Ivan Rabotyaga Date: Mon, 24 May 2021 22:25:47 +0300 Subject: [PATCH] Revert [#787](https://github.com/toptal/chewy/pull/787) progressbar feature to avoid performance degradation in parallel import --- CHANGELOG.md | 10 ++++- README.md | 8 +--- chewy.gemspec | 3 +- lib/chewy/index/actions.rb | 21 ++++------ lib/chewy/index/adapter/object.rb | 10 ----- lib/chewy/index/adapter/orm.rb | 12 ------ lib/chewy/index/import.rb | 42 +++++++------------ lib/chewy/index/import/routine.rb | 4 +- .../index/import/thread_safe_progress_bar.rb | 40 ------------------ lib/chewy/rake_helper.rb | 2 +- lib/chewy/version.rb | 2 +- spec/chewy/index/actions_spec.rb | 12 +++--- spec/chewy/index/import/routine_spec.rb | 4 +- spec/chewy/index/import_spec.rb | 33 +-------------- spec/spec_helper.rb | 1 - spec/support/active_record.rb | 2 +- 16 files changed, 47 insertions(+), 159 deletions(-) delete mode 100644 lib/chewy/index/import/thread_safe_progress_bar.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index be4f4421c..ef1e71e42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ ### Changes +### Bugs Fixed + +## 7.2.2 (2021-05-24) + +### Changes + + * [#800](https://github.com/toptal/chewy/pull/800): Revert [#787](https://github.com/toptal/chewy/pull/787) progressbar feature to avoid performance degradation in parallel import ([@rabotyaga][]) + * [#795](https://github.com/toptal/chewy/issues/795): **(Breaking)** Change the Chewy::Search::Parameters::Order implementation to use Array ([@jiajiawang][]): * To allow multiple sorting options that may have the same key name. For example script based sorting whose key will always be `_script`. * Behaviour change of chained `order` calls. 
@@ -19,8 +27,6 @@ * `assert_elasticsearch_query` helper for Minitest - to compare request and expected query (returns `true`/`false`) * `build_query` matcher for Rspec - to compare request and expected query (returns `true`/`false`) -### Bugs Fixed - ## 7.2.1 (2021-05-11) ### New Features diff --git a/README.md b/README.md index 9ec313329..209a27254 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Chewy is compatible with MRI 2.6-3.0¹. | ------------- | ---------------------------------- | | 7.2.x | 7.x | | 7.1.x | 7.x | -| 7.0.0 | 6.8, 7.x | +| 7.0.x | 6.8, 7.x | | 6.0.0 | 5.x, 6.x | | 5.x | 5.x, limited support for 1.x & 2.x | @@ -1032,12 +1032,6 @@ rake chewy:reset[users,cities] # resets UsersIndex and CitiesIndex rake chewy:reset[-users,cities] # resets every index in the application except specified ones ``` -#### Progressbar for `chewy:reset` tasks - -You can optionally output the `progressbar` for `chewy:reset` and `chewy:parallel:reset` during import. - -Progressbar is hidden by default. Set `ENV['PROGRESS']` to `true` to display it. - #### `chewy:upgrade` Performs reset exactly the same way as `chewy:reset` does, but only when the index specification (setting or mapping) was changed. 
diff --git a/chewy.gemspec b/chewy.gemspec index 6079a4dfa..093ebe6cc 100644 --- a/chewy.gemspec +++ b/chewy.gemspec @@ -31,7 +31,6 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.add_development_dependency 'unparser' spec.add_dependency 'activesupport', '>= 5.2' - spec.add_dependency 'elasticsearch', '>= 6.3.0' + spec.add_dependency 'elasticsearch', '>= 7.12.0' spec.add_dependency 'elasticsearch-dsl' - spec.add_dependency 'ruby-progressbar' end diff --git a/lib/chewy/index/actions.rb b/lib/chewy/index/actions.rb index a019be4fb..c1f434d77 100644 --- a/lib/chewy/index/actions.rb +++ b/lib/chewy/index/actions.rb @@ -146,7 +146,7 @@ def purge!(suffix = nil) # @param journal [true, false] journaling is switched off for import during reset by default # @param import_options [Hash] options, passed to the import call # @return [true, false] false in case of errors - def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false, **import_options) + def reset!(suffix = nil, apply_journal: true, journal: false, **import_options) result = if suffix.present? start_time = Time.now indexes = self.indexes - [index_name] @@ -159,13 +159,17 @@ def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false result = import import_options.merge( suffix: suffix, journal: journal, - refresh: !Chewy.reset_disable_refresh_interval, - progressbar: progressbar + refresh: !Chewy.reset_disable_refresh_interval ) original_index_settings suffixed_name delete if indexes.blank? - update_aliases(indexes, general_name, suffixed_name) + client.indices.update_aliases body: {actions: [ + *indexes.map do |index| + {remove: {index: index, alias: general_name}} + end, + {add: {index: suffixed_name, alias: general_name}} + ]} client.indices.delete index: indexes if indexes.present? 
self.journal.apply(start_time, **import_options) if apply_journal @@ -233,15 +237,6 @@ def sync(parallel: nil) private - def update_aliases(indexes, general_name, suffixed_name) - client.indices.update_aliases body: {actions: [ - *indexes.map do |index| - {remove: {index: index, alias: general_name}} - end, - {add: {index: suffixed_name, alias: general_name}} - ]} - end - def optimize_index_settings(index_name) settings = {} settings[:refresh_interval] = -1 if Chewy.reset_disable_refresh_interval diff --git a/lib/chewy/index/adapter/object.rb b/lib/chewy/index/adapter/object.rb index de33acb63..156a10232 100644 --- a/lib/chewy/index/adapter/object.rb +++ b/lib/chewy/index/adapter/object.rb @@ -192,16 +192,6 @@ def load(ids, **options) end end - def import_count(*args) - collection = if args.first.empty? && @target.respond_to?(import_all_method) - @target.send(import_all_method) - else - args.flatten(1).compact - end - - collection.count - end - private def import_objects(objects, options) diff --git a/lib/chewy/index/adapter/orm.rb b/lib/chewy/index/adapter/orm.rb index 8ad241b69..7c04d9e9e 100644 --- a/lib/chewy/index/adapter/orm.rb +++ b/lib/chewy/index/adapter/orm.rb @@ -108,18 +108,6 @@ def load(ids, **options) ids.map { |id| loaded_objects[id.to_s] } end - def import_count(*args) - collection = if args.first.empty? 
- default_scope - elsif args.first.is_a?(relation_class) - args.first - else - args.flatten.compact - end - - collection.count - end - private def import_objects(collection, options) diff --git a/lib/chewy/index/import.rb b/lib/chewy/index/import.rb index dd0d89360..a5aca8912 100644 --- a/lib/chewy/index/import.rb +++ b/lib/chewy/index/import.rb @@ -2,19 +2,17 @@ require 'chewy/index/import/bulk_builder' require 'chewy/index/import/bulk_request' require 'chewy/index/import/routine' -require 'chewy/index/import/thread_safe_progress_bar' module Chewy class Index module Import extend ActiveSupport::Concern - IMPORT_WORKER = lambda do |index, options, total, progress_bar, ids, iteration| + IMPORT_WORKER = lambda do |index, options, total, ids, iteration| ::Process.setproctitle("chewy [#{index}]: import data (#{iteration + 1}/#{total})") routine = Routine.new(index, **options) index.adapter.import(*ids, routine.options) do |action_objects| routine.process(**action_objects) - progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar] end {errors: routine.errors, import: routine.stats, leftovers: routine.leftovers} end @@ -152,10 +150,8 @@ def empty_objects_or_scope?(objects_or_scope) def import_linear(objects, routine) ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload| - progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) } adapter.import(*objects, routine.options) do |action_objects| routine.process(**action_objects) - progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar] end routine.perform_bulk(routine.leftovers) payload[:import] = routine.stats @@ -170,20 +166,24 @@ def import_parallel(objects, routine) ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload| batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a - progress_bar = 
ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) } ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) - - results = ::Parallel.map_with_index(batches, routine.parallel_options) do |ids, index| - progress_bar.wait_until_ready - ActiveRecord::Base.connection_pool.with_connection do - IMPORT_WORKER.call(self, routine.options, total, progress_bar, ids, index) - end - end - + results = ::Parallel.map_with_index( + batches, + routine.parallel_options, + &IMPORT_WORKER.curry[self, routine.options, batches.size] + ) ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) errors, import, leftovers = process_parallel_import_results(results) - execute_leftovers(leftovers, routine, self, errors) + if leftovers.present? + batches = leftovers.each_slice(routine.options[:batch_size]) + results = ::Parallel.map_with_index( + batches, + routine.parallel_options, + &LEFTOVERS_WORKER.curry[self, routine.options, batches.size] + ) + errors.concat(results.flatten(1)) + end payload[:import] = import payload[:errors] = payload_errors(errors) if errors.present? @@ -191,18 +191,6 @@ def import_parallel(objects, routine) end end - def execute_leftovers(leftovers, routine, self_object, errors) - return unless leftovers.present? 
- - batches = leftovers.each_slice(routine.options[:batch_size]) - results = ::Parallel.map_with_index( - batches, - routine.parallel_options, - &LEFTOVERS_WORKER.curry[self_object, routine.options, batches.size] - ) - errors.concat(results.flatten(1)) - end - def process_parallel_import_results(results) results.each_with_object([[], {}, []]) do |r, (e, i, l)| e.concat(r[:errors]) diff --git a/lib/chewy/index/import/routine.rb b/lib/chewy/index/import/routine.rb index b82e51c37..556510572 100644 --- a/lib/chewy/index/import/routine.rb +++ b/lib/chewy/index/import/routine.rb @@ -51,9 +51,9 @@ def initialize(index, **options) @parallel_options = @options.delete(:parallel) if @parallel_options && !@parallel_options.is_a?(Hash) @parallel_options = if @parallel_options.is_a?(Integer) - {in_threads: @parallel_options} + {in_processes: @parallel_options} else - {in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min} + {} end end @errors = [] diff --git a/lib/chewy/index/import/thread_safe_progress_bar.rb b/lib/chewy/index/import/thread_safe_progress_bar.rb deleted file mode 100644 index bee2a6fa3..000000000 --- a/lib/chewy/index/import/thread_safe_progress_bar.rb +++ /dev/null @@ -1,40 +0,0 @@ -module Chewy - class Index - module Import - # This class performs the threading for parallel import to avoid concurrency during progressbar output. 
- # - # @see Chewy::Type::Import::ClassMethods#import with `parallel: true` option - class ThreadSafeProgressBar - def initialize(enabled) - @enabled = enabled - - return unless @enabled - - @mutex = Mutex.new - @released = false - @progressbar = ProgressBar.create total: nil - Thread.new do - ActiveRecord::Base.connection_pool.with_connection do - @mutex.synchronize { @released = true } - @progressbar.total = yield - end - end - end - - def increment(value) - return unless @enabled - - @mutex.synchronize do - @progressbar.progress += value - end - end - - def wait_until_ready - return true unless @enabled - - @mutex.synchronize { @released } until @released - end - end - end - end -end diff --git a/lib/chewy/rake_helper.rb b/lib/chewy/rake_helper.rb index 4d09efd8e..27d1f90ef 100644 --- a/lib/chewy/rake_helper.rb +++ b/lib/chewy/rake_helper.rb @@ -271,7 +271,7 @@ def human_duration(seconds) def reset_one(index, output, parallel: false) output.puts "Resetting #{index}" - index.reset!((Time.now.to_f * 1000).round, parallel: parallel, progressbar: ENV['PROGRESS'] == 'true') + index.reset!((Time.now.to_f * 1000).round, parallel: parallel) end end end diff --git a/lib/chewy/version.rb b/lib/chewy/version.rb index 0074977ca..3ff5d68d0 100644 --- a/lib/chewy/version.rb +++ b/lib/chewy/version.rb @@ -1,3 +1,3 @@ module Chewy - VERSION = '7.2.1'.freeze + VERSION = '7.2.2'.freeze end diff --git a/spec/chewy/index/actions_spec.rb b/spec/chewy/index/actions_spec.rb index 2092fc85a..0ca812d92 100644 --- a/spec/chewy/index/actions_spec.rb +++ b/spec/chewy/index/actions_spec.rb @@ -502,7 +502,7 @@ specify do expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once - expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original + expect(CitiesIndex).to 
receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original expect(CitiesIndex.reset!(suffix)).to eq(true) end @@ -525,7 +525,7 @@ .to receive(:put_settings).with(index: name, body: before_import_body).once expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once expect(CitiesIndex) - .to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original + .to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original expect(CitiesIndex.reset!(suffix)).to eq(true) end @@ -541,7 +541,7 @@ let(:reset_disable_refresh_interval) { false } specify do expect(CitiesIndex.client.indices).not_to receive(:put_settings) - expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original + expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original expect(CitiesIndex.reset!(suffix)).to eq(true) end end @@ -568,7 +568,7 @@ specify do expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once - expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original + expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original expect(CitiesIndex.reset!(suffix)).to eq(true) end end @@ -577,7 +577,7 @@ let(:reset_no_replicas) { false } specify do expect(CitiesIndex.client.indices).not_to receive(:put_settings) - expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original + expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original 
expect(CitiesIndex.reset!(suffix)).to eq(true) end end @@ -667,7 +667,7 @@ specify do expect(CitiesIndex) .to receive(:import) - .with(suffix: 'suffix', progressbar: false, parallel: true, journal: false, refresh: true) + .with(suffix: 'suffix', parallel: true, journal: false, refresh: true) .once.and_return(true) expect(CitiesIndex.reset!('suffix', parallel: true)).to eq(true) end diff --git a/spec/chewy/index/import/routine_spec.rb b/spec/chewy/index/import/routine_spec.rb index ff31edcca..40eb3d7ae 100644 --- a/spec/chewy/index/import/routine_spec.rb +++ b/spec/chewy/index/import/routine_spec.rb @@ -64,8 +64,8 @@ describe '#parallel_options' do specify { expect(described_class.new(CitiesIndex).parallel_options).to be_nil } - specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}) } - specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_threads: 3) } + specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({}) } + specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_processes: 3) } specify do expect(described_class.new(CitiesIndex, parallel: {in_threads: 2}).parallel_options).to eq(in_threads: 2) end diff --git a/spec/chewy/index/import_spec.rb b/spec/chewy/index/import_spec.rb index 6804481d8..ca7d79e89 100644 --- a/spec/chewy/index/import_spec.rb +++ b/spec/chewy/index/import_spec.rb @@ -493,35 +493,6 @@ def import(*args) it_behaves_like 'importing' end - - context 'with progressbar output' do - let(:mocked_progressbar) { Struct.new(:progress, :total).new(0, 100) } - - it 'imports tracks progress in a single batch' do - expect(ProgressBar).to receive(:create).and_return(mocked_progressbar) - expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original - expect(CitiesIndex).to 
receive(:import_parallel).and_call_original - - CitiesIndex.import(parallel: 1, progressbar: true) - - expect(mocked_progressbar.progress).to eq(3) - expect(mocked_progressbar.total).to eq(3) - end - - it 'imports tracks progress in many batches' do - expect(ProgressBar).to receive(:create).and_return(mocked_progressbar) - expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original - expect(CitiesIndex).to receive(:import_parallel).and_call_original - - batches = City.pluck(:id).map { |id| [id] } - expect(CitiesIndex.adapter).to receive(:import_references).and_return(batches) - - CitiesIndex.import(parallel: 3, progressbar: true) - - expect(mocked_progressbar.progress).to eq(3) - expect(mocked_progressbar.total).to eq(3) - end - end end describe '.import!', :orm do @@ -535,9 +506,7 @@ def import(*args) end end - specify do - expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed - end + specify { expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed } end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 3f2501564..48e901d6c 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -8,7 +8,6 @@ require 'rspec/collection_matchers' require 'timecop' -require 'ruby-progressbar' Kaminari::Hooks.init if defined?(::Kaminari::Hooks) diff --git a/spec/support/active_record.rb b/spec/support/active_record.rb index 26068c60f..45c3c501b 100644 --- a/spec/support/active_record.rb +++ b/spec/support/active_record.rb @@ -1,6 +1,6 @@ require 'database_cleaner' -ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 1) +ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 10) ActiveRecord::Base.logger = Logger.new('/dev/null') if ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks) ActiveRecord::Base.raise_in_transactional_callbacks = true