Skip to content

Commit

Permalink
Merge branch 'thumblemonks/master'
Browse files Browse the repository at this point in the history
Conflicts:

	lib/delayed/job.rb
  • Loading branch information
Tobias Lütke committed Dec 16, 2008
2 parents d644b3d + f2ea93c commit a71604e
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 81 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.gem
19 changes: 12 additions & 7 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ It is a direct extraction from Shopify where the job table is responsible for a
* updating solr, our search server, after product changes
* batch imports
* spam checks

h2. Changes

* 1.7 Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.
* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
* 1.0 Initial release

h2. Setup

Expand Down Expand Up @@ -74,3 +67,15 @@ You can also run by writing a simple @script/job_runner@, and invoking it extern
h3. Cleaning up

You can invoke @rake jobs:clear@ to delete all jobs in the queue.

h3. Changes

* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.

* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.

* 1.5.0: Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking instead of relying on row level locks. This enables us to run as many worker processes as we need to speed up queue processing.

* 1.2.0: Added #send_later to Object for simpler job creation

* 1.0.0: Initial release
40 changes: 40 additions & 0 deletions delayed_job.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
version = File.read('README.textile').scan(/^\*\s+([\d\.]+)/).flatten

Gem::Specification.new do |s|
s.name = "delayed_job"
s.version = version.first
s.date = "2008-11-28"
s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
s.email = "tobi@leetsoft.com"
s.homepage = "http://github.com/tobi/delayed_job/tree/master"
s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks."
s.authors = ["Tobias Lütke"]

# s.bindir = "bin"
# s.executables = ["delayed_job"]
# s.default_executable = "delayed_job"

s.has_rdoc = false
s.rdoc_options = ["--main", "README.textile"]
s.extra_rdoc_files = ["README.textile"]

# run git ls-files to get an updated list
s.files = %w[
MIT-LICENSE
README.textile
delayed_job.gemspec
init.rb
lib/delayed/job.rb
lib/delayed/message_sending.rb
lib/delayed/performable_method.rb
lib/delayed/worker.rb
lib/delayed_job.rb
tasks/jobs.rake
]
s.test_files = %w[
spec/database.rb
spec/delayed_method_spec.rb
spec/job_spec.rb
spec/story_spec.rb
]
end
6 changes: 1 addition & 5 deletions init.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
require File.dirname(__FILE__) + '/lib/delayed/message_sending'
require File.dirname(__FILE__) + '/lib/delayed/performable_method'
require File.dirname(__FILE__) + '/lib/delayed/job'

Object.send(:include, Delayed::MessageSending)
require File.dirname(__FILE__) + '/lib/delayed_job'
100 changes: 40 additions & 60 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
Expand All @@ -25,10 +25,10 @@ class Job < ActiveRecord::Base
NextTaskOrder = 'priority DESC, run_at ASC'

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil
self.max_priority = nil

class LockError < StandardError
end
Expand All @@ -45,8 +45,8 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
def name

def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
Expand Down Expand Up @@ -90,32 +90,32 @@ def self.enqueue(*args, &block)
end

def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
time_now = db_time_now

time_now = db_time_now

sql = NextTaskSQL.dup

conditions = [time_now, time_now - max_run_time, worker_name]

if self.min_priority
sql << ' AND (priority >= ?)'
conditions << min_priority
end

if self.max_priority
sql << ' AND (priority <= ?)'
conditions << max_priority
conditions << max_priority
end

conditions.unshift(sql)
conditions.unshift(sql)

records = ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end

records.sort { rand() }
end
end

# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
Expand All @@ -136,7 +136,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
Expand All @@ -154,16 +154,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1

self.locked_at = now
self.locked_by = worker
end

def unlock
self.locked_at = nil
self.locked_by = nil
Expand All @@ -179,7 +179,6 @@ def self.work_off(num = 100)
success, failure = 0, 0

num.times do

job = self.reserve do |j|
begin
j.perform
Expand All @@ -194,56 +193,37 @@ def self.work_off(num = 100)
end

return [success, failure]
end


end

# Moved into its own method so that new_relic can trace it.
def self.invoke_job(job, &block)
block.call(job)
end


private
private

def deserialize(source)
attempt_to_load_file = true

begin
handler = YAML.load(source) rescue nil
return handler if handler.respond_to?(:perform)

if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)
handler = YAML.load(source) rescue nil

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
end

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
unless handler.respond_to?(:perform)
if handler.nil? && source =~ ParseObjectFromYaml
handler_class = $1
end
attempt_to_load(handler_class || handler.class)
handler = YAML.load(source)
end

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

rescue TypeError, LoadError, NameError => e
return handler if handler.respond_to?(:perform)

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
raise DeserializationError,
'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
end

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
def attempt_to_load(klass)
klass.constantize
end
Expand All @@ -252,7 +232,7 @@ def self.db_time_now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end

protected
protected

def before_save
self.run_at ||= self.class.db_time_now
Expand Down
18 changes: 10 additions & 8 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ module Delayed
class Worker
SLEEP = 5

cattr_accessor :logger
self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER)

def initialize(options={})
@quiet = options[:quiet]
@quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
end

def start
say "*** Starting job worker #{Delayed::Job.worker_name}"

trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }



loop do
result = nil

Expand All @@ -33,15 +35,15 @@ def start
end

break if $exit
end
end

ensure
Delayed::Job.clear_locks!
end

def say(text)
puts text unless @quiet
RAILS_DEFAULT_LOGGER.info text
logger.info text if logger
end

end
Expand Down
6 changes: 6 additions & 0 deletions lib/delayed_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require File.dirname(__FILE__) + '/delayed/message_sending'
require File.dirname(__FILE__) + '/delayed/performable_method'
require File.dirname(__FILE__) + '/delayed/job'
require File.dirname(__FILE__) + '/delayed/worker'

Object.send(:include, Delayed::MessageSending)
1 change: 1 addition & 0 deletions spec/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require 'rubygems'
require 'active_record'
gem 'sqlite3-ruby'

require File.dirname(__FILE__) + '/../init'
require 'spec'
Expand Down
2 changes: 1 addition & 1 deletion spec/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def perform; @@runs += 1; end

it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
@job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0
Expand Down

0 comments on commit a71604e

Please sign in to comment.