Skip to content

fix: Implement background cleanup and producer continuation for clien…#284

Merged
fjuma merged 2 commits intoa2aproject:mainfrom
kabir:async-fixes
Sep 22, 2025
Merged

fix: Implement background cleanup and producer continuation for clien…#284
fjuma merged 2 commits intoa2aproject:mainfrom
kabir:async-fixes

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Sep 22, 2025

…t disconnect

Backports key fixes from a2a-python PRs #440 and #472 to resolve event queue
lifecycle issues that prevented task resubscription after client disconnect.

  • Add background task tracking to DefaultRequestHandler with CompletableFuture
  • Implement asynchronous cleanup when clients disconnect during streaming
  • Add EventQueue graceful vs immediate close behavior

Fixes #283 🦕

…t disconnect

  Backports key fixes from a2a-python PRs a2aproject#440 and a2aproject#472 to resolve event queue
  lifecycle issues that prevented task resubscription after client disconnect.

  - Add background task tracking to DefaultRequestHandler with CompletableFuture
  - Implement asynchronous cleanup when clients disconnect during streaming
  - Add EventQueue graceful vs immediate close behavior
  - Restore TCK sleep for task state persistence timing requirements
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves critical event queue lifecycle issues that previously hindered task resubscription following a client disconnect. By introducing asynchronous background cleanup and refined event queue closing mechanisms, the system now ensures that producers continue their work even if a client drops, allowing for robust task state persistence and successful client resubscription to ongoing tasks. These changes enhance the stability and reliability of streaming operations.

Highlights

  • Background Task Tracking: Implemented background task tracking within DefaultRequestHandler using CompletableFuture to manage asynchronous cleanup operations.
  • Asynchronous Cleanup on Disconnect: Ensured that cleanup processes run asynchronously when clients disconnect during streaming, preventing event queue lifecycle issues.
  • EventQueue Close Behavior: Introduced a new close(boolean immediate) method in EventQueue to support both graceful (drain naturally) and immediate (clear queue) closing behaviors.
  • TCK Sleep Restoration: Restored a sleep mechanism in the TCK AgentExecutorProducer to allow for proper task state persistence timing requirements, especially for resubscription tests.
  • Comprehensive Test Coverage: Added a new test file DefaultRequestHandlerBackgroundTest.java and expanded EventQueueTest.java to cover the new background cleanup, producer continuation, and event queue closing functionalities, backporting tests from Python PRs.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses event queue lifecycle issues upon client disconnect by introducing asynchronous background cleanup and producer continuation. The changes in EventQueue to support immediate vs. graceful closing are well-implemented and propagated correctly. The use of CompletableFuture in DefaultRequestHandler to manage cleanup tasks asynchronously is a solid improvement that should prevent blocking and allow for task resubscription as intended. The new tests, backported from the Python implementation, provide good coverage for the new functionality. I have one suggestion in a new test file to make an assertion more robust.

@fjuma fjuma merged commit 8903e3d into a2aproject:main Sep 22, 2025
4 checks passed
kabir added a commit to kabir/a2a-java that referenced this pull request Oct 14, 2025
…essageSend requests

  Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming
  events in the background for blocking calls, matching the behavior of
  non-blocking and streaming flows.

  Previously, blocking calls set continueInBackground=false, which closed
  the consumer after the first event was returned. This caused subsequent
  events from the still-running AgentExecutor to be enqueued but never
  processed, resulting in incomplete task state in the TaskStore.

  The fix ensures:
  - Blocking calls still return immediately with the first event
  - Consumer continues processing events in background until agent completes
  - All events are persisted to TaskStore, including final task state

  This matches the background processing pattern already established in
  PR a2aproject#284 for streaming client disconnects, but now applies to the
  blocking non-streaming code path as well.
kabir added a commit to kabir/a2a-java that referenced this pull request Oct 16, 2025
…essageSend requests

  Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming
  events in the background for blocking calls, matching the behavior of
  non-blocking and streaming flows.

  Previously, blocking calls set continueInBackground=false, which closed
  the consumer after the first event was returned. This caused subsequent
  events from the still-running AgentExecutor to be enqueued but never
  processed, resulting in incomplete task state in the TaskStore.

  The fix ensures:
  - Blocking calls still return immediately with the first event
  - Consumer continues processing events in background until agent completes
  - All events are persisted to TaskStore, including final task state

  This matches the background processing pattern already established in
  PR a2aproject#284 for streaming client disconnects, but now applies to the
  blocking non-streaming code path as well.
kabir added a commit to kabir/a2a-java that referenced this pull request Dec 23, 2025
a2aproject#284)

…t disconnect

Backports key fixes from a2a-python PRs a2aproject#440 and a2aproject#472 to resolve event
queue
lifecycle issues that prevented task resubscription after client
disconnect.

- Add background task tracking to DefaultRequestHandler with
CompletableFuture
- Implement asynchronous cleanup when clients disconnect during
streaming
  - Add EventQueue graceful vs immediate close behavior

Fixes a2aproject#283 🦕
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Agent execution cancelled once request is saved

2 participants