Skip to content

fix: Eliminate intermittent failure in testQueueClosedEventTerminates…#631

Merged
kabir merged 1 commit intoa2aproject:mainfrom
kabir:racecondition
Feb 6, 2026
Merged

fix: Eliminate intermittent failure in testQueueClosedEventTerminates…#631
kabir merged 1 commit intoa2aproject:mainfrom
kabir:racecondition

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Feb 6, 2026

…RemoteSubscribers

Two-part fix for race condition in Kafka replication test:

  1. EventQueue: Signal polling started BEFORE any early returns

    • Moved signalQueuePollerStarted() to beginning of dequeueEventItem()
    • Previously in finally block, which didn't execute if queue was closed
    • Race: If queue closed before first dequeue, EventQueueClosedException thrown BEFORE try block, so finally never ran and signal never sent
    • Now always signals even if queue immediately throws on closure
  2. Test: Replace unreliable Thread.sleep with proper synchronization

    • Inject QueueManager to access the queue for the task
    • Replace Thread.sleep(2000) with queueManager.awaitQueuePollerStart()
    • Ensures EventConsumer is actually polling before sending QueueClosedEvent
    • Add debug logging to diagnose future failures

Root cause: signalQueuePollerStarted() was only called in finally block, which was skipped when queue was already closed at first dequeue attempt. This caused awaitQueuePollerStart() to hang or timeout intermittently.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an intermittent test failure caused by a race condition in the event queue's polling mechanism. It refactors the EventQueue to ensure polling signals are consistently emitted and updates a related integration test to use proper synchronization instead of a fixed delay, thereby improving the reliability and diagnostic capabilities of the test suite.

Highlights

  • Race Condition Fix in EventQueue Polling: The signalQueuePollerStarted() call was moved to the beginning of dequeueEventItem() in EventQueue.java. This ensures the polling signal is always sent, even if the queue is closed immediately, preventing a race condition where awaitQueuePollerStart() could hang.
  • Improved Test Synchronization: Replaced an unreliable Thread.sleep(2000) in KafkaReplicationIntegrationTest.java with explicit synchronization using queueManager.awaitQueuePollerStart(). This guarantees the EventConsumer is actively polling before a QueueClosedEvent is sent, making the test more robust.
  • Enhanced Test Debugging: Added debug logging to KafkaReplicationIntegrationTest.java to provide more insight if the streaming subscription does not complete as expected, aiding in future failure diagnosis.
Changelog
  • extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java
    • Injected QueueManager dependency for test synchronization.
    • Replaced Thread.sleep with queueManager.awaitQueuePollerStart() to ensure the event consumer is polling before proceeding.
    • Added debug logging to aid in diagnosing test failures related to stream completion.
  • server-common/src/main/java/io/a2a/server/events/EventQueue.java
    • Moved signalQueuePollerStarted() to the beginning of the dequeueEventItem method to guarantee it's called before any early returns or exceptions.
    • Refactored the try-finally block in dequeueEventItem as the finally content was moved.
Activity
  • No specific activity (comments, reviews, etc.) has been provided for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively resolves an intermittent test failure by correcting a race condition. The use of queueManager.awaitQueuePollerStart() instead of Thread.sleep() is a significant improvement for test reliability. The corresponding change in EventQueue to signal the poller start earlier is also a crucial part of the fix. The changes are well-documented in the pull request description. I have a couple of minor suggestions to enhance code consistency and maintainability.

…RemoteSubscribers

Two-part fix for race condition in Kafka replication test:

1. EventQueue: Signal polling started BEFORE any early returns
   - Moved signalQueuePollerStarted() to beginning of dequeueEventItem()
   - Previously in finally block, which didn't execute if queue was closed
   - Race: If queue closed before first dequeue, EventQueueClosedException
     thrown BEFORE try block, so finally never ran and signal never sent
   - Now always signals even if queue immediately throws on closure

2. Test: Replace unreliable Thread.sleep with proper synchronization
   - Inject QueueManager to access the queue for the task
   - Replace Thread.sleep(2000) with queueManager.awaitQueuePollerStart()
   - Ensures EventConsumer is actually polling before sending QueueClosedEvent
   - Add debug logging to diagnose future failures

Root cause: signalQueuePollerStarted() was only called in finally block,
which was skipped when queue was already closed at first dequeue attempt.
This caused awaitQueuePollerStart() to hang or timeout intermittently.
@kabir kabir merged commit 18d2abf into a2aproject:main Feb 6, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants