Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enqueue awaited job #42

Merged
merged 6 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/acidic_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,9 @@ def self.wire_everything_up(klass)
# TODO: write test for a staged job that uses awaits
klass.set_callback :perform, :after, :reenqueue_awaited_by_job,
if: -> { was_awaited_job? && !was_workflow_job? }
klass.set_callback :perform, :after, :delete_staged_job_record,
if: -> { was_staged_job? && !was_awaited_job? }
klass.define_callbacks :finish
klass.set_callback :finish, :after, :reenqueue_awaited_by_job,
if: -> { was_workflow_job? && was_awaited_job? }
klass.set_callback :finish, :after, :delete_staged_job_record,
if: -> { was_workflow_job? && was_staged_job? && !was_awaited_job? }

klass.instance_variable_set(:@acidic_identifier, :job_id)
klass.define_singleton_method(:acidic_by_job_id) { @acidic_identifier = :job_id }
Expand Down
7 changes: 3 additions & 4 deletions lib/acidic_job/awaiting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ def reenqueue_awaited_by_job

return if run.finished?

# job = job_class.constantize.deserialize(serialized_staged_job)
# job.enqueue

# when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
process_run(run)
# process_run(run)
run.update_column(:locked_at, nil)
job.enqueue
end

def enqueue_step_parallel_jobs(jobs_or_jobs_getter, run, step_result)
Expand Down
2 changes: 1 addition & 1 deletion lib/acidic_job/extensions/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def enqueue
::Sidekiq::Client.push(
"class" => self.class,
"args" => @args,
"jid" => @jid
"jid" => @acidic_job_run&.staged_job_id || @jid
)
end
end
Expand Down
15 changes: 9 additions & 6 deletions lib/acidic_job/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ def failed?
error_object.present?
end

private

def enqueue_staged_job
return unless staged?

def staged_job_id
# encode the identifier for this record in the job ID
# base64 encoding for minimal security
global_id = to_global_id.to_s.remove("gid://")
encoded_global_id = Base64.encode64(global_id).strip
staged_job_id = "STG__#{idempotency_key}__#{encoded_global_id}"

"STG__#{idempotency_key}__#{encoded_global_id}"
end

private

def enqueue_staged_job
return unless staged?

serialized_staged_job = if serialized_job.key?("jid")
serialized_job.merge("jid" => staged_job_id)
Expand Down
9 changes: 0 additions & 9 deletions lib/acidic_job/staging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ module Staging

private

def delete_staged_job_record
return unless was_staged_job?

staged_job_run.delete
true
rescue ActiveRecord::RecordNotFound
true
end

def was_staged_job?
identifier.start_with? "STG__"
end
Expand Down
14 changes: 7 additions & 7 deletions test/acidic_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_passes_for_a_new_key
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_returns_a_stored_result
Expand Down Expand Up @@ -111,7 +111,7 @@ def test_passes_for_keys_that_are_unlocked
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_passes_for_keys_with_a_stale_locked_at
Expand All @@ -128,7 +128,7 @@ def test_passes_for_keys_with_a_stale_locked_at
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_stores_results_for_a_permanent_failure
Expand Down Expand Up @@ -185,7 +185,7 @@ def test_continues_from_recovery_point_create_ride_and_audit_record
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
assert_equal key.attr_accessors,
{ "user_id" => @valid_user.id, "params" => @valid_params, "ride" => Ride.first }
end
Expand All @@ -207,7 +207,7 @@ def test_continues_from_recovery_point_create_stripe_charge
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 0, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
assert_equal key.attr_accessors,
{ "user_id" => @valid_user.id, "params" => @valid_params, "ride" => Ride.first }
end
Expand All @@ -226,7 +226,7 @@ def test_continues_from_recovery_point_send_receipt
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 0, Ride.count
assert_equal 0, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
assert_equal key.attr_accessors,
{ "user_id" => @valid_user.id, "params" => @valid_params, "ride" => nil }
end
Expand All @@ -245,7 +245,7 @@ def test_halts_execution_of_steps_when_safely_finish_acidic_job_returned
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 0, Ride.count
assert_equal 0, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
assert_equal key.attr_accessors,
{ "user_id" => @valid_user.id, "params" => @valid_params, "ride" => nil }
end
Expand Down
12 changes: 6 additions & 6 deletions test/acidic_worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_passes_for_a_new_key
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_returns_a_stored_result
Expand Down Expand Up @@ -107,7 +107,7 @@ def test_passes_for_keys_that_are_unlocked
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_passes_for_keys_with_a_stale_locked_at
Expand All @@ -124,7 +124,7 @@ def test_passes_for_keys_with_a_stale_locked_at
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_stores_results_for_a_permanent_failure
Expand Down Expand Up @@ -160,7 +160,7 @@ def test_continues_from_recovery_point_create_ride_and_audit_record
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 1, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_continues_from_recovery_point_create_stripe_charge
Expand All @@ -180,7 +180,7 @@ def test_continues_from_recovery_point_create_stripe_charge
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 1, Ride.count
assert_equal 0, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end

def test_continues_from_recovery_point_send_receipt
Expand All @@ -197,7 +197,7 @@ def test_continues_from_recovery_point_send_receipt
assert_equal 1, AcidicJob::Run.unstaged.count
assert_equal 0, Ride.count
assert_equal 0, Audit.count
assert_equal 0, AcidicJob::Run.staged.count
assert_equal 1, AcidicJob::Run.staged.count
end
end

Expand Down
32 changes: 32 additions & 0 deletions test/workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,4 +345,36 @@ def dynamic_awaiting
assert_equal 1, retry_set.size
assert_equal ["ErroringDynamicAwaitFromStringWorker"], retry_set.map { _1.item["class"] }
end

def test_step_with_awaits_followed_by_another_step_is_run_properly
dynamic_class = Class.new(ApplicationWorker) do
dynamic_step_job = Class.new(ApplicationWorker) do
def perform; end
end
Object.const_set("SimpleAwaitedWorker", dynamic_step_job)

def perform
with_acidity providing: {} do
step :await_step, awaits: [SimpleAwaitedWorker]
step :do_something
end
end

def do_something; end
end
Object.const_set("WorkerWithAwaitStepFollowedByAnotherStep", dynamic_class)

WorkerWithAwaitStepFollowedByAnotherStep.new.perform
Sidekiq::Worker.drain_all

assert_equal 2, AcidicJob::Run.count

parent_run = AcidicJob::Run.find_by(job_class: "WorkerWithAwaitStepFollowedByAnotherStep")
assert_equal "FINISHED", parent_run.recovery_point

child_run = AcidicJob::Run.find_by(job_class: "SimpleAwaitedWorker")
assert_nil child_run.recovery_point

assert_equal 0, Sidekiq::RetrySet.new.size
end
end