Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix #469] Print log progress on import rake task #787

Merged
merged 19 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
* in rake tasks output (e.g. `Imported CitiesIndex::City in 1s, stats: index 3` -> `Imported CitiesIndex in 1s, stats: index 3`)
* Use index name instead of type name in loader additional scope
* e.g. `CitiesIndex.filter(...).load(city: {scope: City.where(...)})` -> `CitiesIndex.filter(...).load(cities: {scope: City.where(...)})`
* [#469](https://github.com/toptal/chewy/issues/469): Add ability to output progressbar with `ENV['PROGRESS']` during `reset` rake tasks ([@Vitalina-Vakulchyk][]):
* for `rake chewy:reset` and `rake chewy:parallel:reset`
* progressbar is hidden by default, set `ENV['PROGRESS']` to `true` to display it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been in the unreleased section, between lines 7 and 9. Or probably better in the new features sections (5-7).

* [#692](https://github.com/toptal/chewy/issues/692): Add `.update_mapping` to Index class ([@Vitalina-Vakulchyk][]):
* Wrapped Elasticsearch gem `.put_mapping` with `.update_mapping` in Index class
* Add `rake chewy:update_mapping` task
Expand Down
1 change: 1 addition & 0 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_dependency 'activesupport', '>= 5.2'
spec.add_dependency 'elasticsearch', '>= 6.3.0'
spec.add_dependency 'elasticsearch-dsl'
spec.add_dependency 'ruby-progressbar'
end
21 changes: 13 additions & 8 deletions lib/chewy/index/actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def purge!(suffix = nil)
# @param journal [true, false] journaling is switched off for import during reset by default
# @param import_options [Hash] options, passed to the import call
# @return [true, false] false in case of errors
def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
def reset!(suffix = nil, apply_journal: true, journal: false, progressbar: false, **import_options)
result = if suffix.present?
start_time = Time.now
indexes = self.indexes - [index_name]
Expand All @@ -159,17 +159,13 @@ def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
result = import import_options.merge(
suffix: suffix,
journal: journal,
refresh: !Chewy.reset_disable_refresh_interval
refresh: !Chewy.reset_disable_refresh_interval,
progressbar: progressbar
)
original_index_settings suffixed_name

delete if indexes.blank?
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
update_aliases(indexes, general_name, suffixed_name)
client.indices.delete index: indexes if indexes.present?

self.journal.apply(start_time, **import_options) if apply_journal
Expand Down Expand Up @@ -237,6 +233,15 @@ def sync(parallel: nil)

private

def update_aliases(indexes, general_name, suffixed_name)
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
end,
{add: {index: suffixed_name, alias: general_name}}
]}
end

def optimize_index_settings(index_name)
settings = {}
settings[:refresh_interval] = -1 if Chewy.reset_disable_refresh_interval
Expand Down
10 changes: 10 additions & 0 deletions lib/chewy/index/adapter/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ def load(ids, **options)
end
end

def import_count(*args)
collection = if args.first.empty? && @target.respond_to?(import_all_method)
@target.send(import_all_method)
else
args.flatten(1).compact
end

collection.count
end

private

def import_objects(objects, options)
Expand Down
12 changes: 12 additions & 0 deletions lib/chewy/index/adapter/orm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ def load(ids, **options)
ids.map { |id| loaded_objects[id.to_s] }
end

def import_count(*args)
collection = if args.first.empty?
default_scope
elsif args.first.is_a?(relation_class)
args.first
else
args.flatten.compact
end

collection.count
end

private

def import_objects(collection, options)
Expand Down
42 changes: 27 additions & 15 deletions lib/chewy/index/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
require 'chewy/index/import/bulk_builder'
require 'chewy/index/import/bulk_request'
require 'chewy/index/import/routine'
require 'chewy/index/import/thread_safe_progress_bar'

module Chewy
class Index
module Import
extend ActiveSupport::Concern

