Skip to content

Adds batch processing support #590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.

Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).
Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, batch processing, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).

Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading.

Expand Down Expand Up @@ -503,6 +503,74 @@ production:

Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).

## Batch processing

Solid Queue supports grouping jobs into batches, allowing you to track their collective progress, run callbacks when all jobs complete, and manage complex workflows. This is useful for processing large datasets in parallel, importing files, or any scenario where you need to coordinate multiple jobs.

To create a batch, use `perform_batch_later`:

```ruby
# Simple batch
batch = MyJob.perform_batch_later([
{ user_id: 1, action: "update" },
{ user_id: 2, action: "update" },
{ user_id: 3, action: "update" }
])

puts batch.batch_id # => "550e8400-e29b-41d4-a716..."
puts batch.total_jobs # => 3
```

You can specify callbacks to run when the batch completes:

```ruby
batch = DataImportJob.perform_batch_later(
import_rows,
on_success: { job: ImportSuccessJob, args: { email: "admin@example.com" } },
on_failure: { job: ImportFailureJob, args: { email: "admin@example.com" } },
on_complete: { job: ImportCompleteJob },
metadata: { source: "api", imported_by: current_user.id }
)
```

- `on_success`: Runs when all jobs complete successfully
- `on_failure`: Runs if any job fails
- `on_complete`: Always runs when the batch finishes

Jobs can check if they're part of a batch:

```ruby
class DataImportJob < ApplicationJob
def perform(row_data)
if in_batch?
Rails.logger.info "Processing row as part of batch #{batch_id}"
Rails.logger.info "Batch progress: #{batch_progress}%"
end

# Process the row...
end
end
```

You can query and monitor batches:

```ruby
# Find a batch
batch = SolidQueue::Batch.find_by(batch_id: batch_id)

# Check progress
batch.pending_jobs # => 10
batch.completed_jobs # => 85
batch.failed_jobs # => 5
batch.progress_percentage # => 90.0
batch.finished? # => false

# Query batches by status
SolidQueue::Batch.pending
SolidQueue::Batch.completed
SolidQueue::Batch.failed
```

## Failed jobs and retries

Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:
Expand Down
22 changes: 22 additions & 0 deletions app/jobs/solid_queue/batch_update_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

module SolidQueue
class BatchUpdateJob < ActiveJob::Base
queue_as :default

discard_on ActiveRecord::RecordNotFound

def perform(batch_id, job_id)
batch = Batch.find_by!(batch_id: batch_id)
job = Job.find_by!(id: job_id)

# Only process if the job is actually finished and belongs to this batch
return unless job.finished? && job.batch_id == batch_id

batch.job_finished!(job)
rescue => e
Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job_id}: #{e.message}"
raise
end
end
end
152 changes: 152 additions & 0 deletions app/models/solid_queue/batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# frozen_string_literal: true

module SolidQueue
class Batch < Record
serialize :on_complete_job_args, coder: JSON
serialize :on_success_job_args, coder: JSON
serialize :on_failure_job_args, coder: JSON
serialize :metadata, coder: JSON

STATUSES = %w[pending processing completed failed]

validates :batch_id, uniqueness: true
validates :status, inclusion: { in: STATUSES }

has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id, dependent: :nullify

scope :pending, -> { where(status: "pending") }
scope :processing, -> { where(status: "processing") }
scope :completed, -> { where(status: "completed") }
scope :failed, -> { where(status: "failed") }
scope :finished, -> { where(status: %w[completed failed]) }
scope :unfinished, -> { where(status: %w[pending processing]) }

before_create :set_batch_id

class << self
def enqueue(job_instances, on_complete: nil, on_success: nil, on_failure: nil, metadata: {})
return 0 if job_instances.empty?

batch = create!(
on_complete_job_class: on_complete&.dig(:job)&.to_s,
on_complete_job_args: on_complete&.dig(:args),
on_success_job_class: on_success&.dig(:job)&.to_s,
on_success_job_args: on_success&.dig(:args),
on_failure_job_class: on_failure&.dig(:job)&.to_s,
on_failure_job_args: on_failure&.dig(:args),
metadata: metadata,
total_jobs: job_instances.size,
pending_jobs: job_instances.size
)

# Add batch_id to each job
job_instances.each do |job|
job.batch_id = batch.batch_id
end

# Use SolidQueue's bulk enqueue
enqueued_count = SolidQueue::Job.enqueue_all(job_instances)

