Skip to content

Commit

Permalink
[Fix #469] Print log progress on import rake task (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitalina-Vakulchyk authored Apr 23, 2021
1 parent 64f7343 commit cb7e4c4
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
* in rake tasks output (e.g. `Imported CitiesIndex::City in 1s, stats: index 3` -> `Imported CitiesIndex in 1s, stats: index 3`)
* Use index name instead of type name in loader additional scope
* e.g. `CitiesIndex.filter(...).load(city: {scope: City.where(...)})` -> `CitiesIndex.filter(...).load(cities: {scope: City.where(...)})`
* [#469](https://github.com/toptal/chewy/issues/469): Add ability to output progressbar with `ENV['PROGRESS']` during `reset` rake tasks ([@Vitalina-Vakulchyk][]):
* for `rake chewy:reset` and `rake chewy:parallel:reset`
* progressbar is hidden by default, set `ENV['PROGRESS']` to `true` to display it
* [#692](https://github.com/toptal/chewy/issues/692): Add `.update_mapping` to Index class ([@Vitalina-Vakulchyk][]):
* Wrapped Elasticsearch gem `.put_mapping` with `.update_mapping` in Index class
* Add `rake chewy:update_mapping` task
Expand Down
1 change: 1 addition & 0 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_dependency 'activesupport', '>= 5.2'
spec.add_dependency 'elasticsearch', '>= 6.3.0'
spec.add_dependency 'elasticsearch-dsl'
spec.add_dependency 'ruby-progressbar'
end
21 changes: 13 additions & 8 deletions lib/chewy/index/actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def purge!(suffix = nil)
# @param journal [true, false] journaling is switched off for import during reset by default
# @param import_options [Hash] options, passed to the import call
# @return [true, false] false in case of errors
def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false, **import_options)
result = if suffix.present?
start_time = Time.now
indexes = self.indexes - [index_name]
Expand All @@ -159,17 +159,13 @@ def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
result = import import_options.merge(
suffix: suffix,
journal: journal,
refresh: !Chewy.reset_disable_refresh_interval
refresh: !Chewy.reset_disable_refresh_interval,
progressbar: progressbar
)
original_index_settings suffixed_name

delete if indexes.blank?
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
update_aliases(indexes, general_name, suffixed_name)
client.indices.delete index: indexes if indexes.present?

self.journal.apply(start_time, **import_options) if apply_journal
Expand Down Expand Up @@ -237,6 +233,15 @@ def sync(parallel: nil)

private

def update_aliases(indexes, general_name, suffixed_name)
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
end

def optimize_index_settings(index_name)
settings = {}
settings[:refresh_interval] = -1 if Chewy.reset_disable_refresh_interval
Expand Down
10 changes: 10 additions & 0 deletions lib/chewy/index/adapter/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ def load(ids, **options)
end
end

def import_count(*args)
collection = if args.first.empty? && @target.respond_to?(import_all_method)
@target.send(import_all_method)
else
args.flatten(1).compact
end

collection.count
end

private

def import_objects(objects, options)
Expand Down
12 changes: 12 additions & 0 deletions lib/chewy/index/adapter/orm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ def load(ids, **options)
ids.map { |id| loaded_objects[id.to_s] }
end

def import_count(*args)
collection = if args.first.empty?
default_scope
elsif args.first.is_a?(relation_class)
args.first
else
args.flatten.compact
end

collection.count
end

private

def import_objects(collection, options)
Expand Down
42 changes: 27 additions & 15 deletions lib/chewy/index/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
require 'chewy/index/import/bulk_builder'
require 'chewy/index/import/bulk_request'
require 'chewy/index/import/routine'
require 'chewy/index/import/thread_safe_progress_bar'

module Chewy
class Index
module Import
extend ActiveSupport::Concern

IMPORT_WORKER = lambda do |index, options, total, ids, iteration|
IMPORT_WORKER = lambda do |index, options, total, progress_bar, ids, iteration|
::Process.setproctitle("chewy [#{index}]: import data (#{iteration + 1}/#{total})")
routine = Routine.new(index, **options)
index.adapter.import(*ids, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
{errors: routine.errors, import: routine.stats, leftovers: routine.leftovers}
end
Expand Down Expand Up @@ -150,8 +152,10 @@ def empty_objects_or_scope?(objects_or_scope)

def import_linear(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
adapter.import(*objects, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
routine.perform_bulk(routine.leftovers)
payload[:import] = routine.stats
Expand All @@ -166,31 +170,39 @@ def import_parallel(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&IMPORT_WORKER.curry[self, routine.options, batches.size]
)

results = ::Parallel.map_with_index(batches, routine.parallel_options) do |ids, index|
progress_bar.wait_until_ready
ActiveRecord::Base.connection_pool.with_connection do
IMPORT_WORKER.call(self, routine.options, total, progress_bar, ids, index)
end
end

::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
errors, import, leftovers = process_parallel_import_results(results)

if leftovers.present?
batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end
execute_leftovers(leftovers, routine, self, errors)

payload[:import] = import
payload[:errors] = payload_errors(errors) if errors.present?
payload[:errors]
end
end

def execute_leftovers(leftovers, routine, self_object, errors)
return unless leftovers.present?

batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self_object, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end

def process_parallel_import_results(results)
results.each_with_object([[], {}, []]) do |r, (e, i, l)|
e.concat(r[:errors])
Expand Down
4 changes: 2 additions & 2 deletions lib/chewy/index/import/routine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def initialize(index, **options)
@parallel_options = @options.delete(:parallel)
if @parallel_options && !@parallel_options.is_a?(Hash)
@parallel_options = if @parallel_options.is_a?(Integer)
{in_processes: @parallel_options}
{in_threads: @parallel_options}
else
{}
{in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}
end
end
@errors = []
Expand Down
40 changes: 40 additions & 0 deletions lib/chewy/index/import/thread_safe_progress_bar.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Chewy
class Index
module Import
# This class performs the threading for parallel import to avoid concurrency during progressbar output.
#
# @see Chewy::Type::Import::ClassMethods#import with `parallel: true` option
class ThreadSafeProgressBar
def initialize(enabled)
@enabled = enabled

return unless @enabled

@mutex = Mutex.new
@released = false
@progressbar = ProgressBar.create total: nil
Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
@mutex.synchronize { @released = true }
@progressbar.total = yield
end
end
end

def increment(value)
return unless @enabled

@mutex.synchronize do
@progressbar.progress += value
end
end

def wait_until_ready
return true unless @enabled

@mutex.synchronize { @released } until @released
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def human_duration(seconds)

def reset_one(index, output, parallel: false)
output.puts "Resetting #{index}"
index.reset!((Time.now.to_f * 1000).round, parallel: parallel)
index.reset!((Time.now.to_f * 1000).round, parallel: parallel, progressbar: ENV['PROGRESS'] == 'true')
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions spec/chewy/index/actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -525,7 +525,7 @@
.to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex)
.to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
.to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -541,7 +541,7 @@
let(:reset_disable_refresh_interval) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -568,7 +568,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -577,7 +577,7 @@
let(:reset_no_replicas) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand Down Expand Up @@ -667,7 +667,7 @@
specify do
expect(CitiesIndex)
.to receive(:import)
.with(suffix: 'suffix', parallel: true, journal: false, refresh: true)
.with(suffix: 'suffix', progressbar: false, parallel: true, journal: false, refresh: true)
.once.and_return(true)
expect(CitiesIndex.reset!('suffix', parallel: true)).to eq(true)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/chewy/index/import/routine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@

describe '#parallel_options' do
specify { expect(described_class.new(CitiesIndex).parallel_options).to be_nil }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_processes: 3) }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_threads: 3) }
specify do
expect(described_class.new(CitiesIndex, parallel: {in_threads: 2}).parallel_options).to eq(in_threads: 2)
end
Expand Down
33 changes: 32 additions & 1 deletion spec/chewy/index/import_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,35 @@ def import(*args)