IMPORT_WORKER = lambda do |index, options, total, ids, iteration|
IMPORT_WORKER = lambda do |index, options, total, progress_bar, ids, iteration|
::Process.setproctitle("chewy [#{index}]: import data (#{iteration + 1}/#{total})")
routine = Routine.new(index, **options)
index.adapter.import(*ids, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
{errors: routine.errors, import: routine.stats, leftovers: routine.leftovers}
end
Expand Down Expand Up @@ -150,8 +152,10 @@ def empty_objects_or_scope?(objects_or_scope)

def import_linear(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
adapter.import(*objects, routine.options) do |action_objects|
routine.process(**action_objects)
progress_bar.increment(action_objects.map { |_, v| v.size }.sum) if routine.options[:progressbar]
end
routine.perform_bulk(routine.leftovers)
payload[:import] = routine.stats
Expand All @@ -166,31 +170,39 @@ def import_parallel(objects, routine)
ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

progress_bar = ThreadSafeProgressBar.new(routine.options[:progressbar]) { adapter.import_count(objects) }
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&IMPORT_WORKER.curry[self, routine.options, batches.size]
)

results = ::Parallel.map_with_index(batches, routine.parallel_options) do |ids, index|
progress_bar.wait_until_ready
ActiveRecord::Base.connection_pool.with_connection do
IMPORT_WORKER.call(self, routine.options, total, progress_bar, ids, index)
end
end

::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
errors, import, leftovers = process_parallel_import_results(results)

if leftovers.present?
batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end
execute_leftovers(leftovers, routine, self, errors)

payload[:import] = import
payload[:errors] = payload_errors(errors) if errors.present?
payload[:errors]
end
end

def execute_leftovers(leftovers, routine, self_object, errors)
return unless leftovers.present?

batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map_with_index(
batches,
routine.parallel_options,
&LEFTOVERS_WORKER.curry[self_object, routine.options, batches.size]
)
errors.concat(results.flatten(1))
end

def process_parallel_import_results(results)
results.each_with_object([[], {}, []]) do |r, (e, i, l)|
e.concat(r[:errors])
Expand Down
4 changes: 2 additions & 2 deletions lib/chewy/index/import/routine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def initialize(index, **options)
@parallel_options = @options.delete(:parallel)
if @parallel_options && !@parallel_options.is_a?(Hash)
@parallel_options = if @parallel_options.is_a?(Integer)
{in_processes: @parallel_options}
{in_threads: @parallel_options}
else
{}
{in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}
end
end
@errors = []
Expand Down
40 changes: 40 additions & 0 deletions lib/chewy/index/import/thread_safe_progress_bar.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Chewy
class Index
module Import
# This class performs the threading for parallel import to avoid concurrency during progressbar output.
#
# @see Chewy::Type::Import::ClassMethods#import with `parallel: true` option
class ThreadSafeProgressBar
def initialize(enabled)
@enabled = enabled

return unless @enabled

@mutex = Mutex.new
@released = false
@progressbar = ProgressBar.create total: nil
Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
@mutex.synchronize { @released = true }
@progressbar.total = yield
end
end
end

def increment(value)
return unless @enabled

@mutex.synchronize do
@progressbar.progress += value
end
end

def wait_until_ready
return true unless @enabled

@mutex.synchronize { @released } until @released
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def human_duration(seconds)

def reset_one(index, output, parallel: false)
output.puts "Resetting #{index}"
index.reset!((Time.now.to_f * 1000).round, parallel: parallel)
index.reset!((Time.now.to_f * 1000).round, parallel: parallel, progressbar: ENV['PROGRESS'] == 'true')
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions spec/chewy/index/actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -525,7 +525,7 @@
.to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex)
.to receive(:import).with(suffix: suffix, journal: false, refresh: false).and_call_original
.to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: false).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end

