Skip to content

Allow duplicate jobs to be discarded #586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module ConcurrencyControls
included do
has_one :blocked_execution

delegate :concurrency_limit, :concurrency_duration, to: :job_class
delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class

before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
end
Expand Down
11 changes: 10 additions & 1 deletion app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ def prepare_for_execution
def dispatch
if acquire_concurrency_lock then ready
else
block
case job_class.concurrency_on_conflict
when :discard
discard_on_conflict
else
block
end
end
end

Expand Down Expand Up @@ -104,6 +109,10 @@ def ready
ReadyExecution.create_or_find_by!(job_id: id)
end

def discard_on_conflict
finished!
end

def execution
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
end
Expand Down
4 changes: 3 additions & 1 deletion lib/active_job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ module ConcurrencyControls

class_attribute :concurrency_limit
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
class_attribute :concurrency_on_conflict, default: :block
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_on_conflict = on_conflict
end
end

Expand Down
7 changes: 7 additions & 0 deletions test/dummy/app/jobs/discard_on_conflict_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class DiscardOnConflictJob < ApplicationJob
limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard

def perform(value)
Rails.logger.info "Performing DiscardOnConflictJob with value: #{value}"
end
end
8 changes: 8 additions & 0 deletions test/dummy/app/jobs/limited_discard_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class LimitedDiscardJob < ApplicationJob
limits_concurrency to: 2, key: ->(group, id) { group }, on_conflict: :discard

def perform(group, id)
Rails.logger.info "Performing LimitedDiscardJob with group: #{group}, id: #{id}"
sleep 0.1
end
end
137 changes: 137 additions & 0 deletions test/integration/concurrency_discard_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# frozen_string_literal: true

require "test_helper"

class ConcurrencyDiscardTest < ActiveSupport::TestCase
setup do
@job_result = JobResult.create!(queue_name: "default", status: "test")
end

test "discard jobs when concurrency limit is reached with on_conflict: :discard" do
# Enqueue first job - should be executed
job1 = DiscardOnConflictJob.perform_later(@job_result.id)

# Enqueue second job - should be discarded due to concurrency limit
job2 = DiscardOnConflictJob.perform_later(@job_result.id)

# Enqueue third job - should also be discarded
job3 = DiscardOnConflictJob.perform_later(@job_result.id)

# Check that first job was ready
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
assert solid_job1.ready?
assert solid_job1.ready_execution.present?

# Check that second and third jobs were discarded
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
assert solid_job2.finished?
assert_nil solid_job2.ready_execution
assert_nil solid_job2.blocked_execution

solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
assert solid_job3.finished?
assert_nil solid_job3.ready_execution
assert_nil solid_job3.blocked_execution
end

test "block jobs when concurrency limit is reached without on_conflict option" do
# Using SequentialUpdateResultJob which has default blocking behavior
# Enqueue first job - should be executed
job1 = SequentialUpdateResultJob.perform_later(@job_result, name: "A")

# Enqueue second job - should be blocked due to concurrency limit
job2 = SequentialUpdateResultJob.perform_later(@job_result, name: "B")

# Check that second job is blocked
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
assert solid_job2.blocked?
assert solid_job2.blocked_execution.present?
end

test "respect concurrency limit with discard option" do
# Enqueue jobs with limit of 2
job1 = LimitedDiscardJob.perform_later("group1", 1)
job2 = LimitedDiscardJob.perform_later("group1", 2)
job3 = LimitedDiscardJob.perform_later("group1", 3) # Should be discarded
job4 = LimitedDiscardJob.perform_later("group1", 4) # Should be discarded

# Check that first two jobs are ready
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
assert solid_job1.ready?
assert solid_job2.ready?

# Check that third and fourth jobs are discarded
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
solid_job4 = SolidQueue::Job.find_by(active_job_id: job4.job_id)
assert solid_job3.finished?
assert solid_job4.finished?
assert_nil solid_job3.ready_execution
assert_nil solid_job4.ready_execution
end

test "discard option works with different concurrency keys" do
# These should not conflict because they have different keys
job1 = DiscardOnConflictJob.perform_later("key1")
job2 = DiscardOnConflictJob.perform_later("key2")
job3 = DiscardOnConflictJob.perform_later("key1") # Should be discarded

# Check that first two jobs are ready (different keys)
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
assert solid_job1.ready?
assert solid_job2.ready?