it_behaves_like 'importing'
end

context 'with progressbar output' do
let(:mocked_progressbar) { Struct.new(:progress, :total).new(0, 100) }

it 'imports tracks progress in a single batch' do
expect(ProgressBar).to receive(:create).and_return(mocked_progressbar)
expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original
expect(CitiesIndex).to receive(:import_parallel).and_call_original

CitiesIndex.import(parallel: 1, progressbar: true)

expect(mocked_progressbar.progress).to eq(3)
expect(mocked_progressbar.total).to eq(3)
end

it 'imports tracks progress in many batches' do
expect(ProgressBar).to receive(:create).and_return(mocked_progressbar)
expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original
expect(CitiesIndex).to receive(:import_parallel).and_call_original

batches = City.pluck(:id).map { |id| [id] }
expect(CitiesIndex.adapter).to receive(:import_references).and_return(batches)

CitiesIndex.import(parallel: 3, progressbar: true)

expect(mocked_progressbar.progress).to eq(3)
expect(mocked_progressbar.total).to eq(3)
end
end
end

describe '.import!', :orm do
Expand All @@ -506,7 +535,9 @@ def import(*args)
end
end

specify { expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed }
specify do
expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed
end
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'rspec/collection_matchers'

require 'timecop'
require 'ruby-progressbar'

Kaminari::Hooks.init if defined?(::Kaminari::Hooks)

Expand Down
2 changes: 1 addition & 1 deletion spec/support/active_record.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'database_cleaner'

ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 10)
ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 1)
ActiveRecord::Base.logger = Logger.new('/dev/null')
if ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks)
ActiveRecord::Base.raise_in_transactional_callbacks = true
Expand Down

0 comments on commit cb7e4c4

Please sign in to comment.