From 105480b007134adfdbcd261c0a01dfc00331be15 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 24 May 2023 17:36:23 -0700 Subject: [PATCH] Add `TimerTask#interval_type` option to configure interval calculation Can be either `:fixed_delay` or `:fixed_rate`, default to `:fixed_delay` --- lib/concurrent-ruby/concurrent/timer_task.rb | 52 +++++++++++++- spec/concurrent/timer_task_spec.rb | 73 +++++++++++++++++++- 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/timer_task.rb b/lib/concurrent-ruby/concurrent/timer_task.rb index 1274482b9..e2349b7d7 100644 --- a/lib/concurrent-ruby/concurrent/timer_task.rb +++ b/lib/concurrent-ruby/concurrent/timer_task.rb @@ -32,6 +32,17 @@ module Concurrent # be tested separately then passed to the `TimerTask` for scheduling and # running. # + # A `TimerTask` supports two different types of interval calculations. + # A fixed delay will always wait the same amount of time between the + # completion of one task and the start of the next. A fixed rate will + # attempt to maintain a constant rate of execution regardless of the + # duration of the task. For example, if a fixed rate task is scheduled + # to run every 60 seconds but the task itself takes 10 seconds to + # complete, the next task will be scheduled to run 50 seconds after + # the start of the previous task. If the task takes 70 seconds to + # complete, the next task will be start immediately after the previous + # task completes. Tasks will not be executed concurrently. + # # In some cases it may be necessary for a `TimerTask` to affect its own # execution cycle. To facilitate this, a reference to the TimerTask instance # is passed as an argument to the provided block every time the task is @@ -74,6 +85,12 @@ module Concurrent # # #=> 'Boom!' # + # @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay + # task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do + # puts 'Boom!' + # end + # task.interval_type #=> :fixed_rate + # # @example Last `#value` and `Dereferenceable` mixin # task = Concurrent::TimerTask.new( # dup_on_deref: true, @@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService # Default `:execution_interval` in seconds. EXECUTION_INTERVAL = 60 - # Default `:timeout_interval` in seconds. - TIMEOUT_INTERVAL = 30 + # Maintain the interval between the end of one execution and the start of the next execution. + FIXED_DELAY = :fixed_delay + + # Maintain the interval between the start of one execution and the start of the next. + # If execution time exceeds the interval, the next execution will start immediately + # after the previous execution finishes. Executions will not run concurrently. + FIXED_RATE = :fixed_rate + + # Default `:interval_type` + INTERVAL_TYPE = FIXED_DELAY # Create a new TimerTask with the given task and configuration. # @@ -242,6 +267,24 @@ def execution_interval=(value) end end + # @!attribute [rw] interval_type + # @return [Symbol] method to calculate the interval between executions, can be either + # :fixed_rate or :fixed_delay; default to :fixed_delay. + def interval_type + synchronize { @interval_type } + end + + # @!attribute [rw] interval_type + # @return [Symbol] method to calculate the interval between executions, can be either + # :fixed_rate or :fixed_delay; default to :fixed_delay. + def interval_type=(value) + if [FIXED_DELAY, FIXED_RATE].include?(value) + synchronize { @interval_type = value } + else + raise ArgumentError.new('must be either :fixed_delay or :fixed_rate') + end + end + # @!attribute [rw] timeout_interval # @return [Fixnum] Number of seconds the task can run before it is # considered to have failed. @@ -264,6 +307,7 @@ def ns_initialize(opts, &task) set_deref_options(opts) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL + self.interval_type = opts[:interval_type] || INTERVAL_TYPE if opts[:timeout] || opts[:timeout_interval] warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly' end @@ -296,10 +340,12 @@ def schedule_next_task(interval = execution_interval) # @!visibility private def execute_task(completion) return nil unless @running.true? + start = Concurrent.monotonic_time _success, value, reason = @executor.execute(self) if completion.try? self.value = value - schedule_next_task + interval = interval_type == FIXED_DELAY ? execution_interval : [execution_interval - (Concurrent.monotonic_time - start), 0].max + schedule_next_task(interval) time = Time.now observers.notify_observers do [time, self.value, reason] diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 44cc3e22e..04e1e41af 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -82,6 +82,22 @@ def trigger_observable(observable) subject = TimerTask.new(execution_interval: 5) { nil } expect(subject.execution_interval).to eq 5 end + + it 'raises an exception if :interval_type is not a valid value' do + expect { + Concurrent::TimerTask.new(interval_type: :cat) { nil } + }.to raise_error(ArgumentError) + end + + it 'uses the default :interval_type when no type is given' do + subject = TimerTask.new { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_DELAY + end + + it 'uses the given interval type' do + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_RATE + end end context '#kill' do @@ -112,7 +128,6 @@ def trigger_observable(observable) end specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) subject = TimerTask.new(timeout_interval: 1, execution_interval: 1, @@ -132,6 +147,26 @@ def trigger_observable(observable) subject.kill end + specify '#interval_type is writeable' do + latch = CountDownLatch.new(1) + subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) do |task| + task.interval_type = TimerTask::FIXED_DELAY + latch.count_down + end + + expect(subject.interval_type).to eq(TimerTask::FIXED_DELAY) + subject.interval_type = TimerTask::FIXED_RATE + expect(subject.interval_type).to eq(TimerTask::FIXED_RATE) + + subject.execute + latch.wait(0.1) + + expect(subject.interval_type).to eq(TimerTask::FIXED_DELAY) + subject.kill + end + specify '#timeout_interval being written produces a warning' do subject = TimerTask.new(timeout_interval: 1, execution_interval: 0.1, @@ -181,6 +216,42 @@ def trigger_observable(observable) expect(latch.count).to eq(0) subject.kill end + + it 'uses a fixed delay when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be >= 0.3 + end + + it 'uses a fixed rate when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be < 0.3 + end end context 'observation' do