Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TimerTask#interval_type option to configure interval calculation #997

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions lib/concurrent-ruby/concurrent/timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ module Concurrent
# be tested separately then passed to the `TimerTask` for scheduling and
# running.
#
# A `TimerTask` supports two different types of interval calculations.
# A fixed delay will always wait the same amount of time between the
# completion of one task and the start of the next. A fixed rate will
# attempt to maintain a constant rate of execution regardless of the
# duration of the task. For example, if a fixed rate task is scheduled
# to run every 60 seconds but the task itself takes 10 seconds to
# complete, the next task will be scheduled to run 50 seconds after
# the start of the previous task. If the task takes 70 seconds to
# complete, the next task will be start immediately after the previous
# task completes. Tasks will not be executed concurrently.
#
# In some cases it may be necessary for a `TimerTask` to affect its own
# execution cycle. To facilitate this, a reference to the TimerTask instance
# is passed as an argument to the provided block every time the task is
Expand Down Expand Up @@ -74,6 +85,12 @@ module Concurrent
#
# #=> 'Boom!'
#
# @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay
# task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally consider the interval_type to be a static configuration of the behaviour of the application rather than something decided at run-time. Therefore, I wonder if this would be better implemented using sub-classes which implement the specific behaviour required, e.g. FixedRateTimerTask.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subclasses seem rather heavy for this, and I am concerned that for compatibility we would need to keep Concurrent::TimerTask so it would be both a leaf class and an abstract class, that sounds very confusing.
So I think a constructor argument like here makes most sense and is consistent with what is already done in concurrent-ruby APIs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it be an abstract class?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it be an abstract class?

If you make subclasses the superclass should be abstract. Otherwise I think it's pretty confusing.

# puts 'Boom!'
# end
# task.interval_type #=> :fixed_rate
#
# @example Last `#value` and `Dereferenceable` mixin
# task = Concurrent::TimerTask.new(
# dup_on_deref: true,
Expand Down Expand Up @@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService
# Default `:execution_interval` in seconds.
EXECUTION_INTERVAL = 60

# Default `:timeout_interval` in seconds.
TIMEOUT_INTERVAL = 30
# Maintain the interval between the end of one execution and the start of the next execution.
FIXED_DELAY = :fixed_delay
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we see a third mode, which could be fixed_rate_drop which drops subsequent executions if the current execution violates the maximum time slice given the rate specified.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense to me.


# Maintain the interval between the start of one execution and the start of the next.
# If execution time exceeds the interval, the next execution will start immediately
# after the previous execution finishes. Executions will not run concurrently.
FIXED_RATE = :fixed_rate

# Default `:interval_type`
DEFAULT_INTERVAL_TYPE = FIXED_DELAY

# Create a new TimerTask with the given task and configuration.
#
Expand All @@ -164,6 +189,9 @@ class TimerTask < RubyExecutorService
# @option opts [Boolean] :run_now Whether to run the task immediately
# upon instantiation or to wait until the first # execution_interval
# has passed (default: false)
# @options opts [Symbol] :interval_type method to calculate the interval
# between executions, can be either :fixed_rate or :fixed_delay.
# (default: :fixed_delay)
# @option opts [Executor] executor, default is `global_io_executor`
#
# @!macro deref_options
Expand Down Expand Up @@ -243,6 +271,10 @@ def execution_interval=(value)
end
end

# @!attribute [r] interval_type
# @return [Symbol] method to calculate the interval between executions
attr_reader :interval_type

# @!attribute [rw] timeout_interval
# @return [Fixnum] Number of seconds the task can run before it is
# considered to have failed.
Expand All @@ -265,10 +297,15 @@ def ns_initialize(opts, &task)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type])
raise ArgumentError.new('interval_type must be either :fixed_delay or :fixed_rate')
end
if opts[:timeout] || opts[:timeout_interval]
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
end

@run_now = opts[:now] || opts[:run_now]
@interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE
@task = Concurrent::SafeTaskExecutor.new(task)
@executor = opts[:executor] || Concurrent.global_io_executor
@running = Concurrent::AtomicBoolean.new(false)
Expand Down Expand Up @@ -298,16 +335,27 @@ def schedule_next_task(interval = execution_interval)
# @!visibility private
def execute_task(completion)
return nil unless @running.true?
start_time = Concurrent.monotonic_time
_success, value, reason = @task.execute(self)
if completion.try?
self.value = value
schedule_next_task
schedule_next_task(calculate_next_interval(start_time))
time = Time.now
observers.notify_observers do
[time, self.value, reason]
end
end
nil
end

# @!visibility private
def calculate_next_interval(start_time)
if @interval_type == FIXED_RATE
run_time = Concurrent.monotonic_time - start_time
[execution_interval - run_time, 0].max
else # FIXED_DELAY
execution_interval
end
end
end
end
74 changes: 73 additions & 1 deletion spec/concurrent/timer_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def trigger_observable(observable)
expect(subject.execution_interval).to eq 5
end

it 'raises an exception if :interval_type is not a valid value' do
expect {
Concurrent::TimerTask.new(interval_type: :cat) { nil }
}.to raise_error(ArgumentError)
end

it 'uses the default :interval_type when no type is given' do
subject = TimerTask.new { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_DELAY
end

it 'uses the given interval type' do
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil }
expect(subject.interval_type).to eq TimerTask::FIXED_RATE
end
end

context '#kill' do
Expand Down Expand Up @@ -113,7 +128,6 @@ def trigger_observable(observable)
end

specify '#execution_interval is writeable' do

latch = CountDownLatch.new(1)
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 1,
Expand All @@ -133,6 +147,28 @@ def trigger_observable(observable)
subject.kill
end

it 'raises on invalid interval_type' do
expect {
fixed_delay = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) { nil }
fixed_delay.kill
}.not_to raise_error

expect {
fixed_rate = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
execution_interval: 0.1,
run_now: true) { nil }
fixed_rate.kill
}.not_to raise_error

expect {
TimerTask.new(interval_type: :unknown,
execution_interval: 0.1,
run_now: true) { nil }
}.to raise_error(ArgumentError)
end

specify '#timeout_interval being written produces a warning' do
subject = TimerTask.new(timeout_interval: 1,
execution_interval: 0.1,
Expand Down Expand Up @@ -209,6 +245,42 @@ def trigger_observable(observable)

expect(executor).to have_received(:post)
end

it 'uses a fixed delay when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be >= 0.3
end

it 'uses a fixed rate when set' do
finished = []
latch = CountDownLatch.new(2)
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
execution_interval: 0.1,
run_now: true) do |task|
sleep(0.2)
finished << Concurrent.monotonic_time
latch.count_down
end
subject.execute
latch.wait(1)
subject.kill

expect(latch.count).to eq(0)
expect(finished[1] - finished[0]).to be < 0.3
end
end

context 'observation' do
Expand Down