Skip to content

Conversation

@myumura
Copy link
Contributor

@myumura myumura commented Aug 17, 2025

Replaces Concurrent::TimerTask with a pure Ruby implementation Shoryuken::Helpers::TimerTask to reduce external dependencies on the concurrent-ruby gem.

Summary by CodeRabbit

  • New Features
    • Added a built-in, thread-safe periodic task runner for scheduled background work such as automatic message visibility extension.
  • Bug Fixes
    • Improved resilience: exceptions in scheduled tasks are logged and do not stop the periodic loop.
  • Refactor
    • Removed reliance on an external timer implementation in favor of the new internal timer.
  • Tests
    • Added comprehensive tests for lifecycle, concurrency, timing accuracy, and error handling.

- Add Shoryuken::Helpers::TimerTask as drop-in replacement
- Update auto_extend_visibility.rb to use custom implementation
- Add comprehensive test suite for TimerTask
- Remove external dependency on concurrent-ruby for timer functionality
@coderabbitai
Copy link

coderabbitai bot commented Aug 17, 2025

Walkthrough

Introduces a new thread-safe Shoryuken::Helpers::TimerTask, replaces Concurrent::TimerTask usage in MessageVisibilityExtender#auto_extend, and adds comprehensive RSpec tests for the new helper. The middleware logic remains the same aside from switching to the internal timer implementation.

Changes

Cohort / File(s) Summary
New TimerTask helper
lib/shoryuken/helpers/timer_task.rb
Adds Shoryuken::Helpers::TimerTask with initialize(execution_interval:, &block), execute, and kill; implements a mutex-guarded worker thread that runs a task at fixed intervals and rescues/logs task exceptions.
Server middleware integration
lib/shoryuken/middleware/server/auto_extend_visibility.rb
Replaces Concurrent::TimerTask with Shoryuken::Helpers::TimerTask in MessageVisibilityExtender#auto_extend; keeps interval and block behavior unchanged.
Specs for TimerTask
spec/shoryuken/helpers/timer_task_spec.rb
Adds tests covering initialization, execute/kill lifecycle, timing accuracy, error resilience, and thread-safety; inspects internal state and verifies concurrent behaviors.

Sequence Diagram(s)

sequenceDiagram
  participant Worker
  participant Extender as MessageVisibilityExtender
  participant Timer as Helpers::TimerTask
  participant SQS

  Worker->>Extender: auto_extend(sqs_msg, queue_visibility_timeout)
  Extender->>Timer: execute() (interval = timeout - upfront)
  loop every interval
    Timer->>Extender: invoke block
    Extender->>SQS: change_visibility(sqs_msg, new_timeout)
    SQS-->>Extender: response
  end
  Worker-->>Extender: processing done (ensure)
  Extender->>Timer: kill()
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • mensfeld

Poem

I tap my paw to tick and tock,
A timer hums—no borrowed clock.
Threads awake, then softly doze,
Guarded states, a mutex knows.
Hop! Tasks run on, errors light. 🐇⏱️

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@myumura myumura marked this pull request as ready for review August 17, 2025 14:46
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (2)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (2)

31-39: Clamp the execution interval to a sane positive value

If queue_visibility_timeout < EXTEND_UPFRONT_SECONDS, the interval becomes 0 or negative, which will now raise in TimerTask initialization (and would previously crash at sleep). Clamp to at least 1 second to keep behavior safe and predictable.

           def auto_extend(_worker, queue, sqs_msg, _body)
             queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout

