
Fail in-progress jobs when the worker running them exits abnormally #277


Merged
merged 5 commits on Aug 21, 2024
29 changes: 29 additions & 0 deletions UPGRADING.md
@@ -1,3 +1,32 @@
# Upgrading to version 0.6.x

## New migrations in 3 steps
This version adds two new migrations that modify the `solid_queue_processes` table. Their goal is to add a new column that needs to be `NOT NULL`. Splitting the change across two migrations, applied via the following steps, ensures it happens without downtime and with new processes able to register just fine (see the sketch after this list):
1. Run the first migration, which adds the new column as nullable
2. Deploy the updated Solid Queue code that uses this column
3. Run the second migration, which does two things:
   - Backfill existing rows that have the column as `NULL`
   - Make the column not nullable and add a new index
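
For reference, a simplified sketch of the two migrations installed by this version (the actual files are copied by the install task below and will carry different timestamps in your app):

```ruby
# Step 1: add the column as nullable so existing and new processes keep registering.
class AddNameToProcesses < ActiveRecord::Migration[7.1]
  def change
    add_column :solid_queue_processes, :name, :string
  end
end

# Step 3: backfill missing names, then enforce NOT NULL and add a unique index.
class MakeNameNotNull < ActiveRecord::Migration[7.1]
  def up
    SolidQueue::Process.where(name: nil).find_each do |process|
      process.name = [ process.kind.downcase, SecureRandom.hex(10) ].join("-")
      process.save!
    end

    change_column :solid_queue_processes, :name, :string, null: false
    add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true
  end
end
```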

This version also adds another migration that touches the `solid_queue_recurring_tasks` table but has no functional effect, as the affected column is not used. It can be run at any time.

To install the migrations:
```bash
$ bin/rails solid_queue:install:migrations
```

Or, if you're using a different database for Solid Queue:

```bash
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
```

Then follow the steps above: run the first migration, deploy the updated code, and finally run the second migration.

## New behaviour when workers are killed
From this version onwards, when a worker is killed and the supervisor can detect that, it will fail the in-progress jobs claimed by that worker. For this to work correctly, you need to run the migrations above and restart any supervisors you have running.
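
Jobs failed this way end up as regular failed executions, with the error recording why the process went away (for instance, the `SolidQueue::ProcessPrunedError` added in this PR when a dead process is pruned). A hedged sketch of how you might inspect and retry them from a Rails console — the exact `FailedExecution` accessors may differ between versions, so verify against your installed gem:

```ruby
# Find jobs that were failed because their worker was pruned, then retry them.
# `exception_class` and `retry` are assumed from SolidQueue::FailedExecution's API.
pruned = SolidQueue::FailedExecution.includes(:job).select do |failed|
  failed.exception_class == "SolidQueue::ProcessPrunedError"
end

pruned.each(&:retry)
```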


# Upgrading to version 0.5.x
This version includes a new migration to improve recurring tasks. To install it, just run:

26 changes: 19 additions & 7 deletions app/models/solid_queue/claimed_execution.rb
@@ -35,6 +35,18 @@ def release_all
end
end

def fail_all_with(error)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq

executions.each { |execution| execution.failed_with(error) }
end
end
end

def discard_all_in_batches(*)
raise UndiscardableError, "Can't discard jobs in progress"
end
@@ -69,6 +81,13 @@ def discard
raise UndiscardableError, "Can't discard a job in progress"
end

def failed_with(error)
transaction do
job.failed_with(error)
destroy!
end
end

private
def execute
ActiveJob::Base.execute(job.arguments)
@@ -83,11 +102,4 @@ def finished
destroy!
end
end

def failed_with(error)
transaction do
job.failed_with(error)
destroy!
end
end
end
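
Taken together with the `Executor` and `Prunable` changes below, the call path when a dead worker is detected looks roughly like this (a condensed sketch of the pieces added in this PR, not the literal control flow):

```ruby
# Condensed sketch: what happens when a worker with an expired heartbeat is pruned.
process = SolidQueue::Process.prunable.first
error   = SolidQueue::ProcessPrunedError.new(process.last_heartbeat_at)

# Each claimed execution marks its job as failed and removes itself,
# instead of the job silently staying claimed by a dead worker.
process.claimed_executions.fail_all_with(error)
process.deregister(pruned: true)
```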
8 changes: 3 additions & 5 deletions app/models/solid_queue/process.rb
@@ -13,10 +13,10 @@ def self.register(**attributes)
create!(attributes.merge(last_heartbeat_at: Time.current)).tap do |process|
payload[:process_id] = process.id
end
rescue Exception => error
payload[:error] = error
raise
end
rescue Exception => error
SolidQueue.instrument :register_process, **attributes.merge(error: error)
raise
end

def heartbeat
@@ -25,8 +25,6 @@ def heartbeat

def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
payload[:claimed_size] = claimed_executions.size if claims_executions?

destroy!
rescue Exception => error
payload[:error] = error
6 changes: 6 additions & 0 deletions app/models/solid_queue/process/executor.rb
@@ -11,6 +11,12 @@ module Executor
after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
end

def fail_all_claimed_executions_with(error)
if claims_executions?
claimed_executions.fail_all_with(error)
end
end

private
def claims_executions?
kind == "Worker"
15 changes: 14 additions & 1 deletion app/models/solid_queue/process/prunable.rb
@@ -1,6 +1,12 @@
# frozen_string_literal: true

module SolidQueue
class ProcessPrunedError < RuntimeError
def initialize(last_heartbeat_at)
super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}")
end
end

class Process
module Prunable
extend ActiveSupport::Concern
@@ -15,11 +21,18 @@ def prune
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size

batch.each { |process| process.deregister(pruned: true) }
batch.each(&:prune)
end
end
end
end

def prune
error = ProcessPrunedError.new(last_heartbeat_at)
fail_all_claimed_executions_with(error)

deregister(pruned: true)
end
end
end
end
5 changes: 5 additions & 0 deletions db/migrate/20240811173327_add_name_to_processes.rb
@@ -0,0 +1,5 @@
class AddNameToProcesses < ActiveRecord::Migration[7.1]
def change
add_column :solid_queue_processes, :name, :string
end
end
16 changes: 16 additions & 0 deletions db/migrate/20240813160053_make_name_not_null.rb
@@ -0,0 +1,16 @@
class MakeNameNotNull < ActiveRecord::Migration[7.1]
def up
SolidQueue::Process.where(name: nil).find_each do |process|
process.name ||= [ process.kind.downcase, SecureRandom.hex(10) ].join("-")
process.save!
end

change_column :solid_queue_processes, :name, :string, null: false
add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true
end

def down
remove_index :solid_queue_processes, [ :name, :supervisor_id ]
change_column :solid_queue_processes, :name, :string, null: true
end
end
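
For illustration, the backfill above (and the name generation added to `Processes::Base` later in this diff) produces names of the form `<kind>-<20 hex chars>`, effectively unique thanks to the random suffix:

```ruby
require "securerandom"

# Same construction as the backfill: kind plus a random suffix.
name = [ "worker", SecureRandom.hex(10) ].join("-")
# => e.g. "worker-6f1d2c3b4a5e6d7c8b9a"
```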
44 changes: 25 additions & 19 deletions lib/solid_queue/configuration.rb
@@ -2,6 +2,12 @@

module SolidQueue
class Configuration
class Process < Struct.new(:kind, :attributes)
def instantiate
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
end
end

WORKER_DEFAULTS = {
queues: "*",
threads: 3,
@@ -22,28 +28,10 @@ def initialize(mode: :fork, load_from: nil)
@raw_config = config_from(load_from)
end

def processes
def configured_processes
dispatchers + workers
end

def workers
workers_options.flat_map do |worker_options|
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
WORKER_DEFAULTS[:processes]
end
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end

def dispatchers
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end

def max_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
@@ -54,6 +42,24 @@ def max_number_of_threads

DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"

def workers
workers_options.flat_map do |worker_options|
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
WORKER_DEFAULTS[:processes]
end
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end

def dispatchers
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
Process.new :dispatcher, dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end

def config_from(file_or_hash, env: Rails.env)
config = load_config_from(file_or_hash)
config[env.to_sym] ? config[env.to_sym] : config
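
For context, the configuration now returns lightweight `Configuration::Process` value objects and leaves instantiation to the supervisor. A small sketch of resolving one, based on the `instantiate` method above (the worker options shown are just the defaults):

```ruby
# A configured worker is only a description until the supervisor instantiates it.
configured = SolidQueue::Configuration::Process.new(:worker, { queues: "*", threads: 3 })

worker = configured.instantiate
# => "SolidQueue::Worker".safe_constantize.new(queues: "*", threads: 3)
#    i.e. a SolidQueue::Worker that the supervisor will later start
```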
19 changes: 13 additions & 6 deletions lib/solid_queue/log_subscriber.rb
@@ -12,11 +12,15 @@ def claim(event)
end

def release_many_claimed(event)
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
info formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
end

def fail_many_claimed(event)
warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
end

def release_claimed(event)
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
end

def retry_all(event)
@@ -63,7 +67,8 @@ def start_process(event)
attributes = {
pid: process.pid,
hostname: process.hostname,
process_id: process.process_id
process_id: process.process_id,
name: process.name
}.merge(process.metadata)

info formatted_event(event, action: "Started #{process.kind}", **attributes)
@@ -75,15 +80,16 @@ def shutdown_process(event)
attributes = {
pid: process.pid,
hostname: process.hostname,
process_id: process.process_id
process_id: process.process_id,
name: process.name
}.merge(process.metadata)

info formatted_event(event, action: "Shutdown #{process.kind}", **attributes)
end

def register_process(event)
process_kind = event.payload[:kind]
attributes = event.payload.slice(:pid, :hostname, :process_id)
attributes = event.payload.slice(:pid, :hostname, :process_id, :name)

if error = event.payload[:error]
warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
@@ -99,6 +105,7 @@ def deregister_process(event)
process_id: process.id,
pid: process.pid,
hostname: process.hostname,
name: process.name,
last_heartbeat_at: process.last_heartbeat_at.iso8601,
claimed_size: event.payload[:claimed_size],
pruned: event.payload[:pruned]
@@ -147,7 +154,7 @@ def replace_fork(event)
termsig: status.termsig

if replaced_fork = event.payload[:fork]
info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname))
info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname, name: replaced_fork.name))
else
warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes)
end
11 changes: 11 additions & 0 deletions lib/solid_queue/processes/base.rb
@@ -6,6 +6,12 @@ class Base
include Callbacks # Defines callbacks needed by other concerns
include AppExecutor, Registrable, Interruptible, Procline

attr_reader :name

def initialize(*)
@name = generate_name
end

def kind
self.class.name.demodulize
end
@@ -21,6 +27,11 @@ def pid
def metadata
{}
end

private
def generate_name
[ kind.downcase, SecureRandom.hex(10) ].join("-")
end
end
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue/processes/poller.rb
@@ -8,6 +8,8 @@ class Poller < Base

def initialize(polling_interval:, **options)
@polling_interval = polling_interval

super(**options)
end

def metadata
1 change: 1 addition & 0 deletions lib/solid_queue/processes/registrable.rb
@@ -21,6 +21,7 @@ def process_id
def register
@process = SolidQueue::Process.register \
kind: kind,
name: name,
pid: pid,
hostname: hostname,
supervisor: try(:supervisor),
4 changes: 0 additions & 4 deletions lib/solid_queue/processes/runnable.rb
@@ -25,10 +25,6 @@ def stop
@thread&.join
end

def name
@name ||= [ kind.downcase, SecureRandom.hex(6) ].join("-")
end

def alive?
!running_async? || @thread.alive?
end
3 changes: 2 additions & 1 deletion lib/solid_queue/supervisor.rb
@@ -16,6 +16,7 @@ def start(mode: :fork, load_configuration_from: nil)

def initialize(configuration)
@configuration = configuration
super
end

def start
@@ -44,7 +45,7 @@ def boot
end

def start_processes
configuration.processes.each { |configured_process| start_process(configured_process) }
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
end

def stopped?
9 changes: 6 additions & 3 deletions lib/solid_queue/supervisor/async_supervisor.rb
@@ -23,10 +23,13 @@ def stop
attr_reader :threads

def start_process(configured_process)
configured_process.supervised_by process
configured_process.start
process_instance = configured_process.instantiate.tap do |instance|
instance.supervised_by process
end

process_instance.start

threads[configured_process.name] = configured_process
threads[process_instance.name] = process_instance
end

def stop_threads