Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fully name scheduler threadpools and thread names; refactor CLI STDOUT #80

Merged
merged 1 commit into from
Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exe/good_job
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env ruby
require 'good_job/cli'
GOOD_JOB_LOG_TO_STDOUT = true
GoodJob::CLI.start(ARGV)
2 changes: 1 addition & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require 'active_job/queue_adapters/good_job_adapter'

module GoodJob
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
mattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
mattr_accessor :preserve_job_records, default: false
mattr_accessor :reperform_jobs_on_standard_error, default: true
mattr_accessor :on_thread_error, default: nil
Expand Down
4 changes: 4 additions & 0 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def cleanup_preserved_jobs
no_commands do
def set_up_application!
require RAILS_ENVIRONMENT_RB
return unless defined?(GOOD_JOB_LOG_TO_STDOUT) && GOOD_JOB_LOG_TO_STDOUT && !ActiveSupport::Logger.logger_outputs_to?(GoodJob.logger, STDOUT)

GoodJob::LogSubscriber.loggers << ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
GoodJob::LogSubscriber.reset_logger
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/good_job/current_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,15 @@ def self.reset
self.error_on_retry = nil
self.error_on_discard = nil
end

# @return [Integer] Current process ID
def self.process_id
Process.pid
end

# @return [String] Current thread name
def self.thread_name
(Thread.current.name || Thread.current.object_id).to_s
end
end
end
2 changes: 1 addition & 1 deletion lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def perform(destroy_after: !GoodJob.preserve_job_records, reperform_on_standard_
)

begin
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self }) do
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
result = ActiveJob::Base.execute(params)
end
rescue StandardError => e
Expand Down
83 changes: 56 additions & 27 deletions lib/good_job/log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,79 +32,108 @@ def scheduler_create_pools(event)
performer_name = event.payload[:performer_name]
process_id = event.payload[:process_id]

info_and_stdout(tags: [process_id]) do
info(tags: [process_id]) do
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads} poll_interval=#{poll_interval}."
end
end

def scheduler_shutdown_start(event)
process_id = event.payload[:process_id]

info_and_stdout(tags: [process_id]) do
info(tags: [process_id]) do
"GoodJob shutting down scheduler..."
end
end

def scheduler_shutdown(event)
process_id = event.payload[:process_id]

info_and_stdout(tags: [process_id]) do
info(tags: [process_id]) do
"GoodJob scheduler is shut down."
end
end

def scheduler_restart_pools(event)
process_id = event.payload[:process_id]

info_and_stdout(tags: [process_id]) do
info(tags: [process_id]) do
"GoodJob scheduler has restarted."
end
end

def perform_job(event)
good_job = event.payload[:good_job]
process_id = event.payload[:process_id]
thread_name = event.payload[:thread_name]

info(tags: [process_id, thread_name]) do
"Executed GoodJob #{good_job.id}"
end
end

def cleanup_preserved_jobs(event)
timestamp = event.payload[:timestamp]
deleted_records_count = event.payload[:deleted_records_count]

info_and_stdout do
info do
"GoodJob deleted #{deleted_records_count} preserved #{'job'.pluralize(deleted_records_count)} finished before #{timestamp}."
end
end

private
class << self
def loggers
@_loggers ||= [GoodJob.logger]
end

def logger
@_logger ||= begin
logger = Logger.new(StringIO.new)
loggers.each do |each_logger|
logger.extend(ActiveSupport::Logger.broadcast(each_logger))
end
logger
end
end

def reset_logger
@_logger = nil
end
end

def logger
GoodJob.logger
GoodJob::LogSubscriber.logger
end

private

def tag_logger(*tags, &block)
tags = tags.dup.unshift("GoodJob").compact

self.class.loggers.inject(block) do |inner, each_logger|
if each_logger.respond_to?(:tagged)
tags_for_logger = if each_logger.formatter.current_tags.include?("ActiveJob")
["ActiveJob"] + tags
else
tags
end

proc { each_logger.tagged(*tags_for_logger, &inner) }
else
inner
end
end.call
end

%w(info debug warn error fatal unknown).each do |level|
class_eval <<-METHOD, __FILE__, __LINE__ + 1
def #{level}(progname = nil, tags: [], &block)
return unless logger

if logger.respond_to?(:tagged)
tags.unshift "GoodJob" unless logger.formatter.current_tags.include?("GoodJob")
logger.tagged(*tags.compact) do
logger.#{level}(progname, &block)
end
else
tag_logger(*tags) do
logger.#{level}(progname, &block)
end
end
METHOD
end

def info_and_stdout(progname = nil, tags: [], &block)
unless ActiveSupport::Logger.logger_outputs_to?(logger, STDOUT)
tags_string = (['GoodJob'] + tags).map { |t| "[#{t}]" }.join(' ')
stdout_message = "#{tags_string} #{yield}"
$stdout.puts stdout_message
end

info(progname, tags: [], &block)
end

def thread_name
Thread.current.name || Thread.current.object_id
end
end
end
29 changes: 15 additions & 14 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class Scheduler

# Defaults for instance of Concurrent::ThreadPoolExecutor
DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
Expand Down Expand Up @@ -74,6 +73,8 @@ def initialize(performer, timer_options: {}, pool_options: {})
@pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
@timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)

@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"

create_pools
end

Expand All @@ -83,8 +84,8 @@ def initialize(performer, timer_options: {}, pool_options: {})
def shutdown(wait: true)
@_shutdown = true

ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id })
ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do
instrument("scheduler_shutdown_start", { wait: wait })
instrument("scheduler_shutdown", { wait: wait }) do
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
Expand All @@ -107,7 +108,7 @@ def shutdown?
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def restart(wait: true)
ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do
instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pools
end
Expand All @@ -133,23 +134,23 @@ def create_thread
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time })
instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end

# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output
end

private

# @return [void]
def create_pools
ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do
instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?

Expand All @@ -159,14 +160,14 @@ def create_pools
end
end

# @return [Integer] Current process ID
def process_id
Process.pid
end
def instrument(name, payload = {}, &block)
payload = payload.reverse_merge({
scheduler: self,
process_id: GoodJob::CurrentExecution.process_id,
thread_name: GoodJob::CurrentExecution.thread_name,
})

# @return [String] Current thread name
def thread_name
(Thread.current.name || Thread.current.object_id).to_s
ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
end
end

Expand Down
4 changes: 0 additions & 4 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ def perform(*args, **kwargs)

let(:adapter) { GoodJob::Adapter.new }

around do |example|
expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook
end

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 250 }

Expand Down
8 changes: 3 additions & 5 deletions spec/lib/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
allow(Kernel).to receive(:loop)

cli = described_class.new([], {}, {})

expect do
cli.start
end.to output.to_stdout
cli.start

expect(GoodJob::Scheduler).to have_received(:new)
end
Expand Down Expand Up @@ -84,7 +81,8 @@

it 'deletes finished jobs' do
cli = described_class.new([], { before_seconds_ago: 24.hours.to_i }, {})
expect { cli.cleanup_preserved_jobs }.to output(/GoodJob deleted 1 preserved job/).to_stdout

cli.cleanup_preserved_jobs

expect { recent_job.reload }.not_to raise_error
expect { old_unfinished_job.reload }.not_to raise_error
Expand Down
4 changes: 0 additions & 4 deletions spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
RSpec.describe GoodJob::Scheduler do
let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') }

around do |example|
expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook
end

context 'when thread error' do
let(:error_proc) { double("Error Collector", call: nil) } # rubocop:disable RSpec/VerifiedDoubles

Expand Down