fix: Implement background cleanup and producer continuation for clien…#284
fix: Implement background cleanup and producer continuation for clien…#284fjuma merged 2 commits intoa2aproject:mainfrom
Conversation
…t disconnect Backports key fixes from a2a-python PRs a2aproject#440 and a2aproject#472 to resolve event queue lifecycle issues that prevented task resubscription after client disconnect. - Add background task tracking to DefaultRequestHandler with CompletableFuture - Implement asynchronous cleanup when clients disconnect during streaming - Add EventQueue graceful vs immediate close behavior - Restore TCK sleep for task state persistence timing requirements
Summary of ChangesHello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request resolves critical event queue lifecycle issues that previously hindered task resubscription following a client disconnect. By introducing asynchronous background cleanup and refined event queue closing mechanisms, the system now ensures that producers continue their work even if a client drops, allowing for robust task state persistence and successful client resubscription to ongoing tasks. These changes enhance the stability and reliability of streaming operations. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request effectively addresses event queue lifecycle issues upon client disconnect by introducing asynchronous background cleanup and producer continuation. The changes in EventQueue to support immediate vs. graceful closing are well-implemented and propagated correctly. The use of CompletableFuture in DefaultRequestHandler to manage cleanup tasks asynchronously is a solid improvement that should prevent blocking and allow for task resubscription as intended. The new tests, backported from the Python implementation, provide good coverage for the new functionality. I have one suggestion in a new test file to make an assertion more robust.
...-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerBackgroundTest.java
Outdated
Show resolved
Hide resolved
…essageSend requests Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming events in the background for blocking calls, matching the behavior of non-blocking and streaming flows. Previously, blocking calls set continueInBackground=false, which closed the consumer after the first event was returned. This caused subsequent events from the still-running AgentExecutor to be enqueued but never processed, resulting in incomplete task state in the TaskStore. The fix ensures: - Blocking calls still return immediately with the first event - Consumer continues processing events in background until agent completes - All events are persisted to TaskStore, including final task state This matches the background processing pattern already established in PR a2aproject#284 for streaming client disconnects, but now applies to the blocking non-streaming code path as well.
…essageSend requests Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming events in the background for blocking calls, matching the behavior of non-blocking and streaming flows. Previously, blocking calls set continueInBackground=false, which closed the consumer after the first event was returned. This caused subsequent events from the still-running AgentExecutor to be enqueued but never processed, resulting in incomplete task state in the TaskStore. The fix ensures: - Blocking calls still return immediately with the first event - Consumer continues processing events in background until agent completes - All events are persisted to TaskStore, including final task state This matches the background processing pattern already established in PR a2aproject#284 for streaming client disconnects, but now applies to the blocking non-streaming code path as well.
a2aproject#284) …t disconnect Backports key fixes from a2a-python PRs a2aproject#440 and a2aproject#472 to resolve event queue lifecycle issues that prevented task resubscription after client disconnect. - Add background task tracking to DefaultRequestHandler with CompletableFuture - Implement asynchronous cleanup when clients disconnect during streaming - Add EventQueue graceful vs immediate close behavior Fixes a2aproject#283 🦕
…t disconnect
Backports key fixes from a2a-python PRs #440 and #472 to resolve event queue
lifecycle issues that prevented task resubscription after client disconnect.
Fixes #283 🦕