Expand All @@ -541,7 +541,7 @@
let(:reset_disable_refresh_interval) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -568,7 +568,7 @@
specify do
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: before_import_body).once
expect(CitiesIndex.client.indices).to receive(:put_settings).with(index: name, body: after_import_body).once
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand All @@ -577,7 +577,7 @@
let(:reset_no_replicas) { false }
specify do
expect(CitiesIndex.client.indices).not_to receive(:put_settings)
expect(CitiesIndex).to receive(:import).with(suffix: suffix, journal: false, refresh: true).and_call_original
expect(CitiesIndex).to receive(:import).with(suffix: suffix, progressbar: false, journal: false, refresh: true).and_call_original
expect(CitiesIndex.reset!(suffix)).to eq(true)
end
end
Expand Down Expand Up @@ -667,7 +667,7 @@
specify do
expect(CitiesIndex)
.to receive(:import)
.with(suffix: 'suffix', parallel: true, journal: false, refresh: true)
.with(suffix: 'suffix', progressbar: false, parallel: true, journal: false, refresh: true)
.once.and_return(true)
expect(CitiesIndex.reset!('suffix', parallel: true)).to eq(true)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/chewy/index/import/routine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@

describe '#parallel_options' do
specify { expect(described_class.new(CitiesIndex).parallel_options).to be_nil }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_processes: 3) }
specify { expect(described_class.new(CitiesIndex, parallel: true).parallel_options).to eq({in_threads: [::Parallel.processor_count, ActiveRecord::Base.connection_pool.size].min}) }
specify { expect(described_class.new(CitiesIndex, parallel: 3).parallel_options).to eq(in_threads: 3) }
specify do
expect(described_class.new(CitiesIndex, parallel: {in_threads: 2}).parallel_options).to eq(in_threads: 2)
end
Expand Down
33 changes: 32 additions & 1 deletion spec/chewy/index/import_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,35 @@ def import(*args)

it_behaves_like 'importing'
end

context 'with progressbar output' do
let(:mocked_progressbar) { Struct.new(:progress, :total).new(0, 100) }

it 'imports tracks progress in a single batch' do
expect(ProgressBar).to receive(:create).and_return(mocked_progressbar)
expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original
expect(CitiesIndex).to receive(:import_parallel).and_call_original

CitiesIndex.import(parallel: 1, progressbar: true)

expect(mocked_progressbar.progress).to eq(3)
expect(mocked_progressbar.total).to eq(3)
end

it 'imports tracks progress in many batches' do
expect(ProgressBar).to receive(:create).and_return(mocked_progressbar)
expect(mocked_progressbar).to receive(:progress).at_least(:once).and_call_original
expect(CitiesIndex).to receive(:import_parallel).and_call_original

batches = City.pluck(:id).map { |id| [id] }
expect(CitiesIndex.adapter).to receive(:import_references).and_return(batches)

CitiesIndex.import(parallel: 3, progressbar: true)

expect(mocked_progressbar.progress).to eq(3)
expect(mocked_progressbar.total).to eq(3)
end
end
end

describe '.import!', :orm do
Expand All @@ -506,7 +535,9 @@ def import(*args)
end
end

specify { expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed }
specify do
expect { CitiesIndex.import!(dummy_cities) }.to raise_error Chewy::ImportFailed
end
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'rspec/collection_matchers'

require 'timecop'
require 'ruby-progressbar'

Kaminari::Hooks.init if defined?(::Kaminari::Hooks)

Expand Down
2 changes: 1 addition & 1 deletion spec/support/active_record.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'database_cleaner'

ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 10)
ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: 'file::memory:?cache=shared', pool: 1)
Copy link
Contributor

@dalthon dalthon Apr 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be interesting to explain here that test suite doesn't run properly with more than one database connection, otherwise it does not find tables

ActiveRecord::Base.logger = Logger.new('/dev/null')
if ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks)
ActiveRecord::Base.raise_in_transactional_callbacks = true
Expand Down