# Update pending count if some jobs failed to enqueue
if enqueued_count < job_instances.size
batch.update!(pending_jobs: enqueued_count)
end

batch
end
end

def add_jobs(job_instances)
return 0 if job_instances.empty? || finished?

job_instances.each do |job|
job.batch_id = batch_id
end

enqueued_count = SolidQueue::Job.enqueue_all(job_instances)

increment!(:total_jobs, job_instances.size)
increment!(:pending_jobs, enqueued_count)

enqueued_count
end

def job_finished!(job)
return if finished?

transaction do
if job.failed_execution.present?
increment!(:failed_jobs)
else
increment!(:completed_jobs)
end

decrement!(:pending_jobs)

check_completion!
end
end

def check_completion!
return if finished?

if pending_jobs <= 0
if failed_jobs > 0
mark_as_failed!
else
mark_as_completed!
end
elsif status == "pending"
update!(status: "processing")
end
end

def finished?
status.in?(%w[completed failed])
end

def processing?
status == "processing"
end

def pending?
status == "pending"
end

def progress_percentage
return 0 if total_jobs == 0
((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
end

private
def set_batch_id
self.batch_id ||= SecureRandom.uuid
end

def mark_as_completed!
update!(status: "completed", completed_at: Time.current)
enqueue_callback(:on_success)
enqueue_callback(:on_complete)
end

def mark_as_failed!
update!(status: "failed", completed_at: Time.current)
enqueue_callback(:on_failure)
enqueue_callback(:on_complete)
end

def enqueue_callback(callback_type)
job_class = public_send("#{callback_type}_job_class")
job_args = public_send("#{callback_type}_job_args")

return unless job_class.present?

job_class.constantize.perform_later(
batch_id: batch_id,
**(job_args || {}).symbolize_keys
)
rescue => e
Rails.logger.error "[SolidQueue] Failed to enqueue #{callback_type} callback for batch #{batch_id}: #{e.message}"
end
end
end
5 changes: 3 additions & 2 deletions app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module SolidQueue
class Job < Record
class EnqueueError < StandardError; end

include Executable, Clearable, Recurrable
include Executable, Clearable, Recurrable, Batchable

serialize :arguments, coder: JSON

Expand Down Expand Up @@ -60,7 +60,8 @@ def attributes_from_active_job(active_job)
scheduled_at: active_job.scheduled_at,
class_name: active_job.class.name,
arguments: active_job.serialize,
concurrency_key: active_job.concurrency_key
concurrency_key: active_job.concurrency_key,
batch_id: active_job.respond_to?(:batch_id) ? active_job.batch_id : nil
}
end
end
Expand Down
67 changes: 67 additions & 0 deletions app/models/solid_queue/job/batchable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

module SolidQueue
class Job
module Batchable
extend ActiveSupport::Concern

included do
belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id, optional: true, class_name: "SolidQueue::Batch"

scope :in_batch, ->(batch_id) { where(batch_id: batch_id) }
scope :without_batch, -> { where(batch_id: nil) }
scope :batch_pending, -> { in_batch.where(finished_at: nil) }
scope :batch_finished, -> { in_batch.where.not(finished_at: nil) }

after_update :notify_batch_if_finished, if: :batch_id?
end

class_methods do
def enqueue_batch(active_jobs, **batch_options)
return 0 if active_jobs.empty?

Batch.enqueue(active_jobs, **batch_options)
end

def create_all_from_active_jobs_with_batch(active_jobs, batch_id = nil)
if batch_id.present?
job_rows = active_jobs.map do |job|
attributes_from_active_job(job).merge(batch_id: batch_id)
end
insert_all(job_rows)
where(active_job_id: active_jobs.map(&:job_id))
else
create_all_from_active_jobs_without_batch(active_jobs)
end
end
end

def in_batch?
batch_id.present?
end

def batch_siblings
return Job.none unless in_batch?

self.class.in_batch(batch_id).where.not(id: id)
end

def batch_position
return nil unless in_batch?

batch.jobs.where("id <= ?", id).count
end

private
def notify_batch_if_finished
return unless saved_change_to_finished_at? && finished_at.present?
return unless batch.present?

# Use perform_later to avoid holding locks
BatchUpdateJob.perform_later(batch_id, id)
rescue => e
Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
end
end
end
end
Loading
Loading