Skip to content

Supporting Sidekiq Pro Reliable Queueing #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ language: ruby
rvm:
- 2.0.0
- 1.9.3
services:
- redis-server
15 changes: 11 additions & 4 deletions lib/sidekiq/priority.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ def self.priorities=(priorities)
end

def self.queue_with_priority(queue, priority)
priority.nil? ? queue : "#{queue}_#{priority}"
priority && self.priorities.include?(priority) ? "#{queue}_#{priority}" : queue
end
end
end

Sidekiq.configure_server do |config|
if defined? Rails
class ConfigureServer < Rails::Railtie
config.after_initialize do
require "#{File.dirname(File.absolute_path(__FILE__))}/priority/server/fetch.rb"
Sidekiq::Priority::Server.configure_priority_fetch
end
end
else
require "#{directory}/priority/server/fetch.rb"
Sidekiq.options[:fetch] = Sidekiq::Priority::Server::Fetch
end
Sidekiq::Priority::Server.configure_priority_fetch
end
89 changes: 75 additions & 14 deletions lib/sidekiq/priority/server/fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,87 @@
module Sidekiq
module Priority
module Server
class Fetch < Sidekiq::BasicFetch
def initialize(options)
queues = prioritized_queues(options[:queues])
@strictly_ordered_queues = !!options[:strict]
@queues = queues.map { |q| "queue:#{q}" }
@unique_queues = @queues.uniq

def self.configure_priority_fetch
Sidekiq.configure_server do |config|
Sidekiq.options[:fetch] = Sidekiq::Priority::Server.priority_fetch_class
end
end

protected
def self.priority_fetch_class
if defined? Sidekiq::Pro::ReliableFetch
reliable_priority_fetch_class
else
basic_priority_fetch_class
end
end

def prioritized_queues(base_queues)
queues = []
priorities = Sidekiq::Priority.priorities
priorities.each do |priority|
base_queues.each do |queue|
queues << Sidekiq::Priority.queue_with_priority(queue, priority)
def self.prioritized_queues(base_queues)
queues = []
priorities = Sidekiq::Priority.priorities
priorities.each do |priority|
base_queues.each do |queue|
queues << Sidekiq::Priority.queue_with_priority(queue, priority)
end
end
queues
end


def self.reliable_priority_fetch_class
return Sidekiq::Priority::Server::ReliableFetch if defined?(Sidekiq::Priority::Server::ReliableFetch)

Sidekiq::Priority::Server.const_set('ReliableFetch', Class.new(Sidekiq::Pro::ReliableFetch) do
def initialize(options)
@queues = prioritized_queues(options[:queues]).map {|q| ["queue:#{q}", "queue:#{q}_#{Socket.gethostname}_#{options[:index]}"] }
@algo = (@queues.length == @queues.uniq.length ? Sidekiq::Pro::ReliableFetch::Strict : Sidekiq::Pro::ReliableFetch::Weighted)
@internal = Sidekiq.redis do |conn|
bulk_reply = conn.pipelined do
@queues.each do |(_, working_queue)|
conn.lrange(working_queue, 0, -1)
end
end
memo = []
bulk_reply.each_with_index do |vals, i|
queue = @queues[i][0]
working_queue = @queues[i][1]
memo.unshift(*vals.map do |msg|
[queue, working_queue, msg]
end)
end
memo
end
Sidekiq.logger.warn("ReliableFetch: recovering work on #{@internal.size} messages") if @internal.size > 0
end

protected

def prioritized_queues(base_queues)
Sidekiq::Priority::Server.prioritized_queues(base_queues)
end
end
)
end

def self.basic_priority_fetch_class
return Sidekiq::Priority::Server::BasicFetch if defined?(Sidekiq::Priority::Server::BasicFetch)

Sidekiq::Priority::Server.const_set('BasicFetch', Class.new(Sidekiq::BasicFetch) do

def initialize(options)
queues = prioritized_queues(options[:queues])
@strictly_ordered_queues = !!options[:strict]
@queues = queues.map { |q| "queue:#{q}" }
@unique_queues = @queues.uniq
end

protected

def prioritized_queues(base_queues)
Sidekiq::Priority::Server.prioritized_queues(base_queues)
end
queues
end
)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sidekiq-priority.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'simplecov'
s.add_development_dependency 'simplecov-rcov'

s.add_dependency 'sidekiq', '>= 2.1.0'
s.add_dependency 'sidekiq', '~> 2.17.0'
end
68 changes: 38 additions & 30 deletions spec/sidekiq/priority/server/fetch_spec.rb
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
require 'spec_helper'
require 'sidekiq/priority/server/fetch'

describe Sidekiq::Priority::Server::Fetch do
describe Sidekiq::Priority::Server do
describe '#initialize' do
before :all do
options = {
queues: ['foo', 'bar', 'foo'],
strict: true
}
@fetch = Sidekiq::Priority::Server::Fetch.new(options)
end

