-
Notifications
You must be signed in to change notification settings - Fork 291
Replace Concurrent::TimerTask with pure Ruby implementation #873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Replace Concurrent::TimerTask with pure Ruby implementation #873
Conversation
- Add Shoryuken::Helpers::TimerTask as drop-in replacement - Update auto_extend_visibility.rb to use custom implementation - Add comprehensive test suite for TimerTask - Remove external dependency on concurrent-ruby for timer functionality
WalkthroughIntroduces a new thread-safe Shoryuken::Helpers::TimerTask, replaces Concurrent::TimerTask usage in MessageVisibilityExtender#auto_extend, and adds comprehensive RSpec tests for the new helper. The middleware logic remains the same aside from switching to the internal timer implementation. Changes
Sequence Diagram(s)sequenceDiagram
participant Worker
participant Extender as MessageVisibilityExtender
participant Timer as Helpers::TimerTask
participant SQS
Worker->>Extender: auto_extend(sqs_msg, queue_visibility_timeout)
Extender->>Timer: execute() (interval = timeout - upfront)
loop every interval
Timer->>Extender: invoke block
Extender->>SQS: change_visibility(sqs_msg, new_timeout)
SQS-->>Extender: response
end
Worker-->>Extender: processing done (ensure)
Extender->>Timer: kill()
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (2)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (2)
31-39: Clamp the execution interval to a sane positive valueIf queue_visibility_timeout < EXTEND_UPFRONT_SECONDS, the interval becomes 0 or negative, which will now raise in TimerTask initialization (and would previously crash at sleep). Clamp to at least 1 second to keep behavior safe and predictable.
def auto_extend(_worker, queue, sqs_msg, _body) queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout - Shoryuken::Helpers::TimerTask.new(execution_interval: queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do + interval = [queue_visibility_timeout - EXTEND_UPFRONT_SECONDS, 1].max + Shoryuken::Helpers::TimerTask.new(execution_interval: interval) do logger.debug do "Extending message #{queue}/#{sqs_msg.message_id} visibility timeout by #{queue_visibility_timeout}s" end
30-44: Require the newTimerTaskhelper
To preventuninitialized constant Shoryuken::Helpers::TimerTaskerrors under certain load orders, you need to explicitly load your new helper. No existingrequireorautoloadfor
shoryuken/helpers/timer_taskwas found in the gem’s entrypoint.Please add one of the following:
- In lib/shoryuken.rb (preferred):
# lib/shoryuken.rb + require 'shoryuken/helpers/timer_task'- Or at the top of lib/shoryuken/middleware/server/auto_extend_visibility.rb:
# lib/shoryuken/middleware/server/auto_extend_visibility.rb + require 'shoryuken/helpers/timer_task'No remaining references to
Concurrent::TimerTaskwere found outside of your new helper implementation.
🧹 Nitpick comments (8)
lib/shoryuken/helpers/timer_task.rb (3)
30-41: Best-effort cleanup: join briefly after killing the threadThread#kill is fine here, but a short non-blocking join helps avoid lingering zombie threads and ensures memory/resources are reclaimed promptly. Keep the join bounded to not delay shutdown paths.
def kill @mutex.synchronize do return false if @killed @killed = true @running = false - @thread.kill if @thread&.alive? + if (t = @thread)&.alive? + t.kill + # Best-effort cleanup; do not block indefinitely + t.join(0.1) + end end true end
55-57: Prefer Shoryuken logger over Kernel.warn for consistencyUsing logger integrates with the existing logging pipeline and respects log levels and formatting. It keeps stderr quiet and makes the TimerTask’s errors observable through the same channels as the rest of Shoryuken.
You can include Util and route messages through logger:
class TimerTask + include Util- warn "TimerTask execution error: #{e.message}" - warn e.backtrace.join("\n") if e.backtrace + logger.warn { "TimerTask execution error: #{e.message}" } + logger.debug { e.backtrace.join("\n") } if e.backtrace
45-58: Optional: reduce drift by scheduling with a monotonic clockCurrent loop sleeps for a fixed interval after each run; execution time adds drift. If tighter cadence matters, compute the next tick from a monotonic clock so long tasks don’t accumulate drift.
Example approach:
- Track next_run_at = now + interval.
- Sleep for max(next_run_at - now, 0).
- After running the task, increment next_run_at by the fixed interval.
I can provide a patch if you want to go this route.
spec/shoryuken/helpers/timer_task_spec.rb (5)
103-109: Reduce flakiness by joining the worker thread instead of sleepingSleeping for 10ms may not be enough under CI load, making this test flaky. Join the thread with a short timeout to deterministically observe termination.
thread = timer_task.instance_variable_get(:@thread) timer_task.kill - sleep(0.01) # Give time for thread to be killed - expect(thread.alive?).to be false + thread.join(0.5) # Wait up to 500ms for thread termination + expect(thread.alive?).to be false
221-229: Widen tolerance to limit timing flakes on shared CIIntermittent load can skew short-interval timing. Slightly widening the tolerance makes the test more robust without weakening intent.
- expect(interval).to be_within(0.05).of(0.1) + expect(interval).to be_within(0.07).of(0.1)
165-170: Stub Kernel.warn directly rather than any instance of ObjectStubbing Kernel.warn is tighter and less intrusive than allow_any_instance_of(Object), and it still captures warn output since warn is provided by Kernel.
- allow_any_instance_of(Object).to receive(:warn) do |*args| + allow(Kernel).to receive(:warn) do |*args| captured_stderr.puts(*args) end
189-191: Same here: stub Kernel.warn directlyKeeps the stub limited to the call site you care about.
- allow_any_instance_of(Object).to receive(:warn) + allow(Kernel).to receive(:warn)
13-40: Add a spec for non-positive intervalsGiven the suggested initializer validation, cover the failure mode explicitly.
Example spec snippet to add:
describe '#initialize' do it 'raises when execution_interval is non-positive' do expect { described_class.new(execution_interval: 0) {} } .to raise_error(ArgumentError, /positive/) expect { described_class.new(execution_interval: -1) {} } .to raise_error(ArgumentError, /positive/) end endI can open a follow-up PR adding this test once you decide on the chosen behavior (raise vs clamp).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
lib/shoryuken/helpers/timer_task.rb(1 hunks)lib/shoryuken/middleware/server/auto_extend_visibility.rb(1 hunks)spec/shoryuken/helpers/timer_task_spec.rb(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
lib/shoryuken/message.rb (2)
change_visibility(32-36)visibility_timeout=(38-44)
lib/shoryuken/helpers/timer_task.rb (1)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
call(11-23)
spec/shoryuken/helpers/timer_task_spec.rb (1)
lib/shoryuken/helpers/timer_task.rb (2)
execute(20-28)kill(31-41)
🔇 Additional comments (2)
lib/shoryuken/helpers/timer_task.rb (1)
19-28: Execute semantics look solid (idempotent and returns self)execute is correctly idempotent, thread-safe, and returns self for chaining. Nice.
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
47-49: LGTM: timer lifecycle is clear and explicitauto_visibility_timer returns a task and tap(&:execute) starts it. The ensure block above guarantees cleanup with kill. Good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (7)
lib/shoryuken/helpers/timer_task.rb (4)
32-43: Consider preferring cooperative shutdown over hard Thread#killKilling a thread can interrupt the task mid-execution. Given you already set @killed and the loop checks it, a cooperative approach (set killed and, if you can afford it, join with a short timeout) avoids interrupting user code in the middle of I/O. If join is undesirable in hot paths (like middleware ensure), consider making this behavior configurable or only killing if the thread is sleeping (hard to detect reliably).
Example tweak (optional):
def kill @mutex.synchronize do return false if @killed @killed = true @running = false - @thread.kill if @thread&.alive? + # Prefer cooperative shutdown; fall back to kill if it lingers (optional) + if (t = @thread) && t.alive? + t.join(0.01) # don't block for long + t.kill if t.alive? + end end true end
47-63: Scheduling semantics: document fixed-delay behavior and potential driftThe loop sleeps for execution_interval then runs the task. This is fixed-delay scheduling; long-running tasks increase the interval between subsequent runs. Concurrent::TimerTask historically targets similar behavior but also exposes options like timeout/overrun handling. Documenting this explicitly in the class comment helps set expectations, especially if tasks are sometimes slower than the interval.
52-59: Use Shoryuken.logger for consistency with project loggingUsing warn writes directly to stderr and bypasses Shoryuken’s logger formatting and filters. Consider switching to Shoryuken.logger.warn (and .debug/.error as needed) for consistent observability. Keep the rescue broad if you want to mirror Concurrent::TimerTask resiliency.
- warn "TimerTask execution error: #{e.message}" - warn e.backtrace.join("\n") if e.backtrace + if defined?(Shoryuken) && Shoryuken.respond_to?(:logger) && Shoryuken.logger + Shoryuken.logger.warn "TimerTask execution error: #{e.message}" + Shoryuken.logger.warn(e.backtrace.join("\n")) if e.backtrace + else + warn "TimerTask execution error: #{e.message}" + warn e.backtrace.join("\n") if e.backtrace + end
21-30: Optional: name the worker thread for easier debuggingNaming the thread can simplify debugging and thread dumps.
- @thread = Thread.new { run_timer_loop } + @thread = Thread.new do + Thread.current.name = 'Shoryuken::TimerTask' if Thread.current.respond_to?(:name=) + run_timer_loop + endAlso applies to: 47-63
spec/shoryuken/helpers/timer_task_spec.rb (3)
23-26: Rubocop: split long expectation lines to satisfy Layout/LineLengthThe two expectations exceed the 125-char limit flagged by RuboCop. Split across lines for readability.
- expect { described_class.new(execution_interval: 0) {} }.to raise_error(ArgumentError, 'execution_interval must be positive') - expect { described_class.new(execution_interval: -1) {} }.to raise_error(ArgumentError, 'execution_interval must be positive') + expect { described_class.new(execution_interval: 0) {} } + .to raise_error(ArgumentError, 'execution_interval must be positive') + expect { described_class.new(execution_interval: -1) {} } + .to raise_error(ArgumentError, 'execution_interval must be positive')
119-125: Thread termination check can be racy; add a small join to reduce flakinessAfter kill, checking thread.alive? right away can be flaky in slower CI. Joining briefly reduces instability.
- timer_task.kill - sleep(0.01) # Give time for thread to be killed + timer_task.kill + thread.join(0.05) # Allow thread to exit expect(thread.alive?).to be false
169-196: Prefer stubbing Kernel.warn directly instead of allow_any_instance_of(Object)allow_any_instance_of is heavy and globally affects objects. Since warn is provided by Kernel, stub Kernel.warn to capture output with a narrower scope.
- allow_any_instance_of(Object).to receive(:warn) do |*args| - captured_stderr.puts(*args) - end + allow(Kernel).to receive(:warn) { |msg| captured_stderr.puts(msg) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
lib/shoryuken/helpers/timer_task.rb(1 hunks)spec/shoryuken/helpers/timer_task_spec.rb(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
lib/shoryuken/helpers/timer_task.rb (1)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
call(11-23)
spec/shoryuken/helpers/timer_task_spec.rb (1)
lib/shoryuken/helpers/timer_task.rb (2)
execute(22-30)kill(33-43)
🪛 RuboCop (1.76.1)
spec/shoryuken/helpers/timer_task_spec.rb
[convention] 24-24: Line is too long. [131/125]
(Layout/LineLength)
[convention] 25-25: Line is too long. [132/125]
(Layout/LineLength)
🔇 Additional comments (6)
lib/shoryuken/helpers/timer_task.rb (2)
8-19: Solid initializer: input validation and state setup look correct
- Coercing execution_interval via Float and enforcing > 0 prevents thread-crashing sleeps.
- Clear error message and early validation are appreciated.
21-30: Idempotent, thread-safe start; returns self for chainingexecute correctly guards against multiple starts and killed state under a mutex, and returns self for tap(&:execute) usage.
spec/shoryuken/helpers/timer_task_spec.rb (4)
33-37: Align error type assertions with coercion behaviorFloat(execution_interval) raises TypeError for nil and non-convertible types; the test asserts TypeError for nil and Hash, and ArgumentError for invalid string. That’s consistent with current behavior. LGTM.
78-86: Good idempotency coverage on executeVerifies no new thread is spawned on repeated execute calls. This matches the mutex-guarded @running flag.
231-246: Time-based test is reasonable; consider broadening tolerance for slow CIUsing be_within(0.05).of(0.1) is okay locally but may be tight under load. If you see flakes in CI, widen tolerance or assert monotonic increase and minimum spacing rather than exact interval.
Do you observe any flakiness in CI for this example? If so, I can propose a revised matcher that’s more resilient under load.
249-297: Thread-safety scenarios look comprehensive
- Concurrent execute results in a single worker thread.
- Concurrent kill produces exactly one true outcome.
- Kill across multiple threads leaves the timer in a stopped state.
These cover the primary contention points around @mutex. Nicely done.
|
@myumura i think we can do it that way for now but I also think that in the future we could use a model where we have a single thread that performs all the time and one-shot operations based on a queue + a periodic scheduler. Great job! |
|
@myumura I will merge this once back from 🇮🇹 - this looks good but I just want to align it with my changes a bit 🙏 |
Replaces Concurrent::TimerTask with a pure Ruby implementation Shoryuken::Helpers::TimerTask to reduce external dependencies on the concurrent-ruby gem.
Summary by CodeRabbit