# Patch summary (preamble text ahead of the first `diff --git` header).
#
# Purpose: give the parallel import/sync worker procs more descriptive process
# titles, of the form "chewy [<type>]: <stage> (<n>/<total>)".
#
# lib/chewy/type/import.rb
#   * IMPORT_WORKER and LEFTOVERS_WORKER take two extra arguments, `total` and
#     `index`; the title becomes "import data" / "import leftovers" followed by
#     (index + 1)/total instead of "chewy import <type>[<worker_number>]".
#   * import_parallel switches both call sites from ::Parallel.map to
#     ::Parallel.map_with_index and curries `batches.size` in as `total`.
#
# lib/chewy/type/syncer.rb
#   * OUTDATED_IDS_WORKER takes two extra arguments, `type` and `total`, and
#     sets a title only when `type` is truthy; linear_outdated_ids passes
#     nil, nil, so the non-parallel path sets no title.
#   * SOURCE_OR_INDEX_DATA_WORKER takes a new `type` argument; the former
#     `type` argument (:source / :index) is renamed `kind`, and the returned
#     hash is keyed by `kind`.
#   * source_and_index_data curries @type; parallel_outdated_ids curries @type
#     and `batches.size`.
#
# NOTE(review): the outdated-calculation title pairs ::Parallel.worker_number + 1
# with total = batches.size (a batch count), whereas the import titles pair the
# batch index with the batch count -- confirm the two are meant to line up.
# NOTE(review): in the leftovers path and in parallel_outdated_ids, `batches` is
# the result of each_slice called without a block, so `batches.size` relies on
# that enumerator reporting a size -- presumably the receivers are arrays; verify.
# NOTE(review): the line breaks of this patch appear to have been collapsed
# (headers, hunk markers and hunk bodies share lines) -- restore them before
# attempting to apply it.
diff --git a/lib/chewy/type/import.rb b/lib/chewy/type/import.rb index f5c272b4e..f34b87e06 100644 --- a/lib/chewy/type/import.rb +++ b/lib/chewy/type/import.rb @@ -8,8 +8,8 @@ class Type module Import extend ActiveSupport::Concern - IMPORT_WORKER = lambda do |type, options, ids| - ::Process.setproctitle("chewy import #{type}[#{::Parallel.worker_number}]") + IMPORT_WORKER = lambda do |type, options, total, ids, index| + ::Process.setproctitle("chewy [#{type}]: import data (#{index + 1}/#{total})") routine = Routine.new(type, options) type.adapter.import(*ids, routine.options) do |action_objects| routine.process(**action_objects) @@ -17,8 +17,8 @@ module Import {errors: routine.errors, import: routine.stats, leftovers: routine.leftovers} end - LEFTOVERS_WORKER = lambda do |type, options, body| - ::Process.setproctitle("chewy import #{type}[#{::Parallel.worker_number}]") + LEFTOVERS_WORKER = lambda do |type, options, total, body, index| + ::Process.setproctitle("chewy [#{type}]: import leftovers (#{index + 1}/#{total})") routine = Routine.new(type, options) routine.perform_bulk(body) routine.errors @@ -155,13 +155,13 @@ def import_parallel(objects, routine) batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) - results = ::Parallel.map(batches, routine.parallel_options, &IMPORT_WORKER.curry[self, routine.options]) + results = ::Parallel.map_with_index(batches, routine.parallel_options, &IMPORT_WORKER.curry[self, routine.options, batches.size]) ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) errors, import, leftovers = process_parallel_import_results(results) if leftovers.present? 
batches = leftovers.each_slice(routine.options[:batch_size]) - results = ::Parallel.map(batches, routine.parallel_options, &LEFTOVERS_WORKER.curry[self, routine.options]) + results = ::Parallel.map_with_index(batches, routine.parallel_options, &LEFTOVERS_WORKER.curry[self, routine.options, batches.size]) errors.concat(results.flatten(1)) end diff --git a/lib/chewy/type/syncer.rb b/lib/chewy/type/syncer.rb index 5f0a466c8..f6fa4a151 100644 --- a/lib/chewy/type/syncer.rb +++ b/lib/chewy/type/syncer.rb @@ -28,7 +28,8 @@ class Type class Syncer DEFAULT_SYNC_BATCH_SIZE = 20_000 ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/ - OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, index_data| + OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, type, total, index_data| + ::Process.setproctitle("chewy [#{type}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if type index_data.each_with_object([]) do |(id, index_sync_value), result| next unless source_data_hash[id] @@ -41,14 +42,15 @@ class Syncer result.push(id) if outdated end end - SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, type| - result = case type + SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, type, kind| + ::Process.setproctitle("chewy [#{type}]: sync fetching data (#{kind})") + result = case kind when :source syncer.send(:fetch_source_data) when :index syncer.send(:fetch_index_data) end - {type => result} + {kind => result} end def self.typecast_date(string) @@ -143,7 +145,7 @@ def source_and_index_data @source_and_index_data ||= begin if @parallel ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) - result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self]) + result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @type]) ::ActiveRecord::Base.connection.reconnect! 
if defined?(::ActiveRecord::Base) if result.first.keys.first == :source [result.first.values.first, result.second.values.first] @@ -182,7 +184,7 @@ def data_ids(data) end def linear_outdated_ids - OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, index_data) + OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, nil, nil, index_data) end def parallel_outdated_ids @@ -190,7 +192,7 @@ def parallel_outdated_ids batches = index_data.each_slice(size) ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) - result = ::Parallel.map(batches, @parallel, &OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h]).flatten(1) + result = ::Parallel.map(batches, @parallel, &OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h, @type, batches.size]).flatten(1) ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) result end