DefaultRequestHandler.java
@@ -3,6 +3,7 @@
import static io.a2a.server.util.async.AsyncUtils.convertingProcessor;
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
import static io.a2a.server.util.async.AsyncUtils.processor;
import static java.util.concurrent.TimeUnit.*;

import java.util.ArrayList;
import java.util.List;
@@ -51,11 +52,12 @@
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.a2a.spec.TaskState;
import io.a2a.spec.UnsupportedOperationError;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);

/**
* Timeout in seconds to wait for agent execution to complete in blocking calls.
* This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
* Configurable via: a2a.blocking.agent.timeout.seconds
* Default: 30 seconds
*/
@Inject
@ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30")
int agentCompletionTimeoutSeconds;

/**
* Timeout in seconds to wait for event consumption to complete in blocking calls.
* This ensures all events are processed and persisted before returning to client.
* Configurable via: a2a.blocking.consumption.timeout.seconds
* Default: 5 seconds
*/
@Inject
@ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5")
int consumptionCompletionTimeoutSeconds;

private final AgentExecutor agentExecutor;
private final TaskStore taskStore;
private final QueueManager queueManager;
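The two timeout properties introduced above can be overridden through any MicroProfile Config source when the defaults are too tight for a given deployment. A minimal sketch, assuming a Quarkus-style application.properties (or META-INF/microprofile-config.properties); the values are illustrative only:

# Give long-running LLM-backed agents more headroom on blocking calls
a2a.blocking.agent.timeout.seconds=120
# Allow a little longer for events to be persisted before returning to the client
a2a.blocking.consumption.timeout.seconds=10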
@@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
}

/**
* Creates a handler with shortened timeouts, intended for use in tests.
*/
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
PushNotificationSender pushSender, Executor executor) {
DefaultRequestHandler handler =
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
handler.agentCompletionTimeoutSeconds = 5;
handler.consumptionCompletionTimeoutSeconds = 2;
return handler;
}

@Override
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
LOGGER.debug("onGetTask {}", params.id());
@@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte

EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
ResultAggregator.EventTypeAndInterrupt etai = null;
EventKind kind = null; // Declare outside try block so it's in scope for return
try {
// Create callback for push notifications during background event processing
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// This callback must be added before we start consuming. Otherwise,
// any errors thrown by the producerRunnable are not picked up by the consumer
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);

// Get agent future before consuming (for blocking calls to wait for agent completion)
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking);

if (etai == null) {
LOGGER.debug("No result, throwing InternalError");
@@ -210,7 +249,69 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
interruptedOrNonBlocking = etai.interrupted();
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);

EventKind kind = etai.eventType();
// For blocking calls that were interrupted (returned on first event),
// wait for agent execution and event processing BEFORE returning to client.
// This ensures the returned Task has all artifacts and current state.
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
// during the consumption loop itself.
kind = etai.eventType();
if (blocking && interruptedOrNonBlocking) {
// For blocking calls: ensure all events are processed before returning
// Order of operations is critical to avoid circular dependency:
// 1. Wait for agent to finish enqueueing events
// 2. Close the queue to signal consumption can complete
// 3. Wait for consumption to finish processing events
// 4. Fetch final task state from TaskStore

try {
// Step 1: Wait for agent to finish (with configurable timeout)
if (agentFuture != null) {
try {
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
LOGGER.debug("Agent completed for task {}", taskId);
} catch (java.util.concurrent.TimeoutException e) {
// Agent still running after timeout - that's fine, events already being processed
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
}
}

// Step 2: Close the queue to signal consumption can complete
// For fire-and-forget tasks, there's no final event, so we need to close the queue
// This allows EventConsumer.consumeAll() to exit
queue.close(false, false); // graceful close, don't notify parent yet
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);

// Step 3: Wait for consumption to complete (now that queue is closed)
if (etai.consumptionFuture() != null) {
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
LOGGER.debug("Consumption completed for task {}", taskId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
String msg = String.format("Error waiting for task %s completion", taskId);
LOGGER.warn(msg, e);
throw new InternalError(msg);
} catch (java.util.concurrent.ExecutionException e) {
String msg = String.format("Error during task %s execution", taskId);
LOGGER.warn(msg, e.getCause());
throw new InternalError(msg);
} catch (java.util.concurrent.TimeoutException e) {
String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId);
LOGGER.warn(msg, e);
throw new InternalError(msg);
}

// Step 4: Fetch the final task state from TaskStore (all events have been processed)
Task updatedTask = taskStore.get(taskId);
if (updatedTask != null) {
kind = updatedTask;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
taskId, updatedTask.getStatus().state(),
updatedTask.getArtifacts().size());
}
}
}
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
throw new InternalError("Task ID mismatch in agent response");
}
@@ -227,8 +328,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
}

LOGGER.debug("Returning: {}", etai.eventType());
return etai.eventType();
LOGGER.debug("Returning: {}", kind);
return kind;
}

@Override
ResultAggregator.java
@@ -11,20 +11,20 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueueItem;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.InternalError;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,6 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
}

public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
return consumeAndBreakOnInterrupt(consumer, blocking, null);
}

public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
AtomicReference<Message> message = new AtomicReference<>();
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +176,11 @@ else if (!blocking) {
shouldInterrupt = true;
continueInBackground = true;
}
else {
// For ALL blocking calls (both final and non-final events), use background consumption
// This ensures all events are processed and persisted to TaskStore in background
// Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
// which waits for BOTH agent and consumption futures before closing queues
else if (blocking) {
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
// Python's async consumption doesn't block threads, but Java's does
// So we interrupt to return quickly, then rely on background consumption
// DefaultRequestHandler will fetch the final state from TaskStore
shouldInterrupt = true;
continueInBackground = true;
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +194,17 @@ else if (!blocking) {
interrupted.set(true);
completionFuture.complete(null);

// Signal that cleanup can proceed while consumption continues in background.
// This prevents infinite hangs for fire-and-forget agents that never emit final events.
// Processing continues (return true below) and all events are still persisted to TaskStore.
consumptionCompletionFuture.complete(null);
// For blocking calls, DON'T complete consumptionCompletionFuture here.
// Let it complete naturally when subscription finishes (onComplete callback below).
// This ensures all events are processed and persisted to TaskStore before
// DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
//
// For non-blocking and auth-required calls, complete immediately to allow
// cleanup to proceed while consumption continues in background.
if (!blocking) {
consumptionCompletionFuture.complete(null);
}
// else: blocking calls wait for actual consumption completion in onComplete

// Continue consuming in background - keep requesting events
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +247,8 @@
}
}

// Background consumption continues automatically via the subscription
// returning true in the consumer function keeps the subscription alive
// Note: For blocking calls that were interrupted, the wait logic has been moved
// to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()

Throwable error = errorRef.get();
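The key coordination point above is when consumptionCompletionFuture completes: for non-blocking calls it completes immediately so cleanup can proceed while consumption continues in the background, whereas for blocking calls it completes only once the subscription's onComplete fires, i.e. after every event has been persisted. A minimal standalone sketch of that gating pattern with plain CompletableFuture (class and variable names here are illustrative, not the project's actual types):

import java.util.concurrent.*;

public class ConsumptionGateSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        boolean blocking = true; // flip to false for the non-blocking behaviour

        CompletableFuture<Void> consumptionDone = new CompletableFuture<>();

        // Simulated background consumption: persist a few events, then signal completion
        // (this plays the role of the subscription's onComplete callback).
        pool.submit(() -> {
            for (int i = 1; i <= 3; i++) {
                sleepQuietly(200); // pretend to persist an event to the TaskStore
                System.out.println("persisted event " + i);
            }
            consumptionDone.complete(null);
        });

        if (!blocking) {
            // Non-blocking: unblock cleanup immediately; consumption keeps running in the background.
            consumptionDone.complete(null);
        }

        // Cleanup path: for blocking calls this only returns after event 3 is persisted,
        // so the final task state read afterwards is complete.
        consumptionDone.get(5, TimeUnit.SECONDS);
        System.out.println("cleanup may proceed; final state can now be read from the store");
        pool.shutdown();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

In the non-blocking branch the later complete(null) call from the consumer thread is simply a no-op (CompletableFuture.complete returns false once the future is already done), which is why completing the future early is safe for fire-and-forget agents.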
@@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);

requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
requestHandler = DefaultRequestHandler.create(
executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
}

@AfterEach