-            Shoryuken::Helpers::TimerTask.new(execution_interval: queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do
+            interval = [queue_visibility_timeout - EXTEND_UPFRONT_SECONDS, 1].max
+            Shoryuken::Helpers::TimerTask.new(execution_interval: interval) do
               logger.debug do
                 "Extending message #{queue}/#{sqs_msg.message_id} visibility timeout by #{queue_visibility_timeout}s"
               end

30-44: Require the new TimerTask helper
To prevent uninitialized constant Shoryuken::Helpers::TimerTask errors under certain load orders, you need to explicitly load your new helper. No existing require or autoload for
shoryuken/helpers/timer_task was found in the gem’s entrypoint.

Please add one of the following:

  • In lib/shoryuken.rb (preferred):
     # lib/shoryuken.rb
    + require 'shoryuken/helpers/timer_task'
  • Or at the top of lib/shoryuken/middleware/server/auto_extend_visibility.rb:
     # lib/shoryuken/middleware/server/auto_extend_visibility.rb
    + require 'shoryuken/helpers/timer_task'

No remaining references to Concurrent::TimerTask were found outside of your new helper implementation.

🧹 Nitpick comments (8)
lib/shoryuken/helpers/timer_task.rb (3)

30-41: Best-effort cleanup: join briefly after killing the thread

Thread#kill is fine here, but a short non-blocking join helps avoid lingering zombie threads and ensures memory/resources are reclaimed promptly. Keep the join bounded to not delay shutdown paths.

       def kill
         @mutex.synchronize do
           return false if @killed

           @killed = true
           @running = false

-          @thread.kill if @thread&.alive?
+          if (t = @thread)&.alive?
+            t.kill
+            # Best-effort cleanup; do not block indefinitely
+            t.join(0.1)
+          end
         end
         true
       end

55-57: Prefer Shoryuken logger over Kernel.warn for consistency

Using logger integrates with the existing logging pipeline and respects log levels and formatting. It keeps stderr quiet and makes the TimerTask’s errors observable through the same channels as the rest of Shoryuken.

You can include Util and route messages through logger:

 class TimerTask
+      include Util
-            warn "TimerTask execution error: #{e.message}"
-            warn e.backtrace.join("\n") if e.backtrace
+            logger.warn { "TimerTask execution error: #{e.message}" }
+            logger.debug { e.backtrace.join("\n") } if e.backtrace

45-58: Optional: reduce drift by scheduling with a monotonic clock

Current loop sleeps for a fixed interval after each run; execution time adds drift. If tighter cadence matters, compute the next tick from a monotonic clock so long tasks don’t accumulate drift.

Example approach:

  • Track next_run_at = now + interval.
  • Sleep for max(next_run_at - now, 0).
  • After running the task, increment next_run_at by the fixed interval.

I can provide a patch if you want to go this route.

spec/shoryuken/helpers/timer_task_spec.rb (5)

103-109: Reduce flakiness by joining the worker thread instead of sleeping

Sleeping for 10ms may not be enough under CI load, making this test flaky. Join the thread with a short timeout to deterministically observe termination.

       thread = timer_task.instance_variable_get(:@thread)
       timer_task.kill
-      sleep(0.01) # Give time for thread to be killed
-      expect(thread.alive?).to be false
+      thread.join(0.5) # Wait up to 500ms for thread termination
+      expect(thread.alive?).to be false

221-229: Widen tolerance to limit timing flakes on shared CI

Intermittent load can skew short-interval timing. Slightly widening the tolerance makes the test more robust without weakening intent.

-        expect(interval).to be_within(0.05).of(0.1)
+        expect(interval).to be_within(0.07).of(0.1)

165-170: Stub Kernel.warn directly rather than any instance of Object

Stubbing Kernel.warn is tighter and less intrusive than allow_any_instance_of(Object), and it still captures warn output since warn is provided by Kernel.

-      allow_any_instance_of(Object).to receive(:warn) do |*args|
+      allow(Kernel).to receive(:warn) do |*args|
         captured_stderr.puts(*args)
       end

189-191: Same here: stub Kernel.warn directly

Keeps the stub limited to the call site you care about.

-      allow_any_instance_of(Object).to receive(:warn)
+      allow(Kernel).to receive(:warn)

13-40: Add a spec for non-positive intervals

Given the suggested initializer validation, cover the failure mode explicitly.

Example spec snippet to add:

describe '#initialize' do
  it 'raises when execution_interval is non-positive' do
    expect { described_class.new(execution_interval: 0) {} }
      .to raise_error(ArgumentError, /positive/)
    expect { described_class.new(execution_interval: -1) {} }
      .to raise_error(ArgumentError, /positive/)
  end
end

I can open a follow-up PR adding this test once you decide on the chosen behavior (raise vs clamp).

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 45b94dd and 50d4a1e.

📒 Files selected for processing (3)
  • lib/shoryuken/helpers/timer_task.rb (1 hunks)
  • lib/shoryuken/middleware/server/auto_extend_visibility.rb (1 hunks)
  • spec/shoryuken/helpers/timer_task_spec.rb (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
lib/shoryuken/message.rb (2)
  • change_visibility (32-36)
  • visibility_timeout= (38-44)
lib/shoryuken/helpers/timer_task.rb (1)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
  • call (11-23)
spec/shoryuken/helpers/timer_task_spec.rb (1)
lib/shoryuken/helpers/timer_task.rb (2)
  • execute (20-28)
  • kill (31-41)
🔇 Additional comments (2)
lib/shoryuken/helpers/timer_task.rb (1)

19-28: Execute semantics look solid (idempotent and returns self)

execute is correctly idempotent, thread-safe, and returns self for chaining. Nice.

lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)

47-49: LGTM: timer lifecycle is clear and explicit

auto_visibility_timer returns a task and tap(&:execute) starts it. The ensure block above guarantees cleanup with kill. Good.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (7)
lib/shoryuken/helpers/timer_task.rb (4)

32-43: Consider preferring cooperative shutdown over hard Thread#kill

Killing a thread can interrupt the task mid-execution. Given you already set @killed and the loop checks it, a cooperative approach (set killed and, if you can afford it, join with a short timeout) avoids interrupting user code in the middle of I/O. If join is undesirable in hot paths (like middleware ensure), consider making this behavior configurable or only killing if the thread is sleeping (hard to detect reliably).

Example tweak (optional):

 def kill
   @mutex.synchronize do
     return false if @killed

     @killed = true
     @running = false

-    @thread.kill if @thread&.alive?
+    # Prefer cooperative shutdown; fall back to kill if it lingers (optional)
+    if (t = @thread) && t.alive?
+      t.join(0.01) # don't block for long
+      t.kill if t.alive?
+    end
   end
   true
 end

47-63: Scheduling semantics: document fixed-delay behavior and potential drift

The loop sleeps for execution_interval then runs the task. This is fixed-delay scheduling; long-running tasks increase the interval between subsequent runs. Concurrent::TimerTask historically targets similar behavior but also exposes options like timeout/overrun handling. Documenting this explicitly in the class comment helps set expectations, especially if tasks are sometimes slower than the interval.


52-59: Use Shoryuken.logger for consistency with project logging

Using warn writes directly to stderr and bypasses Shoryuken’s logger formatting and filters. Consider switching to Shoryuken.logger.warn (and .debug/.error as needed) for consistent observability. Keep the rescue broad if you want to mirror Concurrent::TimerTask resiliency.

-            warn "TimerTask execution error: #{e.message}"
-            warn e.backtrace.join("\n") if e.backtrace
+            if defined?(Shoryuken) && Shoryuken.respond_to?(:logger) && Shoryuken.logger
+              Shoryuken.logger.warn "TimerTask execution error: #{e.message}"
+              Shoryuken.logger.warn(e.backtrace.join("\n")) if e.backtrace
+            else
+              warn "TimerTask execution error: #{e.message}"
+              warn e.backtrace.join("\n") if e.backtrace
+            end

21-30: Optional: name the worker thread for easier debugging

Naming the thread can simplify debugging and thread dumps.

-          @thread = Thread.new { run_timer_loop }
+          @thread = Thread.new do
+            Thread.current.name = 'Shoryuken::TimerTask' if Thread.current.respond_to?(:name=)
+            run_timer_loop
+          end

Also applies to: 47-63

spec/shoryuken/helpers/timer_task_spec.rb (3)

23-26: Rubocop: split long expectation lines to satisfy Layout/LineLength

The two expectations exceed the 125-char limit flagged by RuboCop. Split across lines for readability.

-      expect { described_class.new(execution_interval: 0) {} }.to raise_error(ArgumentError, 'execution_interval must be positive')
-      expect { described_class.new(execution_interval: -1) {} }.to raise_error(ArgumentError, 'execution_interval must be positive')
+      expect { described_class.new(execution_interval: 0) {} }
+        .to raise_error(ArgumentError, 'execution_interval must be positive')
+      expect { described_class.new(execution_interval: -1) {} }
+        .to raise_error(ArgumentError, 'execution_interval must be positive')

119-125: Thread termination check can be racy; add a small join to reduce flakiness

After kill, checking thread.alive? right away can be flaky in slower CI. Joining briefly reduces instability.

-      timer_task.kill
-      sleep(0.01) # Give time for thread to be killed
+      timer_task.kill
+      thread.join(0.05) # Allow thread to exit
       expect(thread.alive?).to be false

169-196: Prefer stubbing Kernel.warn directly instead of allow_any_instance_of(Object)

allow_any_instance_of is heavy and globally affects objects. Since warn is provided by Kernel, stub Kernel.warn to capture output with a narrower scope.

-      allow_any_instance_of(Object).to receive(:warn) do |*args|
-        captured_stderr.puts(*args)
-      end
+      allow(Kernel).to receive(:warn) { |msg| captured_stderr.puts(msg) }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 50d4a1e and 5e0337b.

📒 Files selected for processing (2)
  • lib/shoryuken/helpers/timer_task.rb (1 hunks)
  • spec/shoryuken/helpers/timer_task_spec.rb (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
lib/shoryuken/helpers/timer_task.rb (1)
lib/shoryuken/middleware/server/auto_extend_visibility.rb (1)
  • call (11-23)
spec/shoryuken/helpers/timer_task_spec.rb (1)
lib/shoryuken/helpers/timer_task.rb (2)
  • execute (22-30)
  • kill (33-43)
🪛 RuboCop (1.76.1)
spec/shoryuken/helpers/timer_task_spec.rb

[convention] 24-24: Line is too long. [131/125]

(Layout/LineLength)


[convention] 25-25: Line is too long. [132/125]

(Layout/LineLength)

🔇 Additional comments (6)
lib/shoryuken/helpers/timer_task.rb (2)

8-19: Solid initializer: input validation and state setup look correct

  • Coercing execution_interval via Float and enforcing > 0 prevents thread-crashing sleeps.
  • Clear error message and early validation are appreciated.

21-30: Idempotent, thread-safe start; returns self for chaining

execute correctly guards against multiple starts and killed state under a mutex, and returns self for tap(&:execute) usage.

spec/shoryuken/helpers/timer_task_spec.rb (4)

33-37: Align error type assertions with coercion behavior

Float(execution_interval) raises TypeError for nil and non-convertible types; the test asserts TypeError for nil and Hash, and ArgumentError for invalid string. That’s consistent with current behavior. LGTM.


78-86: Good idempotency coverage on execute

Verifies no new thread is spawned on repeated execute calls. This matches the mutex-guarded @running flag.


231-246: Time-based test is reasonable; consider broadening tolerance for slow CI

Using be_within(0.05).of(0.1) is okay locally but may be tight under load. If you see flakes in CI, widen tolerance or assert monotonic increase and minimum spacing rather than exact interval.

Do you observe any flakiness in CI for this example? If so, I can propose a revised matcher that’s more resilient under load.


249-297: Thread-safety scenarios look comprehensive

  • Concurrent execute results in a single worker thread.
  • Concurrent kill produces exactly one true outcome.
  • Kill across multiple threads leaves the timer in a stopped state.

These cover the primary contention points around @mutex. Nicely done.

@mensfeld mensfeld self-assigned this Aug 17, 2025
@mensfeld
Copy link
Collaborator

@myumura i think we can do it that way for now but I also think that in the future we could use a model where we have a single thread that performs all the time and one-shot operations based on a queue + a periodic scheduler. Great job!

@mensfeld
Copy link
Collaborator

mensfeld commented Sep 8, 2025

@myumura I will merge this once back from 🇮🇹 - this looks good but I just want to align it with my changes a bit 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants