fix: Wait for agent completion and ensure all events processed in blo… #430

Closed
kabir wants to merge 1 commit into a2aproject:main from kabir:blocking-send-fixes

@kabir kabir commented Nov 3, 2025

…cking calls

Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state).

Root Cause:

  • Blocking calls interrupted immediately after first event and returned to client before background event consumption completed
  • Agent execution and event processing happened asynchronously in background
  • No synchronization to ensure all events were consumed and persisted before returning Task to client

Solution (4-step process):

  1. Wait for agent to finish enqueueing events (5s timeout)
  2. Close the queue to signal consumption can complete (breaks dependency)
  3. Wait for consumption to finish processing events (2s timeout)
  4. Fetch final task state from TaskStore (has all artifacts and correct state)

This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events.
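The four steps can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the code from the PR: the string queue, the CLOSED marker, the map standing in for the TaskStore, and the class and method names are all hypothetical; only the ordering and the 5s/2s timeouts come from the description above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingSendSketch {
    // Hypothetical close marker: stands in for closing the real event queue.
    private static final String CLOSED = "__closed__";

    static List<String> runBlockingSend() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Hypothetical stand-in for the TaskStore: task id -> persisted artifacts.
        Map<String, List<String>> taskStore = new ConcurrentHashMap<>();
        String taskId = "task-1";
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Agent: enqueues two artifact events and never emits a final event.
            CompletableFuture<Void> agentFuture = CompletableFuture.runAsync(() -> {
                queue.add("artifact-1");
                queue.add("artifact-2");
            }, pool);

            // Consumer: persists events until it sees the close marker.
            CompletableFuture<Void> consumptionFuture = CompletableFuture.runAsync(() -> {
                try {
                    String event;
                    while (!CLOSED.equals(event = queue.take())) {
                        taskStore.computeIfAbsent(taskId, id -> new CopyOnWriteArrayList<>()).add(event);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool);

            // Step 1: wait (bounded) for the agent to finish enqueueing events.
            try {
                agentFuture.get(5, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Agent still running: continue with the events enqueued so far.
            }
            // Step 2: close the queue so the consumer can drain it and exit.
            queue.add(CLOSED);
            // Step 3: wait (bounded) for the consumer to finish persisting.
            consumptionFuture.get(2, TimeUnit.SECONDS);
            // Step 4: read the final state back from the store.
            return taskStore.getOrDefault(taskId, List.of());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBlockingSend()); // prints [artifact-1, artifact-2]
    }
}
```

Closing the queue only after step 1 is what lets the consumer exit even though the agent never sends a final event; closing it earlier could drop events the agent has not enqueued yet.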

Added unit tests:

  • testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern
  • testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included

Fixes #428

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

@gemini-code-assist
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical race condition that caused incomplete task information to be returned to clients. It introduces a synchronization mechanism to guarantee that all task events are fully processed and persisted before a task is returned, particularly addressing 'fire-and-forget' scenarios. The changes include modifications to DefaultRequestHandler and ResultAggregator, along with new unit tests to verify the fix.

Highlights

  • Race Condition Fix: Addresses a race condition in DefaultRequestHandler.onMessageSend() where the method could return before all task events were fully processed and persisted, leading to incomplete Task objects.
  • Synchronization Implementation: Implements a 4-step synchronization process to ensure all events are consumed and persisted before returning a Task to the client, involving waiting for the agent, closing the queue, waiting for consumption, and fetching the final task state.
  • Unit Tests: Adds unit tests, including testBlockingFireAndForgetReturnsNonFinalTask and testBlockingCallReturnsCompleteTaskWithArtifacts, to validate the fix and ensure complete Task objects are returned.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request addresses a critical race condition in blocking calls by introducing a synchronized waiting mechanism to ensure all events are processed before returning a response. The solution, which involves waiting for both agent execution and event consumption with timeouts, seems robust and well-reasoned. The new logic in DefaultRequestHandler is clear and the added unit tests effectively cover the fixed bug and related edge cases like fire-and-forget tasks. My review includes a few suggestions to improve maintainability by replacing magic numbers with constants, cleaning up an unused parameter, and enhancing test assertions for better precision and clarity.

// Step 1: Wait for agent to finish (with short timeout for fast agents)
if (agentFuture != null) {
    try {
        agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);

medium

The timeout value 5 is hardcoded. It's a good practice to define it as a named constant (e.g., AGENT_COMPLETION_TIMEOUT_SECONDS) at the top of the class for better readability and maintainability.


// Step 3: Wait for consumption to complete (now that queue is closed)
if (etai.consumptionFuture() != null) {
    etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);

medium

Similar to the agent completion timeout, the hardcoded timeout value 2 should be defined as a named constant (e.g., CONSUMPTION_COMPLETION_TIMEOUT_SECONDS) for improved code clarity and ease of maintenance.
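A minimal sketch of what the two constants might look like in use. The constant names are the ones suggested in the review and the values mirror the literals in the diff; the class name and the awaitQuietly helper are hypothetical and exist only to make the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutConstantsSketch {
    // Names from the review suggestion; values copied from the hardcoded literals.
    static final long AGENT_COMPLETION_TIMEOUT_SECONDS = 5;
    static final long CONSUMPTION_COMPLETION_TIMEOUT_SECONDS = 2;

    /** Hypothetical helper: returns true if the future finished within the timeout. */
    static boolean awaitQuietly(CompletableFuture<?> future, long timeoutSeconds) {
        if (future == null) {
            return true; // nothing to wait for
        }
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still running after the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return true; // finished, although exceptionally
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> agentFuture = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> consumptionFuture = CompletableFuture.completedFuture(null);
        System.out.println(awaitQuietly(agentFuture, AGENT_COMPLETION_TIMEOUT_SECONDS));
        System.out.println(awaitQuietly(consumptionFuture, CONSUMPTION_COMPLETION_TIMEOUT_SECONDS));
    }
}
```

With named constants, the call sites state what is being waited for, and changing a timeout touches a single line.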

return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null);
}

public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture<Void> agentFuture) throws JSONRPCError {

medium

The agentFuture parameter is not used within this method. If it's not needed here, it should be removed to avoid confusion and simplify the method signature. This would require updating the calling methods as well.

Comment on lines +786 to +787
assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
"Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

medium

For consistency with other assertions in this file and for better failure messages, it's preferable to use assertEquals to compare enum values.

Suggested change
- assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
-         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());
+ assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
+         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

Comment on lines +792 to +794
assertTrue(returnedTask.getArtifacts().size() >= 2,
"Returned task should have at least 2 artifacts, got: " +
(returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

medium

The agent in this test adds exactly two artifacts. Using assertEquals(2, ...) would make this assertion more precise than assertTrue(size >= 2, ...). It ensures that exactly two artifacts are present, not more. The assertion message can also be simplified since returnedTask.getArtifacts() is asserted to be not null on line 790.

assertEquals(2, returnedTask.getArtifacts().size(),
    "Returned task should have 2 artifacts, got: " + returnedTask.getArtifacts().size());

kabir commented Nov 3, 2025

@flex-myeonghyeon Your issue shed some light on why my attempt to upgrade the Jakarta SDK to 0.3.1.Final of a2a-java was failing. Thanks for that!

Would you be able to take a look at this PR and see if it does what you have in mind?

One slight change from what you say in your issue is for the 'fire and forget' case. It is valid for an AgentExecutor to not return a final task. In these cases, blocking sendMessage calls should return when the AgentExecutor is done.
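A small sketch of that fire-and-forget behaviour, under stated assumptions: the State enum is a simplified stand-in for the SDK's task states, and the class and method names are hypothetical. The point is only that the blocking call waits on the agent's completion, not on a final state event.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class FireAndForgetSketch {
    // Simplified stand-in for task states; not the SDK's enum.
    enum State { SUBMITTED, WORKING, COMPLETED }

    static State blockingSend() {
        AtomicReference<State> storedState = new AtomicReference<>(State.SUBMITTED);

        // Fire-and-forget agent: moves the task to WORKING and returns
        // without ever emitting a final state.
        CompletableFuture<Void> agentFuture =
                CompletableFuture.runAsync(() -> storedState.set(State.WORKING));

        try {
            // The blocking call returns once the agent is done (or the timeout elapses).
            agentFuture.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Fall through and return whatever state has been stored so far.
        }
        return storedState.get();
    }

    public static void main(String[] args) {
        System.out.println(blockingSend()); // prints WORKING
    }
}
```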

kabir commented Nov 3, 2025

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request effectively addresses a critical race condition in blocking calls by ensuring the agent and event processing complete before returning a response. The solution is well-structured and the inclusion of new unit tests for both the fix and for fire-and-forget scenarios is a great addition that improves the robustness of the code. The changes are clear and well-commented. I have a few suggestions to further improve code clarity and test precision.

Comment on lines +225 to +275
if (blocking && interruptedOrNonBlocking) {
    // For blocking calls: ensure all events are processed before returning
    // Order of operations is critical to avoid circular dependency:
    // 1. Wait for agent to finish enqueueing events
    // 2. Close the queue to signal consumption can complete
    // 3. Wait for consumption to finish processing events
    // 4. Fetch final task state from TaskStore

    try {
        // Step 1: Wait for agent to finish (with short timeout for fast agents)
        if (agentFuture != null) {
            try {
                agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);
                LOGGER.debug("Agent completed for task {}", taskId);
            } catch (java.util.concurrent.TimeoutException e) {
                // Agent still running after 5s - that's fine, events already being processed
                LOGGER.debug("Agent still running for task {} after 5s", taskId);
            }
        }

        // Step 2: Close the queue to signal consumption can complete
        // For fire-and-forget tasks, there's no final event, so we need to close the queue
        // This allows EventConsumer.consumeAll() to exit
        queue.close(false, false); // graceful close, don't notify parent yet
        LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);

        // Step 3: Wait for consumption to complete (now that queue is closed)
        if (etai.consumptionFuture() != null) {
            etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);
            LOGGER.debug("Consumption completed for task {}", taskId);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
    } catch (java.util.concurrent.ExecutionException e) {
        LOGGER.warn("Error during task {} execution", taskId, e.getCause());
    } catch (java.util.concurrent.TimeoutException e) {
        LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
    }

    // Step 4: Fetch the final task state from TaskStore (all events have been processed)
    Task updatedTask = taskStore.get(taskId);
    if (updatedTask != null) {
        kind = updatedTask;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
                    taskId, updatedTask.getStatus().state(),
                    updatedTask.getArtifacts() != null ? updatedTask.getArtifacts().size() : 0);
        }
    }
}

medium

The logic inside the if (blocking && interruptedOrNonBlocking) block is quite complex and spans many lines. To improve the readability and maintainability of the onMessageSend method, consider extracting this logic into a private helper method, for example waitForCompletionAndFetchFinalTask. This new method could take parameters like agentFuture, queue, etai, and taskId and return the final Task.
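One possible shape for such a helper, as a sketch only. The method name follows the review suggestion; the parameter types are assumptions chosen to keep the example self-contained (a Runnable in place of the queue's close call, a Function in place of the TaskStore lookup, and a generic T in place of Task), and the fallback parameter is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class HelperExtractionSketch {
    /**
     * Waits for the agent, closes the queue, waits for consumption, then returns
     * the stored task, or the fallback if the store has no entry for the id.
     */
    static <T> T waitForCompletionAndFetchFinalTask(
            CompletableFuture<Void> agentFuture,
            Runnable closeQueue,
            CompletableFuture<Void> consumptionFuture,
            Function<String, T> taskStore,
            String taskId,
            T fallback) {
        try {
            // Step 1: bounded wait for the agent; a timeout here is not an error.
            if (agentFuture != null) {
                try {
                    agentFuture.get(5, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // Agent still running; events are already being processed.
                }
            }
            // Step 2: close the queue so consumption can complete.
            closeQueue.run();
            // Step 3: bounded wait for consumption to finish.
            if (consumptionFuture != null) {
                consumptionFuture.get(2, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Fall through: return the best state available in the store.
        }
        // Step 4: fetch the final task state.
        T updated = taskStore.apply(taskId);
        return updated != null ? updated : fallback;
    }

    public static void main(String[] args) {
        String task = waitForCompletionAndFetchFinalTask(
                CompletableFuture.completedFuture(null),
                () -> System.out.println("queue closed"),
                CompletableFuture.completedFuture(null),
                id -> "final-" + id,
                "t1",
                "fallback");
        System.out.println(task); // prints final-t1
    }
}
```

The caller in onMessageSend would then reduce to a single assignment inside the if block, which is the readability gain the comment is after.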

Comment on lines +233 to +263
try {
    // Step 1: Wait for agent to finish (with short timeout for fast agents)
    if (agentFuture != null) {
        try {
            agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);
            LOGGER.debug("Agent completed for task {}", taskId);
        } catch (java.util.concurrent.TimeoutException e) {
            // Agent still running after 5s - that's fine, events already being processed
            LOGGER.debug("Agent still running for task {} after 5s", taskId);
        }
    }

    // Step 2: Close the queue to signal consumption can complete
    // For fire-and-forget tasks, there's no final event, so we need to close the queue
    // This allows EventConsumer.consumeAll() to exit
    queue.close(false, false); // graceful close, don't notify parent yet
    LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);

    // Step 3: Wait for consumption to complete (now that queue is closed)
    if (etai.consumptionFuture() != null) {
        etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);
        LOGGER.debug("Consumption completed for task {}", taskId);
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
} catch (java.util.concurrent.ExecutionException e) {
    LOGGER.warn("Error during task {} execution", taskId, e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
    LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
}

medium

The timeout values 5 (line 237) and 2 (line 253) are hardcoded. For better readability and maintainability, consider extracting them into private static final constants with descriptive names, for example AGENT_COMPLETION_TIMEOUT_SECONDS and CONSUMPTION_COMPLETION_TIMEOUT_SECONDS.

Comment on lines 504 to 505
String taskId = "blocking-persist-task";
String contextId = "blocking-persist-ctx";

medium

This test was renamed to testNonBlockingMessagePersistsAllEventsInBackground, but the taskId and contextId variables still contain "blocking". This could be confusing for future readers. It would be clearer to update these identifiers to reflect the non-blocking nature of the test.

Suggested change
- String taskId = "blocking-persist-task";
- String contextId = "blocking-persist-ctx";
+ String taskId = "non-blocking-persist-task";
+ String contextId = "non-blocking-persist-ctx";

Comment on lines +786 to +787
assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
"Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

medium

Using assertEquals is preferred over assertTrue for comparing values. It provides a more informative failure message by showing both the expected and actual values, which makes debugging tests easier.

Suggested change
- assertTrue(returnedTask.getStatus().state() == TaskState.COMPLETED,
-         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());
+ assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
+         "Returned task should be COMPLETED, got: " + returnedTask.getStatus().state());

Comment on lines +792 to +794
assertTrue(returnedTask.getArtifacts().size() >= 2,
"Returned task should have at least 2 artifacts, got: " +
(returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

medium

The test agent adds exactly two artifacts. Using assertEquals(2, ...) would make this assertion more precise than assertTrue(size >= 2, ...). This ensures the test fails if an incorrect number of artifacts (more or fewer than expected) is returned.

Suggested change
- assertTrue(returnedTask.getArtifacts().size() >= 2,
-         "Returned task should have at least 2 artifacts, got: " +
-         (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));
+ assertEquals(2, returnedTask.getArtifacts().size(),
+         "Returned task should have at least 2 artifacts, got: " +
+         (returnedTask.getArtifacts() != null ? returnedTask.getArtifacts().size() : 0));

Development

Successfully merging this pull request may close these issues.

[Bug]: DefaultRequestHandler does not block until final state when blocking=true