it 'sets queues' do
@fetch.instance_variable_get(:@queues).should == [
'queue:foo_high',
'queue:bar_high',
'queue:foo_high',
'queue:foo',
'queue:bar',
'queue:foo',
'queue:foo_low',
'queue:bar_low',
'queue:foo_low'
]
end
context 'when sidekiq pro is not installed' do

before(:all) do
if defined? Sidekiq::Pro::ReliableFetch
Sidekiq::Pro.send(:remove_const, :ReliableFetch)
end
end

it 'should use the basic priority fetch class' do
expect(Sidekiq::Priority::Server.priority_fetch_class).to eq(Sidekiq::Priority::Server::BasicFetch)
end

let(:fetch) { Sidekiq::Priority::Server.basic_priority_fetch_class.new(queues: %w(foo bar foo), strict: true, index: 1) }

it 'sets queues' do
expect(fetch.instance_variable_get(:@queues)).to eq([
'queue:foo_high',
'queue:bar_high',
'queue:foo_high',
'queue:foo',
'queue:bar',
'queue:foo',
'queue:foo_low',
'queue:bar_low',
'queue:foo_low'
])
end

it 'sets unique_queues' do
@fetch.instance_variable_get(:@unique_queues).should == [
'queue:foo_high',
'queue:bar_high',
'queue:foo',
'queue:bar',
'queue:foo_low',
'queue:bar_low'
]
it 'sets unique_queues' do
expect(fetch.instance_variable_get(:@unique_queues)).to eq([
'queue:foo_high',
'queue:bar_high',
'queue:foo',
'queue:bar',
'queue:foo_low',
'queue:bar_low'
])
end
end
end
end
80 changes: 80 additions & 0 deletions spec/sidekiq/priority/server/reliable_fetch_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require 'spec_helper'
require 'sidekiq/priority/server/fetch'

describe Sidekiq::Priority::Server do
describe '#initialize' do
context 'when sidekiq pro is installed' do

before(:all) do
module Sidekiq
module Pro
class ReliableFetch
class Strict
end
class Weighted
end
end
end
end
end

let(:fetch) { Sidekiq::Priority::Server.reliable_priority_fetch_class.new(queues: %w(foo bar), strict: true, index: 1)}

it 'should use the reliable priority fetch class' do
expect(Sidekiq::Priority::Server.priority_fetch_class).to eq(Sidekiq::Priority::Server::ReliableFetch)
end

it 'sets unique queues' do
require 'socket'
expected_list = [
'queue:foo_high',
'queue:bar_high',
'queue:foo',
'queue:bar',
'queue:foo_low',
'queue:bar_low'
].map{ |queue| [queue, "#{queue}_#{Socket.gethostname}_1"] }

expect(fetch.instance_variable_get(:@queues)).to eq(expected_list)
end

it 'sets the algorithm variable' do
expect(fetch.instance_variable_get(:@algo)).to eq(Sidekiq::Pro::ReliableFetch::Strict)
end

it 'sets the internal variable' do
expect(fetch.instance_variable_get(:@internal)).to eq([])
end

context 'and redundant queues are specified' do

let(:fetch) { Sidekiq::Priority::Server.reliable_priority_fetch_class.new(queues: %w(foo bar foo), strict: true, index: 1)}

it 'sets queues' do
require 'socket'
expected_list = [
'queue:foo_high',
'queue:bar_high',
'queue:foo_high',
'queue:foo',
'queue:bar',
'queue:foo',
'queue:foo_low',
'queue:bar_low',
'queue:foo_low'
].map{ |queue| [queue, "#{queue}_#{Socket.gethostname}_1"] }

expect(fetch.instance_variable_get(:@queues)).to eq(expected_list)
end

it 'sets the algorithm variable' do
expect(fetch.instance_variable_get(:@algo)).to eq(Sidekiq::Pro::ReliableFetch::Weighted)
end

it 'sets the internal variable' do
expect(fetch.instance_variable_get(:@internal)).to eq([])
end
end
end
end
end
21 changes: 21 additions & 0 deletions spec/sidekiq/worker_ext_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,26 @@ def perform(first, second)
'queue' => 'foo_high'
}
end

it 'sends an item to the default queue if priority is nil' do
TestWorker.stub(:client_push) { |item| item }
item = TestWorker.perform_with_priority(:invalid_priority, 1, 2)
item.should == {
'class' => TestWorker,
'args' => [1, 2],
'queue' => :foo
}
end

it 'sends an item to the default queue if a random priority is given' do
TestWorker.stub(:client_push) { |item| item }
item = TestWorker.perform_with_priority(:random_priority, 1, 2)
item.should == {
'class' => TestWorker,
'args' => [1, 2],
'queue' => :foo
}
end

end
end