Skip to content

Commit

Permalink
Add TimerTask#interval_type option to configure interval calculation
Browse files Browse the repository at this point in the history
Can be either `:fixed_delay` or `:fixed_rate`, default to `:fixed_delay`
  • Loading branch information
bensheldon committed May 25, 2023
1 parent 9f40827 commit 105480b
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
52 changes: 49 additions & 3 deletions lib/concurrent-ruby/concurrent/timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ module Concurrent
# be tested separately then passed to the `TimerTask` for scheduling and
# running.
#
# A `TimerTask` supports two different types of interval calculations.
# A fixed delay will always wait the same amount of time between the
# completion of one task and the start of the next. A fixed rate will
# attempt to maintain a constant rate of execution regardless of the
# duration of the task. For example, if a fixed rate task is scheduled
# to run every 60 seconds but the task itself takes 10 seconds to
# complete, the next task will be scheduled to run 50 seconds after
# the start of the previous task. If the task takes 70 seconds to
# complete, the next task will be start immediately after the previous
# task completes. Tasks will not be executed concurrently.
#
# In some cases it may be necessary for a `TimerTask` to affect its own
# execution cycle. To facilitate this, a reference to the TimerTask instance
# is passed as an argument to the provided block every time the task is
Expand Down Expand Up @@ -74,6 +85,12 @@ module Concurrent
#
# #=> 'Boom!'
#
# @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay
# task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do
# puts 'Boom!'
# end
# task.interval_type #=> :fixed_rate
#
# @example Last `#value` and `Dereferenceable` mixin
# task = Concurrent::TimerTask.new(
# dup_on_deref: true,
Expand Down Expand Up @@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService
# Default `:execution_interval` in seconds.
EXECUTION_INTERVAL = 60

# Default `:timeout_interval` in seconds.
TIMEOUT_INTERVAL = 30
# Maintain the interval between the end of one execution and the start of the next execution.
FIXED_DELAY = :fixed_delay

# Maintain the interval between the start of one execution and the start of the next.
# If execution time exceeds the interval, the next execution will start immediately
# after the previous execution finishes. Executions will not run concurrently.
FIXED_RATE = :fixed_rate

# Default `:interval_type`
INTERVAL_TYPE = FIXED_DELAY

# Create a new TimerTask with the given task and configuration.
#
Expand Down Expand Up @@ -242,6 +267,24 @@ def execution_interval=(value)
end
end

# @!attribute [rw] interval_type
# @return [Symbol] method to calculate the interval between executions, can be either
# :fixed_rate or :fixed_delay; default to :fixed_delay.
def interval_type
synchronize { @interval_type }
end

# @!attribute [rw] interval_type
# @return [Symbol] method to calculate the interval between executions, can be either
# :fixed_rate or :fixed_delay; default to :fixed_delay.
def interval_type=(value)
if [FIXED_DELAY, FIXED_RATE].include?(value)
synchronize { @interval_type = value }
else
raise ArgumentError.new('must be either :fixed_delay or :fixed_rate')
end
end

# @!attribute [rw] timeout_interval
# @return [Fixnum] Number of seconds the task can run before it is
# considered to have failed.
Expand All @@ -264,6 +307,7 @@ def ns_initialize(opts, &task)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
self.interval_type = opts[:interval_type] || INTERVAL_TYPE
if opts[:timeout] || opts[:timeout_interval]
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
end
Expand Down Expand Up @@ -296,10 +340,12 @@ def schedule_next_task(interval = execution_interval)
# @!visibility private
def execute_task(completion)
return nil unless @running.true?
start = Concurrent.monotonic_time
_success, value, reason = @executor.execute(self)
if completion.try?
self.value = value
schedule_next_task
interval = interval_type == FIXED_DELAY ? execution_interval : [execution_interval - (Concurrent.monotonic_time - start), 0].max
schedule_next_task(interval)
time = Time.now
observers.notify_observers do
[time, self.value, reason]
Expand Down
73 changes: 72 additions & 1 deletion spec/concurrent/timer_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ def trigger_observable(observable)
subject = TimerTask.new(execution_interval: 5) { nil }
expect(subject.execution_interval).to eq 5
end

it 'raises an exception if :interval_type is not a valid value' do
expect {
Concurrent::TimerTask.new(interval_type: :cat) { nil }
}.to raise_error(ArgumentError)
end

it 'uses the default :interval_type when no type is given' do
subject = TimerTask.new { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_DELAY
end

it 'uses the given interval type' do
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_RATE
end
end

context '#kill' do
Expand Down Expand Up @@ -112,7 +128,6 @@ def trigger_observable(observable)
end

specify '#execution_interval is writeable' do

latch = CountDownLatch.new(1)
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 1,
Expand All @@ -132,6 +147,26 @@ def trigger_observable(observable)
subject.kill
end

specify '#interval_type is writeable' do
latch = CountDownLatch.new(1)
subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) do |task|
task.interval_type = TimerTask::FIXED_DELAY
latch.count_down
end

expect(subject.interval_type).to eq(TimerTask::FIXED_DELAY)
subject.interval_type = TimerTask::FIXED_RATE
expect(subject.interval_type).to eq(TimerTask::FIXED_RATE)

subject.execute
latch.wait(0.1)

expect(subject.interval_type).to eq(TimerTask::FIXED_DELAY)
subject.kill
end

specify '#timeout_interval being written produces a warning' do
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 0.1,
Expand Down Expand Up @@ -181,6 +216,42 @@ def trigger_observable(observable)
expect(latch.count).to eq(0)
subject.kill
end

it 'uses a fixed delay when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be >= 0.3
end

it 'uses a fixed rate when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be < 0.3
end
end

context 'observation' do
Expand Down

0 comments on commit 105480b

Please sign in to comment.