# Check that third job is discarded (same key as first)
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
assert solid_job3.finished?
assert_nil solid_job3.ready_execution
end

test "discarded jobs do not unblock other jobs" do
# Enqueue a job that will be executed
job1 = DiscardOnConflictJob.perform_later(@job_result.id)

# Enqueue a job that will be discarded
job2 = DiscardOnConflictJob.perform_later(@job_result.id)

# The first job should be ready
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
assert solid_job1.ready?

# The second job should be discarded immediately
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
assert solid_job2.finished?

# Complete the first job and release its lock
solid_job1.unblock_next_blocked_job
solid_job1.finished!

# Enqueue another job - it should be ready since the lock is released
job3 = DiscardOnConflictJob.perform_later(@job_result.id)
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
assert solid_job3.ready?
end

test "discarded jobs are marked as finished without execution" do
# Enqueue a job that will be ready
job1 = DiscardOnConflictJob.perform_later("test_key")

# Enqueue a job that will be discarded
job2 = DiscardOnConflictJob.perform_later("test_key")

solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)

# First job should be ready
assert solid_job1.ready?
assert solid_job1.ready_execution.present?

# Second job should be finished without any execution
assert solid_job2.finished?
assert_nil solid_job2.ready_execution
assert_nil solid_job2.claimed_execution
assert_nil solid_job2.failed_execution
assert_nil solid_job2.blocked_execution
end
end
83 changes: 83 additions & 0 deletions test/models/solid_queue/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
end

class NonOverlappingDiscardJob < ApplicationJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard

def perform(job_result)
end
end

setup do
@result = JobResult.create!(queue_name: "default")
end
Expand Down Expand Up @@ -45,6 +52,82 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
assert_equal 8, execution.priority
end

test "enqueue jobs with on_conflict discard" do
# First job should be ready
active_job1 = NonOverlappingDiscardJob.new(@result)
assert_ready do
SolidQueue::Job.enqueue(active_job1)
end
job1 = SolidQueue::Job.find_by(active_job_id: active_job1.job_id)
assert job1.ready?

# Second job should be discarded (finished without execution)
active_job2 = NonOverlappingDiscardJob.new(@result)
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
SolidQueue::Job.enqueue(active_job2)
end
end
job2 = SolidQueue::Job.find_by(active_job_id: active_job2.job_id)

assert job2.finished?
assert_nil job2.ready_execution
assert_nil job2.blocked_execution
assert_nil job2.claimed_execution
assert_nil job2.failed_execution

# Third job with same key should also be discarded
active_job3 = NonOverlappingDiscardJob.new(@result)
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
SolidQueue::Job.enqueue(active_job3)
end
job3 = SolidQueue::Job.find_by(active_job_id: active_job3.job_id)

assert job3.finished?
end

test "compare blocking vs discard behavior" do
# Test default blocking behavior
blocking_job1 = NonOverlappingJob.new(@result)
assert_ready do
SolidQueue::Job.enqueue(blocking_job1)
end
job1 = SolidQueue::Job.find_by(active_job_id: blocking_job1.job_id)
assert job1.ready?

# Second job should be blocked (not discarded)
blocking_job2 = NonOverlappingJob.new(@result)
assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do
SolidQueue::Job.enqueue(blocking_job2)
end
job2 = SolidQueue::Job.find_by(active_job_id: blocking_job2.job_id)
assert job2.blocked?
assert job2.blocked_execution.present?
assert_not job2.finished?

# Clean up for discard test
SolidQueue::Job.destroy_all
SolidQueue::Semaphore.destroy_all

# Test discard behavior
discard_job1 = NonOverlappingDiscardJob.new(@result)
assert_ready do
SolidQueue::Job.enqueue(discard_job1)
end
job3 = SolidQueue::Job.find_by(active_job_id: discard_job1.job_id)
assert job3.ready?

# Second job should be discarded (not blocked)
discard_job2 = NonOverlappingDiscardJob.new(@result)
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
SolidQueue::Job.enqueue(discard_job2)
end
job4 = SolidQueue::Job.find_by(active_job_id: discard_job2.job_id)
assert job4.finished?
assert_nil job4.blocked_execution
assert_nil job4.ready_execution
end

test "enqueue active job to be scheduled in the future" do
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")

Expand Down
Loading
Loading