Skip to content

Commit 2a25d7f

Browse files
committed
add worker_ids, and have workers yield self to hooks
1 parent 9cd6bc3 commit 2a25d7f

File tree

7 files changed

+80
-22
lines changed

7 files changed

+80
-22
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,9 @@ And into two different points in the worker's, dispatcher's and scheduler's life
379379
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
380380
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).
381381

382+
The hooks for workers will have the worker instance yielded to the block so that you may read its configuration
383+
for logging or other metrics reporting purposes.
384+
382385
You can use the following methods with a block to do this:
383386
```ruby
384387
SolidQueue.on_start
@@ -398,6 +401,14 @@ For example:
398401
```ruby
399402
SolidQueue.on_start { start_metrics_server }
400403
SolidQueue.on_stop { stop_metrics_server }
404+
405+
SolidQueue.on_worker_start do |worker|
406+
Rails.logger.info "Worker #{worker.worker_id} started with queues: #{worker.queues.join(',')}"
407+
end
408+
409+
SolidQueue.on_worker_stop do |worker|
410+
Rails.logger.info "Worker #{worker.worker_id} stopped with queues: #{worker.queues.join(',')}"
411+
end
401412
```
402413

403414
These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.

lib/solid_queue.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ module SolidQueue
4545

4646
[ Dispatcher, Scheduler, Worker ].each do |process|
4747
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
48-
process.on_start { block.call }
48+
process.on_start(&block)
4949
end
5050

5151
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
52-
process.on_stop { block.call }
52+
process.on_stop(&block)
5353
end
5454

5555
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
56-
process.on_exit { block.call }
56+
process.on_exit(&block)
5757
end
5858
end
5959

lib/solid_queue/configuration.rb

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@ def instantiate
3333

3434
def initialize(**options)
3535
@options = options.with_defaults(default_options)
36+
@worker_id_counter = 0
3637
end
3738

3839
def configured_processes
39-
if only_work? then workers
40-
else
41-
dispatchers + workers + schedulers
40+
@configured_processes ||= begin
41+
if only_work? then workers
42+
else
43+
dispatchers + workers + schedulers
44+
end
4245
end
4346
end
4447

@@ -109,12 +112,24 @@ def skip_recurring_tasks?
109112
end
110113

111114
def workers
112-
workers_options.flat_map do |worker_options|
113-
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
114-
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
115+
workers_options.flat_map { |worker_options| generate_workers(worker_options) }
116+
end
117+
118+
def generate_workers(worker_options)
119+
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
120+
processes.times.map do
121+
worker_id = next_worker_id
122+
options_with_defaults = worker_options.with_defaults(WORKER_DEFAULTS).merge(worker_id:)
123+
Process.new(:worker, options_with_defaults)
115124
end
116125
end
117126

127+
def next_worker_id
128+
id = @worker_id_counter
129+
@worker_id_counter += 1
130+
id
131+
end
132+
118133
def dispatchers
119134
dispatchers_options.map do |dispatcher_options|
120135
Process.new :dispatcher, dispatcher_options.with_defaults(DISPATCHER_DEFAULTS)

lib/solid_queue/lifecycle_hooks.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,18 @@ def run_exit_hooks
4343

4444
def run_hooks_for(event)
4545
self.class.lifecycle_hooks.fetch(event, []).each do |block|
46-
block.call
46+
if yield_self_to_hooks?
47+
block.call(self)
48+
else
49+
block.call
50+
end
4751
rescue Exception => exception
4852
handle_thread_error(exception)
4953
end
5054
end
55+
56+
def yield_self_to_hooks?
57+
false
58+
end
5159
end
5260
end

lib/solid_queue/worker.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,24 @@ class Worker < Processes::Poller
88
before_shutdown :run_stop_hooks
99
after_shutdown :run_exit_hooks
1010

11-
attr_accessor :queues, :pool
11+
attr_accessor :pool
12+
13+
attr_reader :worker_id, :queues
1214

1315
def initialize(**options)
1416
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1517

16-
@queues = Array(options[:queues])
18+
# Ensure that the queues array is deep frozen to prevent accidental modification
19+
@queues = Array(options[:queues]).map(&:freeze).freeze
20+
1721
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
22+
@worker_id = options[:worker_id]
1823

1924
super(**options)
2025
end
2126

2227
def metadata
23-
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
28+
super.merge(queues: queues.join(","), thread_pool_size: pool.size, worker_id:)
2429
end
2530

2631
private
@@ -54,5 +59,9 @@ def all_work_completed?
5459
def set_procline
5560
procline "waiting for jobs in #{queues.join(",")}"
5661
end
62+
63+
def yield_self_to_hooks?
64+
true
65+
end
5766
end
5867
end

test/integration/jobs_lifecycle_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase
66
setup do
77
@_on_thread_error = SolidQueue.on_thread_error
88
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ], @_on_thread_error)
9-
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
9+
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, worker_id: 1)
1010
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
1111
end
1212

test/integration/lifecycle_hooks_test.rb

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,17 @@ class LifecycleHooksTest < ActiveSupport::TestCase
1010
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
1111
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
1212

13-
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
14-
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
15-
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
13+
SolidQueue.on_worker_start do |w|
14+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_start")
15+
end
16+
17+
SolidQueue.on_worker_stop do |w|
18+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_stop")
19+
end
20+
21+
SolidQueue.on_worker_exit do |w|
22+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_exit")
23+
end
1624

1725
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
1826
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
@@ -22,23 +30,30 @@ class LifecycleHooksTest < ActiveSupport::TestCase
2230
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
2331
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
2432

25-
pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
26-
wait_for_registered_processes(4)
33+
pid = run_supervisor_as_fork(
34+
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 2 } ],
35+
dispatchers: [ { batch_size: 100 } ],
36+
skip_recurring: false
37+
)
38+
39+
wait_for_registered_processes(6)
2740

2841
terminate_process(pid)
2942
wait_for_registered_processes(0)
3043

3144

3245
results = skip_active_record_query_cache do
3346
job_results = JobResult.where(status: :hook_called)
34-
assert_equal 12, job_results.count
47+
assert_equal 18, job_results.count
3548
job_results
3649
end
3750

38-
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
51+
assert_equal({ "hook_called" => 18 }, results.map(&:status).tally)
3952
assert_equal %w[
4053
start stop exit
41-
worker_start worker_stop worker_exit
54+
worker_first_queue_0_start worker_first_queue_0_stop worker_first_queue_0_exit
55+
worker_second_queue_1_start worker_second_queue_1_stop worker_second_queue_1_exit
56+
worker_second_queue_2_start worker_second_queue_2_stop worker_second_queue_2_exit
4257
dispatcher_start dispatcher_stop dispatcher_exit
4358
scheduler_start scheduler_stop scheduler_exit
4459
].sort, results.map(&:value).sort

0 commit comments

Comments
 (0)