Skip to content

Commit

Permalink
Improve parallel worker titles
Browse files Browse the repository at this point in the history
  • Loading branch information
pyromaniac committed Aug 6, 2017
1 parent 4a21b57 commit ba16ba0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
12 changes: 6 additions & 6 deletions lib/chewy/type/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ class Type
module Import
extend ActiveSupport::Concern

# Worker lambda for one batch of a parallel import: sets the process title,
# runs the adapter import for `ids` through a fresh Routine, and returns a hash
# with that routine's errors, stats (under :import) and leftovers.
# NOTE(review): this text is a rendered diff without +/- markers, so two
# header/title pairs appear back to back; presumably the first pair is the
# pre-commit version and the second the post-commit one — confirm against the
# repository before treating either as live code.
# Variant whose title is built from ::Parallel.worker_number:
IMPORT_WORKER = lambda do |type, options, ids|
::Process.setproctitle("chewy import #{type}[#{::Parallel.worker_number}]")
# Variant that also receives `total` and `index`; the title reads
# "(index + 1/total)", so `index` is presumably zero-based — TODO confirm.
IMPORT_WORKER = lambda do |type, options, total, ids, index|
::Process.setproctitle("chewy [#{type}]: import data (#{index + 1}/#{total})")
routine = Routine.new(type, options)
# The block given to adapter.import receives `action_objects` and forwards them
# to the routine as keyword arguments.
type.adapter.import(*ids, routine.options) do |action_objects|
routine.process(**action_objects)
end
{errors: routine.errors, import: routine.stats, leftovers: routine.leftovers}
end

LEFTOVERS_WORKER = lambda do |type, options, body|
::Process.setproctitle("chewy import #{type}[#{::Parallel.worker_number}]")
LEFTOVERS_WORKER = lambda do |type, options, total, body, index|
::Process.setproctitle("chewy [#{type}]: import leftovers (#{index + 1}/#{total})")
routine = Routine.new(type, options)
routine.perform_bulk(body)
routine.errors
Expand Down Expand Up @@ -155,13 +155,13 @@ def import_parallel(objects, routine)
batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
results = ::Parallel.map(batches, routine.parallel_options, &IMPORT_WORKER.curry[self, routine.options])
results = ::Parallel.map_with_index(batches, routine.parallel_options, &IMPORT_WORKER.curry[self, routine.options, batches.size])
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
errors, import, leftovers = process_parallel_import_results(results)

if leftovers.present?
batches = leftovers.each_slice(routine.options[:batch_size])
results = ::Parallel.map(batches, routine.parallel_options, &LEFTOVERS_WORKER.curry[self, routine.options])
results = ::Parallel.map_with_index(batches, routine.parallel_options, &LEFTOVERS_WORKER.curry[self, routine.options, batches.size])
errors.concat(results.flatten(1))
end

Expand Down
16 changes: 9 additions & 7 deletions lib/chewy/type/syncer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class Type
class Syncer
DEFAULT_SYNC_BATCH_SIZE = 20_000
ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, index_data|
OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, type, total, index_data|
::Process.setproctitle("chewy [#{type}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if type
index_data.each_with_object([]) do |(id, index_sync_value), result|
next unless source_data_hash[id]

Expand All @@ -41,14 +42,15 @@ class Syncer
result.push(id) if outdated
end
end
# Worker lambda that fetches one side of the sync data (:source or :index) from
# `syncer` and returns it wrapped in a single-entry hash keyed by that selector.
# NOTE(review): this text is a rendered diff without +/- markers, so the header,
# the `case` line and the returned hash each appear twice; presumably the first
# of each pair is the pre-commit line and the second the post-commit one —
# confirm against the repository.
# Two-argument variant: `type` is the :source/:index selector.
SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, type|
result = case type
# Three-argument variant: `kind` is the selector and `type` is only
# interpolated into the process title.
SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, type, kind|
::Process.setproctitle("chewy [#{type}]: sync fetching data (#{kind})")
result = case kind
when :source
# Invoked through `send` — presumably because the fetch methods are not
# public on the syncer; verify.
syncer.send(:fetch_source_data)
when :index
syncer.send(:fetch_index_data)
end
# There is no `else` branch, so any other selector leaves `result` nil.
{type => result}
{kind => result}
end

def self.typecast_date(string)
Expand Down Expand Up @@ -143,7 +145,7 @@ def source_and_index_data
@source_and_index_data ||= begin
if @parallel
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self])
result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @type])
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
if result.first.keys.first == :source
[result.first.values.first, result.second.values.first]
Expand Down Expand Up @@ -182,15 +184,15 @@ def data_ids(data)
end

# Computes the outdated ids by calling OUTDATED_IDS_WORKER directly on the whole
# of `index_data`, without going through ::Parallel.
# NOTE(review): this text is a rendered diff without +/- markers, so two call
# lines appear; presumably the first is the pre-commit call and the second the
# post-commit one — confirm against the repository.
def linear_outdated_ids
OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, index_data)
# Five-argument form: nil is passed in the `type` and `total` positions. The
# worker only sets its process title `if type`, so no title is set here.
OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, nil, nil, index_data)
end

# Computes the outdated ids by slicing `index_data` into batches and running
# OUTDATED_IDS_WORKER over them with ::Parallel.map, then flattening the
# per-batch results one level.
# NOTE(review): this text is a rendered diff without +/- markers, so two
# `result = ::Parallel.map(...)` lines appear; presumably the first is the
# pre-commit line and the second the post-commit one — confirm against the
# repository.
def parallel_outdated_ids
# Slice size: the whole of index_data when processor_count is zero, otherwise
# index_data.size divided by processor_count, rounded up.
size = processor_count.zero? ? index_data.size : (index_data.size / processor_count.to_f).ceil
batches = index_data.each_slice(size)

# The ActiveRecord connection (when ActiveRecord::Base is defined) is closed
# before the parallel run and re-established afterwards — presumably so that
# workers do not share the parent's connection; verify.
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
result = ::Parallel.map(batches, @parallel, &OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h]).flatten(1)
# This form additionally curries @type and batches.size, which the worker
# interpolates into its process title.
result = ::Parallel.map(batches, @parallel, &OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h, @type, batches.size]).flatten(1)
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
result
end
Expand Down

0 comments on commit ba16ba0

Please sign in to comment.