Skip to content

Commit

Permalink
refactor job object
Browse files Browse the repository at this point in the history
- Move some class methods to instance methods
- Don't use exception to signal lock failure
- Add more explicit test cases for locking with multiple workers
  • Loading branch information
Dan DeMaggio committed Mar 12, 2009
1 parent ad27c3e commit 266fc15
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 64 deletions.
107 changes: 64 additions & 43 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module Delayed
class DeserializationError < StandardError
end

# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
Expand All @@ -29,9 +31,7 @@ class Job < ActiveRecord::Base
self.min_priority = nil
self.max_priority = nil

class LockError < StandardError
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
Expand Down Expand Up @@ -60,6 +60,8 @@ def payload_object=(object)
self['handler'] = object.to_yaml
end

# Reschedule the job in the future (when a job fails).
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5
Expand All @@ -75,6 +77,32 @@ def reschedule(message, backtrace = [], time = nil)
end
end


# Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
def run_with_lock(max_run_time, worker_name)
logger.info "* [JOB] aquiring lock on #{name}"
unless lock_exclusively!(max_run_time, worker_name)
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
return nil # no work done
end

begin
runtime = Benchmark.realtime do
invoke_job # TODO: raise error if takes longer than max_run_time
destroy
end
# TODO: warn if runtime > max_run_time ?
logger.info "* [JOB] #{name} completed after %.4f" % runtime
return true # did work
rescue Exception => e
reschedule e.message, e.backtrace
log_exception(e)
return false # work failed
end
end

# Add a job to the queue
def self.enqueue(*args, &block)
object = block_given? ? EvaledJob.new(&block) : args.shift

Expand All @@ -88,6 +116,8 @@ def self.enqueue(*args, &block)
Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
# Return in random order prevent everyone trying to do same head job at once.
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

time_now = db_time_now
Expand Down Expand Up @@ -115,38 +145,22 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
records.sort_by { rand() }
end

# Get the payload of the next job we can get an exclusive lock on.
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)

# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5, max_run_time).each do |job|
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
invoke_job(job.payload_object, &block)
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime

return job
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
end
t = job.run_with_lock(max_run_time, worker_name)
return t unless t == nil # return if we did work (good or bad)
end

nil
nil # we didn't do any work, all 5 were not lockable
end

# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
Expand All @@ -157,46 +171,50 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1

self.locked_at = now
self.locked_by = worker
if affected_rows == 1
self.locked_at = now
self.locked_by = worker
return true
else
return false
end
end

# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
self.locked_by = nil
end

# This is a good hook if you need to report job processing errors in additional or different ways
def self.log_exception(job, error)
logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
def log_exception(error)
logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
logger.error(error)
end

# Do num jobs and return stats on success/failure.
# Exit early if interrupted.
def self.work_off(num = 100)
success, failure = 0, 0

num.times do
job = self.reserve do |j|
begin
j.perform
case self.reserve_and_run_one_job
when true
success += 1
rescue
when false
failure += 1
raise
end
else
break # leave if no work could be done
end

break if job.nil?
break if $exit # leave if we're exiting
end

return [success, failure]
end

# Moved into its own method so that new_relic can trace it.
def self.invoke_job(job, &block)
block.call(job)
def invoke_job
payload_object.perform
end

private
Expand Down Expand Up @@ -227,6 +245,9 @@ def attempt_to_load(klass)
klass.constantize
end

# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
def self.db_time_now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
Expand Down
10 changes: 3 additions & 7 deletions spec/delayed_method_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ def read(story)

Delayed::Job.count.should == 1

output = nil
Delayed::Job.reserve_and_run_one_job

Delayed::Job.reserve do |e|
output = e.perform
end

output.should == true
Delayed::Job.count.should == 0

end

Expand Down Expand Up @@ -129,4 +125,4 @@ def read(story)
job.payload_object.perform.should == 'Once upon...'
end

end
end
62 changes: 48 additions & 14 deletions spec/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ def perform; @@runs += 1; end
end

it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
@job.lock_exclusively!(4.hours, 'worker2').should == false
end

it "should allow a second worker to get exclusive access if the timeout has passed" do
lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
@job.lock_exclusively!(1.minute, 'worker2').should == true
end

it "should be able to get access to the task if it was started more then max_age ago" do
Expand Down Expand Up @@ -283,29 +283,63 @@ def perform; @@runs += 1; end

it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError)
@job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0
end

end

context "while running alongside other workers with enqueued jobs, it" do
context "while running alongside other workers that locked jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 3.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 11.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 2.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end
it "should only find jobs if the lock has expired reguardless of the worker" do
SimpleJob.runs.should == 0
Delayed::Job.work_off(5)
SimpleJob.runs.should == 2
Delayed::Job.find_available(5, 10.minutes).length.should == 1

it "should ingore locked jobs from other workers" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 1 # runs the one open job
end


it "should find our own jobs regardless of locks" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
end
end

context "while running with locked and expired jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::MAX_RUN_TIME)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => exp_time)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should only find unlocked and expired jobs" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 2 # runs the one open job and one expired job
end

it "should ignore locks when finding our own jobs" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
# This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
end

end

end

0 comments on commit 266fc15

Please sign in to comment.