Skip to content

Commit

Permalink
Make deletion compatible with redis-namespace (mhenrixon#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon authored Nov 26, 2019
1 parent 83827ed commit 1b5466f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 115 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ For sidekiq versions before 5.1 a `sidekiq_retries_exhausted` block is required
```ruby
class MyWorker
sidekiq_retries_exhausted do |msg, _ex|
SidekiqUniqueJobs::Digests.del(digest: msg['unique_digest']) if msg['unique_digest']
digest = msg['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end
end
```
Expand All @@ -612,7 +613,8 @@ Starting in v5.1, Sidekiq can also fire a global callback when a job dies:
# this goes in your initializer
Sidekiq.configure_server do |config|
config.death_handlers << ->(job, _ex) do
SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest']
digest = msg['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end
end
```
Expand Down
16 changes: 10 additions & 6 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def self.banner(command, _namespace = nil, _subcommand = false)
desc "list PATTERN", "list all unique digests and their expiry time"
option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return"
def list(pattern = "*")
digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: options[:count])
say "Found #{digests.size} digests matching '#{pattern}':"
print_in_columns(digests.sort) if digests.any?
entries = digests.entries(pattern: pattern, count: options[:count])
say "Found #{entries.size} digests matching '#{pattern}':"
print_in_columns(entries.sort) if entries.any?
end

desc "del PATTERN", "deletes unique digests from redis by pattern"
Expand All @@ -27,10 +27,10 @@ def list(pattern = "*")
def del(pattern)
max_count = options[:count]
if options[:dry_run]
digests = SidekiqUniqueJobs::Digests.new.entries(pattern: pattern, count: max_count)
say "Would delete #{digests.size} digests matching '#{pattern}'"
result = digests.entries(pattern: pattern, count: max_count)
say "Would delete #{result.size} digests matching '#{pattern}'"
else
deleted_count = SidekiqUniqueJobs::Digests.new.del(pattern: pattern, count: max_count)
deleted_count = digests.delete_by_pattern(pattern, count: max_count)
say "Deleted #{deleted_count} digests matching '#{pattern}'"
end
end
Expand All @@ -46,6 +46,10 @@ def console
end

no_commands do
def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end

def console_class
require "pry"
Pry
Expand Down
82 changes: 34 additions & 48 deletions lib/sidekiq_unique_jobs/digests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,44 @@ def add(digest)
redis { |conn| conn.zadd(key, now_f, digest) }
end

# Deletes unique digests by pattern
#
# Deletes unique digest either by a digest or pattern
#
# @overload call_script(digest: "abcdefab")
# Call script with digest
# @param [String] digest: a digest to delete
# @overload call_script(pattern: "*", count: 1_000)
# Call script with pattern
# @param [String] pattern: "*" a pattern to match
# @param [String] count: DEFAULT_COUNT the number of keys to delete
#
# @raise [ArgumentError] when given neither pattern nor digest
#
# @param [String] pattern a key pattern to match with
# @param [Integer] count the maximum number
# @return [Array<String>] with unique digests
def delete_by_pattern(pattern, count: DEFAULT_COUNT)
result, elapsed = timed do
digests = entries(pattern: pattern, count: count).keys
redis { |conn| BatchDelete.call(digests, conn) }
end

log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")

result
end

# Delete unique digests by digest
# Also deletes the :AVAILABLE, :EXPIRED etc keys
#
def del(digest: nil, pattern: nil, count: DEFAULT_COUNT)
return delete_by_pattern(pattern, count: count) if pattern
return delete_by_digest(digest) if digest
# @param [String] digest a unique digest to delete
def delete_by_digest(digest) # rubocop:disable Metrics/MethodLength
result, elapsed = timed do
call_script(:delete_by_digest, [
digest,
"#{digest}:QUEUED",
"#{digest}:PRIMED",
"#{digest}:LOCKED",
"#{digest}:RUN",
"#{digest}:RUN:QUEUED",
"#{digest}:RUN:PRIMED",
"#{digest}:RUN:LOCKED",
key,
])
end

log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")

raise ArgumentError, "##{__method__} requires either a :digest or a :pattern"
result
end

#
Expand Down Expand Up @@ -92,37 +110,5 @@ def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100)
]
end
end

private

# Deletes unique digests by pattern
#
# @param [String] pattern a key pattern to match with
# @param [Integer] count the maximum number
# @return [Array<String>] with unique digests
def delete_by_pattern(pattern, count: DEFAULT_COUNT)
result, elapsed = timed do
digests = entries(pattern: pattern, count: count).keys
redis { |conn| BatchDelete.call(digests, conn) }
end

log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")

result
end

# Delete unique digests by digest
# Also deletes the :AVAILABLE, :EXPIRED etc keys
#
# @param [String] digest a unique digest to delete
def delete_by_digest(digest)
result, elapsed = timed do
call_script(:delete_by_digest, [digest, key])
end

log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")

result
end
end
end
22 changes: 9 additions & 13 deletions lib/sidekiq_unique_jobs/lua/delete_by_digest.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local digests = KEYS[2]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local run_digest = KEYS[5]
local run_queued = KEYS[6]
local run_primed = KEYS[7]
local run_locked = KEYS[8]
local digests = KEYS[9]
-------- END keys ---------

-------- BEGIN injected arguments --------
Expand All @@ -15,17 +22,6 @@ local redisversion = tostring(ARGV[5])
<%= include_partial "shared/_common.lua" %>
---------- END local functions ----------

-------- BEGIN Variables --------
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
local locked = digest .. ":LOCKED"
local run_digest = digest .. ":RUN"
local run_queued = digest .. ":RUN:QUEUED"
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
-------- END Variables --------


-------- BEGIN delete_by_digest.lua --------
local counter = 0
local redis_version = toversion(redisversion)
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq_unique_jobs/on_conflict/replace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def delete_job_by_digest
# @return [Integer] the number of keys deleted
#
def delete_lock
call_script(:delete_by_digest, keys: [unique_digest, DIGESTS])
digests.delete_by_digest(unique_digest)
end

def digests
@digests ||= SidekiqUniqueJobs::Digests.new
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
end

app.get "/locks/delete_all" do
digests.del(pattern: "*", count: digests.count)
digests.delete_by_pattern("*", count: digests.count)
redirect_to :locks
end

Expand All @@ -44,7 +44,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
end

app.get "/locks/:digest/delete" do
digests.del(digest: params[:digest])
digests.delete_by_digest(params[:digest])
redirect_to :locks
end

Expand Down
1 change: 0 additions & 1 deletion myapp/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ inherit_from: ../.rubocop.yml

require:
- rubocop-performance
- rubocop-rails
- rubocop-rspec

inherit_mode:
Expand Down
3 changes: 2 additions & 1 deletion myapp/config/initializers/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
config.error_handlers << Proc.new {|ex,ctx_hash| p ex, ctx_hash }

config.death_handlers << ->(job, _ex) do
SidekiqUniqueJobs::Digests.del(digest: job['unique_digest']) if job['unique_digest']
digest = job['unique_digest']
SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
end

# # accepts :expiration (optional)
Expand Down
69 changes: 32 additions & 37 deletions spec/sidekiq_unique_jobs/digests_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,57 +30,52 @@
it { is_expected.to match_array(expected_keys) }
end

describe "#del" do
subject(:del) { digests.del(digest: digest, pattern: pattern, count: count) }
describe "#delete_by_digest" do
subject(:delete_by_digest) { digests.delete_by_digest(digest) }

let(:digest) { nil }
let(:pattern) { nil }
let(:count) { 1000 }
let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" }

before do
allow(digests).to receive(:log_info)
end

context "when given nothing" do
let(:digest) { nil }
let(:pattern) { nil }
it "deletes just the specific digest" do
expect { delete_by_digest }.to change { digests.entries.size }.by(-1)
end

it "logs performance info" do
delete_by_digest

it { expect { del }.to raise_error(ArgumentError, "#del requires either a :digest or a :pattern") }
expect(digests).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
end

context "when given a pattern" do
let(:pattern) { "*" }
describe "#delete_by_pattern" do
subject(:delete_by_pattern) { digests.delete_by_pattern(pattern, count: count) }

it "deletes all matching digests" do
expect(del).to be_a(Integer)
expect(digests.entries).to match_array([])
end
let(:pattern) { "*" }
let(:count) { 1000 }

it "logs performance info" do
del
expect(digests)
.to have_received(:log_info).with(
a_string_starting_with("delete_by_pattern(*, count: 1000)")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
before do
allow(digests).to receive(:log_info)
end

context "when given a digest" do
let(:digest) { "uniquejobs:62c11d32fd69c691802579682409a483" }

it "deletes just the specific digest" do
expect { del }.to change { digests.entries.size }.by(-1)
end
it "deletes all matching digests" do
expect(delete_by_pattern).to be_a(Integer)
expect(digests.entries).to match_array([])
end

it "logs performance info" do
del
expect(digests).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
it "logs performance info" do
delete_by_pattern
expect(digests)
.to have_received(:log_info).with(
a_string_starting_with("delete_by_pattern(*, count: 1000)")
.and(matching(/completed in (\d+(\.\d+)?)ms/)),
)
end
end
end
21 changes: 17 additions & 4 deletions spec/sidekiq_unique_jobs/lua/delete_by_digest_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@
require "spec_helper"

RSpec.describe "delete_by_digest.lua" do
subject(:delete_by_digest) { call_script(:delete_by_digest, [digest, SidekiqUniqueJobs::DIGESTS]) }
subject(:delete_by_digest) { call_script(:delete_by_digest, keys) }

let(:keys) do
[
key.digest,
key.queued,
key.primed,
key.locked,
run_key.digest,
run_key.queued,
run_key.primed,
run_key.locked,
SidekiqUniqueJobs::DIGESTS,
]
end
let(:job_id) { "jobid" }
let(:digest) { "uniquejobs:digest" }
let(:key) { SidekiqUniqueJobs::Key.new(digest) }
Expand All @@ -14,9 +27,9 @@
let(:locked) { redlock.locked }
let(:run_key) { SidekiqUniqueJobs::Key.new("#{digest}:RUN") }
let(:run_redlock) { SidekiqUniqueJobs::Lock.new(run_key) }
let(:run_queued) { redlock.queued }
let(:run_primed) { redlock.primed }
let(:run_locked) { redlock.locked }
let(:run_queued) { run_redlock.queued }
let(:run_primed) { run_redlock.primed }
let(:run_locked) { run_redlock.locked }
let(:lock_ttl) { nil }
let(:lock_type) { :until_executed }
let(:lock_limit) { 1 }
Expand Down

0 comments on commit 1b5466f

Please sign in to comment.