Skip to content

Commit 7901a8e

Browse files
committed
Fix issue when pruning a supervisor and its supervisees via callbacks
In that case we weren't correctly failing the claimed executions as a consequence of the prune, because the supervised processes would get deleted via a callback without going through the regular prune. Leave this to the prune itself instead of relying on the callbacks, and only deregister supervised processes when not pruning. This also saves a query, because we check whether the process has a supervisor before trying to find its supervisees.
1 parent 3a6eec8 commit 7901a8e

File tree

6 files changed

+54
-14
lines changed

6 files changed

+54
-14
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,21 @@ def claiming(job_ids, process_id, &block)
2929
def release_all
3030
SolidQueue.instrument(:release_many_claimed) do |payload|
3131
includes(:job).tap do |executions|
32-
payload[:size] = executions.size
3332
executions.each(&:release)
33+
34+
payload[:size] = executions.size
3435
end
3536
end
3637
end
3738

3839
def fail_all_with(error)
3940
SolidQueue.instrument(:fail_many_claimed) do |payload|
4041
includes(:job).tap do |executions|
41-
payload[:size] = executions.size
42+
executions.each { |execution| execution.failed_with(error) }
43+
4244
payload[:process_ids] = executions.map(&:process_id).uniq
4345
payload[:job_ids] = executions.map(&:job_id).uniq
44-
45-
executions.each { |execution| execution.failed_with(error) }
46+
payload[:size] = executions.size
4647
end
4748
end
4849
end

app/models/solid_queue/process.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ class SolidQueue::Process < SolidQueue::Record
44
include Executor, Prunable
55

66
belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees
7-
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
7+
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id
88

99
store :metadata, coder: JSON
1010

@@ -26,9 +26,18 @@ def heartbeat
2626
def deregister(pruned: false)
2727
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
2828
destroy!
29+
30+
unless supervised? || pruned
31+
supervisees.each(&:deregister)
32+
end
2933
rescue Exception => error
3034
payload[:error] = error
3135
raise
3236
end
3337
end
38+
39+
private
40+
def supervised?
41+
supervisor_id.present?
42+
end
3443
end

app/models/solid_queue/process/executor.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ module Executor
88
included do
99
has_many :claimed_executions
1010

11-
after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
11+
after_destroy :release_all_claimed_executions
1212
end
1313

1414
def fail_all_claimed_executions_with(error)
@@ -17,6 +17,12 @@ def fail_all_claimed_executions_with(error)
1717
end
1818
end
1919

20+
def release_all_claimed_executions
21+
if claims_executions?
22+
claimed_executions.release_all
23+
end
24+
end
25+
2026
private
2127
def claims_executions?
2228
kind == "Worker"

lib/solid_queue/supervisor/maintenance.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def prune_dead_processes
3535

3636
def fail_orphaned_executions
3737
wrap_in_app_executor do
38-
SolidQueue::ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
38+
ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
3939
end
4040
end
4141
end

test/integration/forked_processes_lifecycle_test.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
1010
@pid = run_supervisor_as_fork(load_configuration_from: config_as_hash)
1111

1212
wait_for_registered_processes(3, timeout: 3.second)
13-
assert_registered_workers_for(:background, :default)
13+
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
1414
end
1515

1616
teardown do
@@ -49,7 +49,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
4949
assert_job_status(pause, :finished)
5050

5151
# Termination is almost clean, but the supervisor remains
52-
assert_registered_supervisor
52+
assert_registered_supervisor_with(@pid)
5353
assert_no_registered_workers
5454
assert_no_claimed_jobs
5555
end
@@ -217,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
217217

218218
# And there's a new worker that has been registered for that queue:
219219
wait_for_registered_processes(3, timeout: 3.second)
220-
assert_registered_workers_for(:background, :default)
220+
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
221221

222222
# And they can process jobs just fine
223223
enqueue_store_result_job("no_pause")
@@ -272,17 +272,19 @@ def assert_clean_termination
272272
assert_not process_exists?(@pid)
273273
end
274274

275-
def assert_registered_workers_for(*queues)
275+
def assert_registered_workers_for(*queues, supervisor_pid: nil)
276276
workers = find_processes_registered_as("Worker")
277277
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
278278
assert_equal queues.map(&:to_s).sort, registered_queues.sort
279-
assert_equal [ @pid ], workers.map { |process| process.supervisor.pid }.uniq
279+
if supervisor_pid
280+
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
281+
end
280282
end
281283

282-
def assert_registered_supervisor
284+
def assert_registered_supervisor_with(pid)
283285
processes = find_processes_registered_as("Supervisor(fork)")
284286
assert_equal 1, processes.count
285-
assert_equal @pid, processes.first.pid
287+
assert_equal pid, processes.first.pid
286288
end
287289

288290
def assert_no_registered_workers

test/models/solid_queue/process_test.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,28 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase
3434
assert jobs.all?(&:failed?)
3535
end
3636

37+
test "prune processes including their supervisor with expired heartbeats and fail claimed executions" do
38+
supervisor = SolidQueue::Process.register(kind: "Supervisor(fork)", pid: 42, name: "supervisor-42")
39+
process = SolidQueue::Process.register(kind: "Worker", pid: 43, name: "worker-43", supervisor_id: supervisor.id)
40+
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
41+
jobs = SolidQueue::Job.last(3)
42+
43+
SolidQueue::ReadyExecution.claim("*", 5, process.id)
44+
45+
travel_to 10.minutes.from_now
46+
47+
assert_difference -> { SolidQueue::Process.count }, -2 do
48+
assert_difference -> { SolidQueue::FailedExecution.count }, 3 do
49+
assert_difference -> { SolidQueue::ClaimedExecution.count }, -3 do
50+
SolidQueue::Process.prune
51+
end
52+
end
53+
end
54+
55+
jobs.each(&:reload)
56+
assert jobs.all?(&:failed?)
57+
end
58+
3759
test "hostname's with special characters are properly loaded" do
3860
worker = SolidQueue::Worker.new(queues: "*", threads: 3, polling_interval: 0.2)
3961
hostname = "Basecamp’s-Computer"

0 commit comments

Comments
 (0)