fix: Implement background consumption for blocking, non-streaming onMessageSend requests #348
Closed
kabir wants to merge 4 commits into a2aproject:main
… MainQueue closure

Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest and prepare for improvements to the replicated QueueManager. Additional fixes resolve TCK timeout issues discovered during testing.

## Changes

**EventQueue Reference Counting:**
- MainQueue tracks active ChildQueues with reference counting
- Prevents premature closure when ChildQueues are still consuming events
- Fixes KafkaReplicationIntegrationTest failures
- Prepares infrastructure for replicated QueueManager improvements

**Fix ForkJoinPool Saturation (TCK fixes):**
- Inject @internal Executor (15 threads) into all transport handlers
- Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool()
- Prevents streaming subscription timeouts under concurrent load on CI (3 threads)
- Affects: RestHandler, GrpcHandler, JSONRPCHandler

**Improved Queue Lifecycle:**
- Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck
- EventConsumer now manages queue closing on terminal events
- Background cleanup to avoid blocking request threads
- Fixed race condition when clients disconnect during streaming
- Better terminal event detection in ResultAggregator

**Tests & Documentation:**
- Updated all transport handler tests to pass executor parameter
- Added executor configuration section to README.md

TCK tests now pass reliably on CI.
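As a rough illustration of the reference-counting idea in this commit message, here is a minimal sketch. It is not the a2a-java `EventQueue` implementation: the class and method names (`RefCountedQueueSketch`, `tap`, `requestClose`, `childClosed`) are hypothetical, and the sketch omits event storage and does not guard against tapping a queue that has already closed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of reference-counted queue closure; not the a2a-java API. */
class RefCountedQueueSketch {

    /** Parent queue that defers closing until every child has detached. */
    static final class MainQueue {
        private final AtomicInteger activeChildren = new AtomicInteger();
        private volatile boolean closeRequested;
        private volatile boolean closed;

        /** Creates a child consumer and counts it as an active reference. */
        ChildQueue tap() {
            activeChildren.incrementAndGet();
            return new ChildQueue(this);
        }

        /** Records the intent to close; the close only happens once no child is active. */
        void requestClose() {
            closeRequested = true;
            closeIfUnused();
        }

        private void childClosed() {
            activeChildren.decrementAndGet();
            closeIfUnused();
        }

        private void closeIfUnused() {
            if (closeRequested && activeChildren.get() == 0) {
                closed = true; // idempotent, so concurrent callers are harmless
            }
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Child consumer that releases its reference exactly once. */
    static final class ChildQueue {
        private final MainQueue parent;
        private final AtomicBoolean released = new AtomicBoolean();

        ChildQueue(MainQueue parent) {
            this.parent = parent;
        }

        void close() {
            // compareAndSet makes repeated close() calls decrement the count only once
            if (released.compareAndSet(false, true)) {
                parent.childClosed();
            }
        }
    }
}
```

With this shape, a close request made while children are still consuming is remembered rather than applied, and the last child to close triggers the actual closure.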
…essageSend requests

Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming events in the background for blocking calls, matching the behavior of non-blocking and streaming flows.

Previously, blocking calls set continueInBackground=false, which closed the consumer after the first event was returned. This caused subsequent events from the still-running AgentExecutor to be enqueued but never processed, resulting in incomplete task state in the TaskStore.

The fix ensures:
- Blocking calls still return immediately with the first event
- Consumer continues processing events in background until agent completes
- All events are persisted to TaskStore, including final task state

This matches the background processing pattern already established in PR a2aproject#284 for streaming client disconnects, but now applies to the blocking non-streaming code path as well.
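The pattern described in this commit message can be sketched roughly as follows. This is a simplified model, not the actual `ResultAggregator` code: events are plain strings, the `"completed"` terminal marker, the `store` list standing in for the TaskStore, and all class and method names are assumptions made for illustration. The background work runs on a caller-supplied executor rather than the common pool, in line with the executor change described earlier in this PR.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch: return the first event, keep consuming the rest in the background. */
class BackgroundConsumptionSketch {
    /** Assumed marker for the event that ends the task. */
    static final String TERMINAL = "completed";

    /** First event for the caller plus a handle on the background drain. */
    static final class Result {
        final String firstEvent;
        final CompletableFuture<Void> background;

        Result(String firstEvent, CompletableFuture<Void> background) {
            this.firstEvent = firstEvent;
            this.background = background;
        }
    }

    /**
     * Blocks only until the first event arrives, persists it, and then hands the
     * remaining events to the executor so they are still persisted after returning.
     * The store must be safe for concurrent use.
     */
    static Result returnFirstAndKeepConsuming(
            BlockingQueue<String> queue, List<String> store, Executor executor)
            throws InterruptedException {
        String first = queue.take();
        store.add(first);
        if (TERMINAL.equals(first)) {
            // Nothing left to consume, so there is no background work to schedule
            return new Result(first, CompletableFuture.completedFuture(null));
        }
        CompletableFuture<Void> background = CompletableFuture.runAsync(() -> {
            try {
                String event;
                do {
                    event = queue.take();
                    store.add(event); // later events still reach the store
                } while (!TERMINAL.equals(event));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the pool
            }
        }, executor);
        return new Result(first, background);
    }
}
```

The caller gets its response as soon as the first event is available, while the store still ends up with the full event sequence, which is the incomplete-state problem the fix targets.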
fjuma approved these changes on Oct 14, 2025
kabir (Collaborator, Author) commented:
This seems to introduce an intermittent failure in this test: https://github.com/a2aproject/a2a-java/pull/348/files#diff-99b869ef9cc2557374d0ae2c161c8272f8dcbdbbad8fa7dd429cc2f03d9c6da2R71. I need to look into that, but I want to finish the replicated queue manager work first.
kabir (Collaborator, Author) commented:
Superseded by #351.