fix: Implement background consumption for blocking, non-streaming onMessageSend requests #348

Closed
kabir wants to merge 4 commits into a2aproject:main from kabir:event-queues-background-blocking

kabir (Collaborator) commented Oct 14, 2025

Modify ResultAggregator.consumeAndBreakOnInterrupt to continue consuming
events in the background for blocking calls, matching the behavior of
non-blocking and streaming flows.

Previously, blocking calls set continueInBackground=false, which closed
the consumer after the first event was returned. This caused subsequent
events from the still-running AgentExecutor to be enqueued but never
processed, resulting in incomplete task state in the TaskStore.

The fix ensures:

  • Blocking calls still return immediately with the first event
  • Consumer continues processing events in background until agent completes
  • All events are persisted to TaskStore, including final task state

This matches the background processing pattern already established in
PR #284 for streaming client disconnects, but now applies to the
blocking non-streaming code path as well.
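
The behavior described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's actual `ResultAggregator` code: the class `BackgroundConsumeSketch`, the string events, the `"completed"` terminal marker, and the in-memory list standing in for the TaskStore are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the aggregator; not the real ResultAggregator API.
class BackgroundConsumeSketch {
    static final String TERMINAL = "completed"; // hypothetical terminal event marker

    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> taskStore = new CopyOnWriteArrayList<>(); // stand-in for the TaskStore
    final CompletableFuture<Void> drained = new CompletableFuture<>();

    /** Hands the first event to the caller while the consumer keeps running in the background. */
    CompletableFuture<String> consumeFirstThenContinue(ExecutorService executor) {
        CompletableFuture<String> first = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    taskStore.add(event);   // every event is persisted, not only the first
                    first.complete(event);  // has no effect after the first event
                    if (TERMINAL.equals(event)) {
                        break;              // stop only once the agent has finished
                    }
                }
                drained.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                first.completeExceptionally(e);
                drained.completeExceptionally(e);
            }
        });
        return first;
    }

    /** Replays the scenario from the description and returns what reached the store. */
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            BackgroundConsumeSketch sketch = new BackgroundConsumeSketch();
            CompletableFuture<String> first = sketch.consumeFirstThenContinue(executor);
            sketch.queue.add("submitted");
            String returned = first.join(); // the blocking caller gets its response here
            if (!"submitted".equals(returned)) {
                throw new IllegalStateException("unexpected first event: " + returned);
            }
            // The agent keeps emitting events after the caller already has a response.
            sketch.queue.add("working");
            sketch.queue.add(TERMINAL);
            sketch.drained.join();          // wait for the background consumer to finish
            return new ArrayList<>(sketch.taskStore);
        } finally {
            executor.shutdown();
        }
    }
}
```

In this sketch, a consumer that stopped after completing `first` would leave `"working"` and `"completed"` sitting in the queue, which is the incomplete task state the description refers to.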

kabir added 4 commits October 13, 2025 22:46
… MainQueue closure

Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest
and prepare for improvements to the replicated QueueManager. Additional fixes resolve
TCK timeout issues discovered during testing.

## Changes

**EventQueue Reference Counting:**
- MainQueue tracks active ChildQueues with reference counting
- Prevents premature closure when ChildQueues are still consuming events
- Fixes KafkaReplicationIntegrationTest failures
- Prepares infrastructure for replicated QueueManager improvements
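
As a rough illustration of the reference-counting idea (not the SDK's actual `MainQueue`/`ChildQueue` implementation), the sketch below defers closing a main queue until every child has released its reference. The class `RefCountedMainQueue` and the methods `tap()`, `requestClose()`, and `isClosed()` are hypothetical names chosen for this example.

```java
// Hypothetical simplification; the real EventQueue classes carry events and more state.
class RefCountedMainQueue {
    private int activeChildren;     // number of children that have not been released yet
    private boolean closeRequested; // someone asked for the main queue to close
    private boolean closed;

    /** A child handle; closing it releases exactly one reference, even if called twice. */
    final class ChildQueue implements AutoCloseable {
        private boolean released;

        @Override
        public void close() {
            releaseChild(this);
        }
    }

    /** Registers a new child and counts it as an active reference. */
    synchronized ChildQueue tap() {
        if (closed) {
            throw new IllegalStateException("main queue already closed");
        }
        activeChildren++;
        return new ChildQueue();
    }

    /** Requests closure; it takes effect at once only if no child is still active. */
    synchronized void requestClose() {
        closeRequested = true;
        closeIfUnused();
    }

    private synchronized void releaseChild(ChildQueue child) {
        if (child.released) {
            return; // ignore double release so the count cannot go negative
        }
        child.released = true;
        activeChildren--;
        closeIfUnused();
    }

    // Callers hold the monitor; closes only when requested and no child remains.
    private void closeIfUnused() {
        if (closeRequested && activeChildren == 0) {
            closed = true;
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

With two children tapped, `requestClose()` leaves the queue open until both children have been closed, which is the premature-closure case this change is meant to prevent.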

**Fix ForkJoinPool Saturation (TCK fixes):**
- Inject @internal Executor (15 threads) into all transport handlers
- Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool()
- Prevents streaming subscription timeouts under concurrent load on CI (3 threads)
- Affects: RestHandler, GrpcHandler, JSONRPCHandler
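
The executor change can be illustrated with the two-argument overload of `CompletableFuture.runAsync`, which runs the task on the supplied executor rather than on `ForkJoinPool.commonPool()`. The sketch below is an assumption-laden example: `InjectedExecutorSketch`, `newHandlerPool`, and the `a2a-handler-` thread name prefix are hypothetical, and the real handlers receive their executor through injection rather than a constructor call like this one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical handler; the real transport handlers are RestHandler, GrpcHandler, JSONRPCHandler.
class InjectedExecutorSketch {
    private final Executor executor;

    InjectedExecutorSketch(Executor executor) {
        this.executor = executor;
    }

    /** Runs the work on the injected executor instead of the common pool. */
    CompletableFuture<Void> handleAsync(Runnable work) {
        return CompletableFuture.runAsync(work, executor);
    }

    /** Builds a fixed pool with recognizable thread names (names are an assumption). */
    static ExecutorService newHandlerPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, "a2a-handler-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Returns the name of the thread that actually ran the task. */
    static String demo() {
        ExecutorService pool = newHandlerPool(15); // 15 threads, as in the commit message
        try {
            AtomicReference<String> threadName = new AtomicReference<>();
            new InjectedExecutorSketch(pool)
                    .handleAsync(() -> threadName.set(Thread.currentThread().getName()))
                    .join();
            return threadName.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

A dedicated pool keeps long-lived streaming work from competing for the common pool, whose default size is tied to the number of available processors and can therefore be very small on CI machines.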

**Improved Queue Lifecycle:**
- Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck
- EventConsumer now manages queue closing on terminal events
- Background cleanup to avoid blocking request threads
- Fixed race condition when clients disconnect during streaming
- Better terminal event detection in ResultAggregator

**Tests & Documentation:**
- Updated all transport handler tests to pass executor parameter
- Added executor configuration section to README.md

TCK tests now pass reliably on CI.
…essageSend requests

kabir (Collaborator, Author) commented Oct 14, 2025

This seems to introduce an intermittent failure in this test: https://github.com/a2aproject/a2a-java/pull/348/files#diff-99b869ef9cc2557374d0ae2c161c8272f8dcbdbbad8fa7dd429cc2f03d9c6da2R71. I need to look into that, but I want to finish the replicated queue manager work first.

kabir (Collaborator, Author) commented Oct 17, 2025

Superseded by #351

kabir closed this Oct 20, 2025
kabir deleted the event-queues-background-blocking branch November 3, 2025 13:01