Skip to content

Commit

Permalink
Ensure batch delete removes expiring locks
Browse files Browse the repository at this point in the history
  • Loading branch information
francesmcmullin committed Jul 1, 2022
1 parent 10d08bd commit f112f1d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/batch_delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def batch_delete(conn)
chunk.each do |digest|
del_digest(pipeline, digest)
pipeline.zrem(SidekiqUniqueJobs::DIGESTS, digest)
pipeline.zrem(SidekiqUniqueJobs::EXPIRING_DIGESTS, digest)
@count += 1
end
end
Expand Down
19 changes: 18 additions & 1 deletion lib/sidekiq_unique_jobs/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def lock(job_id, lock_info = {})
pipeline.set(key.digest, job_id)
pipeline.hset(key.locked, job_id, now_f)
info.set(lock_info, pipeline)
pipeline.zadd(key.digests, now_f, key.digest)
add_digest_to_set(pipeline, lock_info)
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked"))
end
Expand Down Expand Up @@ -321,5 +321,22 @@ def changelog_json(job_id, script, message)
time: now_f,
)
end

#
# Add the digest to the correct sorted set
#
# @param [Object] pipeline a redis pipeline object for issue commands
# @param [Hash] lock_info the lock info relevant to the digest
#
# @return [nil]
#
def add_digest_to_set(pipeline, lock_info)
digest_string = key.digest
if lock_info["lock"] == :until_expired
pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string)
else
pipeline.zadd(key.digests, now_f, digest_string)
end
end
end
end
23 changes: 23 additions & 0 deletions spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,28 @@
expect(service).not_to have_received(:orphans)
end
end

context "when a lock is until_expired" do
let(:lock_info) do
{
"job_id" => job_id,
"limit" => 1,
"lock" => :until_expired,
"time" => now_f,
"timeout" => nil,
"ttl" => 1,
"lock_args" => [],
"worker" => "MyUniqueJob",
}
end

it "clears the lock" do
expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1
sleep 2
service.call

expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0
end
end
end
end

0 comments on commit f112f1d

Please sign in to comment.