Add capability to discard duplicate jobs with concurrency configuration #523

Open
wants to merge 15 commits into
base: main
36 changes: 34 additions & 2 deletions README.md
@@ -426,18 +426,21 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c

## Concurrency controls

Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked.
Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running by default, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked.

```ruby
class MyJob < ApplicationJob
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour

# ...
```
- `key` is the only required parameter, and it can be a symbol, a string or a proc that receives the job arguments as parameters and will be used to identify the jobs that need to be limited together. If the proc returns an Active Record record, the key will be built from its class name and `id`.
- `to` is `1` by default.
- `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well.
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
- `on_conflict` controls what happens when a job is enqueued while the maximum number of concurrent executions for its key has already been reached.
  - (default) `:block`; the job is blocked and is not dispatched until another job completes and unblocks it
  - `:discard`; the job is discarded and is not enqueued

When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
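The difference between the two `on_conflict` modes can be illustrated with a toy, in-memory model. This is only a sketch in plain Ruby: `ToyLimiter` and its methods are hypothetical names, it ignores `duration` expiry, and it promotes the oldest blocked job rather than ordering by priority as Solid Queue does.

```ruby
# Toy model of a concurrency limit for a single key.
# Hypothetical illustration only; not Solid Queue's implementation.
class ToyLimiter
  attr_reader :running, :blocked, :discarded

  def initialize(limit:, on_conflict: :block)
    @limit = limit
    @on_conflict = on_conflict
    @running = []
    @blocked = []
    @discarded = []
  end

  # Returns :ready, :blocked or :discarded for the given job.
  def enqueue(job)
    if @running.size < @limit
      @running << job
      :ready
    elsif @on_conflict == :discard
      @discarded << job
      :discarded
    else
      @blocked << job
      :blocked
    end
  end

  # Finishing a job frees a slot and promotes the oldest blocked job, if any.
  def finish(job)
    @running.delete(job)
    promoted = @blocked.shift
    @running << promoted if promoted
    promoted
  end
end

blocking = ToyLimiter.new(limit: 1)
blocking.enqueue("A") # => :ready
blocking.enqueue("B") # => :blocked
blocking.finish("A")  # => "B"

discarding = ToyLimiter.new(limit: 1, on_conflict: :discard)
discarding.enqueue("A") # => :ready
discarding.enqueue("B") # => :discarded
```

With `:block`, job "B" eventually runs once "A" finishes; with `:discard`, it never runs at all.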

@@ -480,6 +483,31 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun

Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.

### Discarding conflicting jobs

When `on_conflict` is set to `:discard`, jobs enqueued above the concurrent execution limit are discarded and fail to be enqueued.

```ruby
class ConcurrentJob < ApplicationJob
  limits_concurrency key: ->(record) { record }, on_conflict: :discard

  def perform(record); end
end

enqueued_job = ConcurrentJob.perform_later(record)
# => instance of ConcurrentJob
enqueued_job.successfully_enqueued?
# => true

second_enqueued_job = ConcurrentJob.perform_later(record) do |job|
  job.successfully_enqueued?
  # => false
end

second_enqueued_job
# => false
```

### Performance considerations

Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example:
@@ -503,6 +531,10 @@ production:

Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).

### Discarding concurrent jobs



## Failed jobs and retries

Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:
Expand Down
23 changes: 14 additions & 9 deletions app/models/solid_queue/job.rb
@@ -2,27 +2,32 @@

module SolidQueue
class Job < Record
class EnqueueError < StandardError; end
class EnqueueError < ActiveJob::EnqueueError; end

include Executable, Clearable, Recurrable

serialize :arguments, coder: JSON

class << self
def enqueue_all(active_jobs)
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
enqueued_jobs_count = 0

transaction do
jobs = create_all_from_active_jobs(active_jobs)
prepare_all_for_execution(jobs).tap do |enqueued_jobs|
enqueued_jobs.each do |enqueued_job|
active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id
active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true
end
prepare_all_for_execution(jobs)
jobs_by_active_job_id = jobs.index_by(&:active_job_id)

active_jobs.each do |active_job|
job = jobs_by_active_job_id[active_job.job_id]

active_job.provider_job_id = job&.id
active_job.enqueue_error = job&.enqueue_error
active_job.successfully_enqueued = job.present? && job.enqueue_error.nil?
enqueued_jobs_count += 1 if active_job.successfully_enqueued?
end
end

active_jobs.count(&:successfully_enqueued?)
enqueued_jobs_count
end

def enqueue(active_job, scheduled_at: Time.current)
@@ -49,7 +54,7 @@ def create_from_active_job(active_job)
def create_all_from_active_jobs(active_jobs)
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
insert_all(job_rows)
where(active_job_id: active_jobs.map(&:job_id))
where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc)
end

def attributes_from_active_job(active_job)
Expand Down
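The reworked `enqueue_all` bookkeeping above walks every Active Job, looks up its persisted row, and copies the outcome back. A minimal standalone sketch of that mapping, assuming hypothetical `ToyActiveJob` and `ToyRow` structs in place of the real Active Job and `SolidQueue::Job` objects:

```ruby
# Hypothetical stand-ins for Active Job instances and persisted job rows.
ToyActiveJob = Struct.new(:job_id, :provider_job_id, :enqueue_error, :successfully_enqueued)
ToyRow = Struct.new(:id, :active_job_id, :enqueue_error)

# Copies each row's outcome onto the matching active job and returns
# how many jobs were successfully enqueued.
def record_enqueue_results(active_jobs, rows)
  rows_by_active_job_id = rows.to_h { |row| [row.active_job_id, row] }

  active_jobs.count do |active_job|
    row = rows_by_active_job_id[active_job.job_id]

    active_job.provider_job_id = row&.id
    active_job.enqueue_error = row&.enqueue_error
    # A job counts as enqueued only if a row exists and carries no error.
    active_job.successfully_enqueued = !row.nil? && row.enqueue_error.nil?
  end
end

jobs = [ToyActiveJob.new("a"), ToyActiveJob.new("b"), ToyActiveJob.new("c")]
rows = [
  ToyRow.new(1, "a", nil),
  ToyRow.new(2, "b", StandardError.new("discarded"))
]

record_enqueue_results(jobs, rows) # => 1
```

Here job "b" has a row but carries an error, and job "c" has no row at all; neither is counted as enqueued.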
6 changes: 5 additions & 1 deletion app/models/solid_queue/job/concurrency_controls.rb
@@ -8,7 +8,7 @@ module ConcurrencyControls
included do
has_one :blocked_execution

delegate :concurrency_limit, :concurrency_duration, to: :job_class
delegate :concurrency_limit, :concurrency_on_conflict, :concurrency_duration, to: :job_class

before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
end
@@ -34,6 +34,10 @@ def blocked?
end

private
def discard_concurrent?
concurrency_on_conflict == :discard
end

def acquire_concurrency_lock
return true unless concurrency_limited?

Expand Down
13 changes: 12 additions & 1 deletion app/models/solid_queue/job/executable.rb
@@ -13,6 +13,8 @@ module Executable

after_create :prepare_for_execution

attr_accessor :enqueue_error

scope :finished, -> { where.not(finished_at: nil) }
end

@@ -37,7 +39,13 @@ def dispatch_all_at_once(jobs)
end

def dispatch_all_one_by_one(jobs)
jobs.each(&:dispatch)
jobs.each do |job|
begin
job.dispatch
rescue EnqueueError => e
job.enqueue_error = e
end
end
end

def successfully_dispatched(jobs)
@@ -66,6 +74,9 @@ def prepare_for_execution

def dispatch
if acquire_concurrency_lock then ready
elsif discard_concurrent?
discard
raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.")
else
block
end
Expand Down
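The two hunks in this file work together: `dispatch` raises when a job is discarded, and `dispatch_all_one_by_one` rescues per job so one discarded job does not abort the rest of the batch. A simplified sketch of that control flow in plain Ruby, where `ToyJob`, `ToyEnqueueError` and the boolean flags are hypothetical stand-ins for the real lock acquisition and configuration:

```ruby
class ToyEnqueueError < StandardError; end

# Hypothetical job whose lock availability and conflict policy are fixed up front.
class ToyJob
  attr_reader :name, :state
  attr_accessor :enqueue_error

  def initialize(name, lock_available:, discard_on_conflict:)
    @name = name
    @lock_available = lock_available
    @discard_on_conflict = discard_on_conflict
    @state = :pending
  end

  # Same three-way branch as the diff: ready, discard and raise, or block.
  def dispatch
    if @lock_available
      @state = :ready
    elsif @discard_on_conflict
      @state = :discarded
      raise ToyEnqueueError, "Dispatched job discarded due to concurrent configuration."
    else
      @state = :blocked
    end
  end
end

# Records the error on the job that raised it and keeps going.
def dispatch_all_one_by_one(jobs)
  jobs.each do |job|
    begin
      job.dispatch
    rescue ToyEnqueueError => e
      job.enqueue_error = e
    end
  end
end

jobs = [
  ToyJob.new("A", lock_available: true,  discard_on_conflict: true),
  ToyJob.new("B", lock_available: false, discard_on_conflict: true),
  ToyJob.new("C", lock_available: false, discard_on_conflict: false)
]

dispatch_all_one_by_one(jobs)
jobs.map(&:state) # => [:ready, :discarded, :blocked]
```

Only "B" ends up with an `enqueue_error`; "C" is blocked silently, which matches the default `:block` behaviour.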
4 changes: 3 additions & 1 deletion lib/active_job/concurrency_controls.rb
@@ -11,15 +11,17 @@ module ConcurrencyControls
class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false

class_attribute :concurrency_limit
class_attribute :concurrency_on_conflict
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_on_conflict = on_conflict
end
end

Expand Down
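As written in this diff, `limits_concurrency` stores whatever value is passed as `on_conflict`, and `discard_concurrent?` only compares against `:discard`, so a misspelled value would appear to fall back to blocking without any warning. One possible way to guard against that is sketched below in plain Ruby. The module name, the `VALID_ON_CONFLICT` constant and the `ArgumentError` are assumptions made for illustration and are not part of this PR.

```ruby
# Hypothetical, simplified version of the class-level macro with validation.
module ToyConcurrencyControls
  VALID_ON_CONFLICT = %i[block discard].freeze

  def limits_concurrency(key:, to: 1, on_conflict: :block)
    unless VALID_ON_CONFLICT.include?(on_conflict)
      raise ArgumentError, "on_conflict must be one of #{VALID_ON_CONFLICT.inspect}"
    end

    @concurrency_options = { key: key, to: to, on_conflict: on_conflict }
  end

  def concurrency_options
    @concurrency_options
  end
end

class ToyBlockingJob
  extend ToyConcurrencyControls
  limits_concurrency key: :account
end

class ToyDiscardingJob
  extend ToyConcurrencyControls
  limits_concurrency key: :account, to: 2, on_conflict: :discard
end

ToyBlockingJob.concurrency_options[:on_conflict]   # => :block
ToyDiscardingJob.concurrency_options[:on_conflict] # => :discard
```

Failing at class load time surfaces a typo such as `on_conflict: :discrad` immediately instead of at the first conflicting enqueue.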
124 changes: 119 additions & 5 deletions test/models/solid_queue/job_test.rb
@@ -10,6 +10,14 @@ def perform(job_result)
end
end

class DiscardedNonOverlappingJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
end

class DiscardedOverlappingJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard
end

class NonOverlappingGroupedJob1 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
end
@@ -18,8 +26,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
end

class DiscardedNonOverlappingGroupedJob1 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard
end

class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard
end

setup do
@result = JobResult.create!(queue_name: "default")
@discarded_concurrent_error = SolidQueue::Job::EnqueueError.new(
"Dispatched job discarded due to concurrent configuration."
)
end

test "enqueue active job to be executed right away" do
@@ -98,6 +117,78 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
assert_equal active_job.concurrency_key, job.concurrency_key
end

test "enqueue jobs with discarding concurrency controls" do
assert_ready do
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
assert active_job.successfully_enqueued?

assert_not DiscardedNonOverlappingJob.perform_later(@result, name: "B") do |overlapping_active_job|
assert_not overlapping_active_job.successfully_enqueued?
assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error
end
end
end

test "enqueue scheduled job with discarding concurrency controls" do
assert_ready do
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
assert active_job.successfully_enqueued?
end

scheduled_job_id = nil

assert_scheduled do
scheduled_active_job = DiscardedNonOverlappingJob.set(wait: 0.5.seconds).perform_later(@result, name: "B")
assert scheduled_active_job.successfully_enqueued?
assert_nil scheduled_active_job.enqueue_error

scheduled_job_id = scheduled_active_job.provider_job_id
end

scheduled_job = SolidQueue::Job.find(scheduled_job_id)
wait_for { scheduled_job.due? }

dispatched = SolidQueue::ScheduledExecution.dispatch_next_batch(10)
assert_equal 0, dispatched
assert_raises(ActiveRecord::RecordNotFound) { scheduled_job.reload }
end

test "enqueues jobs in bulk with discarding concurrency controls" do
jobs = [
job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"),
job_2 = DiscardedNonOverlappingJob.new(@result, name: "B")
]

assert_job_counts(ready: 1, discarded: 1) do
enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs)
assert_equal 1, enqueued_jobs_count
end

assert job_1.successfully_enqueued?
assert_not job_2.successfully_enqueued?
assert_equal SolidQueue::Job::EnqueueError, job_2.enqueue_error.class
assert_equal @discarded_concurrent_error.message, job_2.enqueue_error.message
end

test "enqueue jobs with discarding concurrency controls when below limit" do
assert_job_counts(ready: 2) do
assert_ready do
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
assert active_job.successfully_enqueued?
end

assert_ready do
active_job = DiscardedOverlappingJob.perform_later(@result, name: "B")
assert active_job.successfully_enqueued?
end

assert_not DiscardedOverlappingJob.perform_later(@result, name: "C") do |overlapping_active_job|
assert_not overlapping_active_job.successfully_enqueued?
assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error
end
end
end

test "enqueue jobs with concurrency controls in the same concurrency group" do
assert_ready do
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
@@ -112,6 +203,23 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
end
end

test "enqueue jobs with discarding concurrency controls in the same concurrency group" do
assert_job_counts(ready: 1) do
assert_ready do
active_job = DiscardedNonOverlappingGroupedJob1.perform_later(@result, name: "A")
assert active_job.successfully_enqueued?
assert_equal 1, active_job.concurrency_limit
assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key
end

assert_not DiscardedNonOverlappingGroupedJob2.perform_later(@result, name: "B") do |blocked_active_job|
assert_not blocked_active_job.successfully_enqueued?
assert_equal 1, blocked_active_job.concurrency_limit
assert_equal "DiscardingGroup/JobResult/#{@result.id}", blocked_active_job.concurrency_key
end
end
end

test "enqueue multiple jobs" do
active_jobs = [
AddToBufferJob.new(2),
@@ -249,13 +357,15 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
test "raise EnqueueError when there's an ActiveRecordError" do
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)

active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
assert_raises SolidQueue::Job::EnqueueError do
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
SolidQueue::Job.enqueue(active_job)
end

assert_raises SolidQueue::Job::EnqueueError do
AddToBufferJob.perform_later(1)
# #perform_later doesn't raise ActiveJob::EnqueueError; instead, it sets successfully_enqueued? to false
assert_not AddToBufferJob.perform_later(1) do |active_job|
assert_not active_job.successfully_enqueued?
assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class
end
end

@@ -291,8 +401,12 @@ def assert_blocked(&block)
assert SolidQueue::Job.last.blocked?
end

def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
def assert_discarded(&block)
assert_job_counts(discarded: 1, &block)
end

def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block)
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block
Expand Down