fix: Wait for agent completion and ensure all events processed in blo…#430
kabir wants to merge 1 commit into a2aproject:main
…cking calls

Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state).

Root Cause:
- Blocking calls interrupted immediately after first event and returned to client before background event consumption completed
- Agent execution and event processing happened asynchronously in background
- No synchronization to ensure all events were consumed and persisted before returning Task to client

Solution (4-step process):
1. Wait for agent to finish enqueueing events (5s timeout)
2. Close the queue to signal consumption can complete (breaks dependency)
3. Wait for consumption to finish processing events (2s timeout)
4. Fetch final task state from TaskStore (has all artifacts and correct state)

This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events.

Added unit tests:
- testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern
- testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
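The ordering of the four steps can be illustrated with a small, self-contained sketch. It is not the a2a-java code: the queue, the `CLOSE` sentinel, the map standing in for TaskStore, and the class and method names are all hypothetical stand-ins chosen for illustration. Only the step order and the 5s/2s timeouts come from the description above.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative stand-in only; none of these names are the a2a-java API. */
final class BlockingWaitSketch {
    // Hypothetical sentinel that plays the role of "queue closed".
    private static final String CLOSE = "__close__";

    /** Runs the four steps against toy stand-ins and returns the persisted artifact count. */
    static int runOnce(int artifactCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, Integer> store = new ConcurrentHashMap<>(); // stand-in for TaskStore

        // "Agent": enqueues artifacts and never emits a final event (fire-and-forget).
        CompletableFuture<Void> agentFuture = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < artifactCount; i++) {
                queue.add("artifact-" + i);
            }
        });

        // "Consumer": persists every event until it sees the close sentinel.
        CompletableFuture<Void> consumptionFuture = CompletableFuture.runAsync(() -> {
            try {
                for (String e = queue.take(); !e.equals(CLOSE); e = queue.take()) {
                    store.merge("task-1", 1, Integer::sum);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            // Step 1: wait (bounded) for the agent to finish enqueueing events.
            try {
                agentFuture.get(5, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Agent still running: carry on, events are already being consumed.
            }
            // Step 2: close the queue so the consumer loop is able to exit.
            queue.add(CLOSE);
            // Step 3: wait (bounded) for the consumer to drain what was enqueued.
            consumptionFuture.get(2, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Consumption did not finish in time: fall through and read what is stored.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }

        // Step 4: read the final state from the store rather than the first event seen.
        return store.getOrDefault("task-1", 0);
    }
}
```

In this toy setup, `BlockingWaitSketch.runOnce(2)` returns 2 because the sentinel is only enqueued after the agent has finished, so the consumer sees both artifacts before it exits. Closing the queue before waiting on the consumer is what keeps a fire-and-forget agent from stalling the caller.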
Summary of Changes

Hello @kabir, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical race condition that caused incomplete task information to be returned to clients. It introduces a synchronization mechanism to guarantee that all task events are fully processed and persisted before a task is returned, particularly addressing 'fire-and-forget' scenarios. The changes include modifications to

Highlights
Code Review
This pull request addresses a critical race condition in blocking calls by introducing a synchronized waiting mechanism to ensure all events are processed before returning a response. The solution, which involves waiting for both agent execution and event consumption with timeouts, seems robust and well-reasoned. The new logic in DefaultRequestHandler is clear and the added unit tests effectively cover the fixed bug and related edge cases like fire-and-forget tasks. My review includes a few suggestions to improve maintainability by replacing magic numbers with constants, cleaning up an unused parameter, and enhancing test assertions for better precision and clarity.
    // Step 1: Wait for agent to finish (with short timeout for fast agents)
    if (agentFuture != null) {
        try {
            agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);

    // Step 3: Wait for consumption to complete (now that queue is closed)
    if (etai.consumptionFuture() != null) {
        etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);
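The review summary mentions replacing magic numbers with constants. One way the two timeouts in the snippets above might be named is sketched here; the constant names, the `Duration` type, and the `awaitWithin` helper are assumptions made for illustration and do not come from the PR.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative only: hypothetical names for the 5s and 2s timeouts used in the PR. */
final class TimeoutConstantsSketch {
    // Values come from the PR; the names are invented for this sketch.
    static final Duration AGENT_COMPLETION_TIMEOUT = Duration.ofSeconds(5);
    static final Duration CONSUMPTION_COMPLETION_TIMEOUT = Duration.ofSeconds(2);

    /** Returns true if the future finished (normally or exceptionally) within the timeout. */
    static boolean awaitWithin(CompletableFuture<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still running when the timeout elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return true; // finished, although with a failure
        }
    }
}
```

With named constants, a call site reads as `awaitWithin(agentFuture, AGENT_COMPLETION_TIMEOUT)`, and both values can be tuned in one place.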
        return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null);
    }

    public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture<Void> agentFuture) throws JSONRPCError {
    assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
            "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

For consistency with other assertions in this file and for better failure messages, it's preferable to use assertEquals to compare enum values.

    - assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
    -         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());
    + assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
    +         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

    assertTrue(returnedTask.getArtifacts().size() >= 2,
            "Returned task should have at least 2 artifacts, got: " +
            (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

The agent in this test adds exactly two artifacts. Using assertEquals(2, ...) would make this assertion more precise than assertTrue(size >= 2, ...). It ensures that exactly two artifacts are present, not more. The assertion message can also be simplified since returnedTask.getArtifacts() is asserted to be not null on line 790.

    assertEquals(2, returnedTask.getArtifacts().size(),
            "Returned task should have 2 artifacts, got: " + returnedTask.getArtifacts().size());
@flex-myeonghyeon Your issue shed some light on why my attempt to upgrade the Jakarta SDK to 0.3.1.Final of a2a-java was failing. Thanks for that! Would you be able to take a look at this PR and see if it does what you have in mind? One slight change from what you say in your issue is for the 'fire and forget' case. It is valid for an AgentExecutor to not return a final task. In these cases, blocking sendMessage calls should return when the AgentExecutor is done.

/gemini review
Code Review
This pull request effectively addresses a critical race condition in blocking calls by ensuring the agent and event processing complete before returning a response. The solution is well-structured and the inclusion of new unit tests for both the fix and for fire-and-forget scenarios is a great addition that improves the robustness of the code. The changes are clear and well-commented. I have a few suggestions to further improve code clarity and test precision.
    if (blocking && interruptedOrNonBlocking) {
        // For blocking calls: ensure all events are processed before returning
        // Order of operations is critical to avoid circular dependency:
        // 1. Wait for agent to finish enqueueing events
        // 2. Close the queue to signal consumption can complete
        // 3. Wait for consumption to finish processing events
        // 4. Fetch final task state from TaskStore

        try {
            // Step 1: Wait for agent to finish (with short timeout for fast agents)
            if (agentFuture != null) {
                try {
                    agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);
                    LOGGER.debug("Agent completed for task {}", taskId);
                } catch (java.util.concurrent.TimeoutException e) {
                    // Agent still running after 5s - that's fine, events already being processed
                    LOGGER.debug("Agent still running for task {} after 5s", taskId);
                }
            }

            // Step 2: Close the queue to signal consumption can complete
            // For fire-and-forget tasks, there's no final event, so we need to close the queue
            // This allows EventConsumer.consumeAll() to exit
            queue.close(false, false); // graceful close, don't notify parent yet
            LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);

            // Step 3: Wait for consumption to complete (now that queue is closed)
            if (etai.consumptionFuture() != null) {
                etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);
                LOGGER.debug("Consumption completed for task {}", taskId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
        } catch (java.util.concurrent.ExecutionException e) {
            LOGGER.warn("Error during task {} execution", taskId, e.getCause());
        } catch (java.util.concurrent.TimeoutException e) {
            LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
        }

        // Step 4: Fetch the final task state from TaskStore (all events have been processed)
        Task updatedTask = taskStore.get(taskId);
        if (updatedTask != null) {
            kind = updatedTask;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
                        taskId, updatedTask.getStatus().state(),
                        updatedTask.getArtifacts() != null ? updatedTask.getArtifacts().size() : 0);
            }
        }
    }

The logic inside the if (blocking && interruptedOrNonBlocking) block is quite complex and spans many lines. To improve the readability and maintainability of the onMessageSend method, consider extracting this logic into a private helper method, for example waitForCompletionAndFetchFinalTask. This new method could take parameters like agentFuture, queue, etai, and taskId and return the final Task.
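A rough shape for such a helper is sketched below. To keep the example self-contained, the queue is passed as a `Runnable` that closes it, the TaskStore lookup is passed as a `Supplier`, and the method is generic. These parameter choices, the `fallback` argument, and the class name are assumptions for illustration; a real extraction would take the project's own queue, etai, and taskId types as the comment suggests, and would keep the logging that is omitted here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Illustrative only: a generic stand-in for the suggested private helper. */
final class HelperExtractionSketch {

    /**
     * Waits for the agent, closes the queue, waits for consumption, then returns
     * the stored result, or the fallback if the store has nothing.
     */
    static <T> T waitForCompletionAndFetchFinalTask(
            CompletableFuture<Void> agentFuture,
            Runnable closeQueue,                 // hypothetical: wraps queue.close(false, false)
            CompletableFuture<Void> consumptionFuture,
            Supplier<T> fetchFromStore,          // hypothetical: wraps taskStore.get(taskId)
            T fallback) {
        try {
            // Step 1: bounded wait for the agent; a timeout here is tolerated.
            if (agentFuture != null) {
                try {
                    agentFuture.get(5, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // Agent still running; events are already being processed.
                }
            }
            // Step 2: close the queue so consumption is able to finish.
            closeQueue.run();
            // Step 3: bounded wait for consumption.
            if (consumptionFuture != null) {
                consumptionFuture.get(2, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // The PR logs these at WARN; this sketch only falls through to step 4.
        }
        // Step 4: prefer the stored state, otherwise keep what the caller already had.
        T stored = fetchFromStore.get();
        return stored != null ? stored : fallback;
    }
}
```

With the waiting logic isolated like this, the calling method would reduce to a single assignment inside the `if (blocking && interruptedOrNonBlocking)` branch, and the ordering of the four steps could be unit-tested on its own.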
    String taskId = "blocking-persist-task";
    String contextId = "blocking-persist-ctx";

This test was renamed to testNonBlockingMessagePersistsAllEventsInBackground, but the taskId and contextId variables still contain "blocking". This could be confusing for future readers. It would be clearer to update these identifiers to reflect the non-blocking nature of the test.

    - String taskId = "blocking-persist-task";
    - String contextId = "blocking-persist-ctx";
    + String taskId = "non-blocking-persist-task";
    + String contextId = "non-blocking-persist-ctx";

    assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
            "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

Using assertEquals is preferred over assertTrue for comparing values. It provides a more informative failure message by showing both the expected and actual values, which makes debugging tests easier.

    - assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
    -         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());
    + assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
    +         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

    assertTrue(returnedTask.getArtifacts().size() >= 2,
            "Returned task should have at least 2 artifacts, got: " +
            (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

The test agent adds exactly two artifacts. Using assertEquals(2, ...) would make this assertion more precise than assertTrue(size >= 2, ...). This ensures the test fails if an incorrect number of artifacts (more or fewer than expected) is returned.

    - assertTrue(returnedTask.getArtifacts().size() >= 2,
    -         "Returned task should have at least 2 artifacts, got: " +
    -         (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));
    + assertEquals(2, returnedTask.getArtifacts().size(),
    +         "Returned task should have at least 2 artifacts, got: " +
    +         (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

Fixes #428