Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions server-common/src/main/java/io/a2a/server/events/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,31 @@ public void taskDone() {

public abstract void close();

public abstract void close(boolean immediate);

public boolean isClosed() {
return closed;
}

public void doClose() {
doClose(false);
}

public void doClose(boolean immediate) {
synchronized (this) {
if (closed) {
return;
}
LOGGER.debug("Closing {}", this);
LOGGER.debug("Closing {} (immediate={})", this, immediate);
closed = true;
}
// Although the Python implementation drains the queue on closing,
// here it makes events go missing
// TODO do we actually need to drain it? If we do, we need some mechanism to determine that noone is
// polling any longer and drain it asynchronously once it is all done. That could perhaps be done
// via an EnhancedRunnable.DoneCallback.
//queue.drainTo(new ArrayList<>());

if (immediate) {
// Immediate close: clear pending events
queue.clear();
LOGGER.debug("Cleared queue for immediate close: {}", this);
}
// For graceful close, let the queue drain naturally through normal consumption
}

static class MainQueue extends EventQueue {
Expand Down Expand Up @@ -151,8 +162,13 @@ void signalQueuePollerStarted() {

@Override
public void close() {
doClose();
children.forEach(EventQueue::doClose);
close(false);
}

@Override
public void close(boolean immediate) {
doClose(immediate);
children.forEach(child -> child.doClose(immediate));
}
}

Expand Down Expand Up @@ -191,5 +207,10 @@ void signalQueuePollerStarted() {
public void close() {
parent.close();
}

@Override
public void close(boolean immediate) {
parent.close(immediate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
Expand Down Expand Up @@ -67,6 +69,7 @@ public class DefaultRequestHandler implements RequestHandler {
private final Supplier<RequestContext.Builder> requestContextBuilder;

private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<>();
private final Set<CompletableFuture<Void>> backgroundTasks = ConcurrentHashMap.newKeySet();

private final Executor executor;

Expand Down Expand Up @@ -190,8 +193,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte

} finally {
if (interrupted) {
// TODO Make this async
cleanupProducer(taskId);
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId), executor);
trackBackgroundTask(cleanupTask);
} else {
cleanupProducer(taskId);
}
Expand All @@ -212,9 +215,9 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null);

EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue);
EventConsumer consumer = new EventConsumer(queue);

try {
EventConsumer consumer = new EventConsumer(queue);

// This callback must be added before we start consuming. Otherwise,
// any errors thrown by the producerRunnable are not picked up by the consumer
Expand Down Expand Up @@ -258,7 +261,8 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(

return convertingProcessor(eventPublisher, event -> (StreamingEventKind) event);
} finally {
cleanupProducer(taskId.get());
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId.get()), executor);
trackBackgroundTask(cleanupTask);
}
}

Expand Down Expand Up @@ -396,6 +400,24 @@ public void run() {
return runnable;
}

private void trackBackgroundTask(CompletableFuture<Void> task) {
backgroundTasks.add(task);

task.whenComplete((result, throwable) -> {
try {
if (throwable != null) {
if (throwable instanceof java.util.concurrent.CancellationException) {
LOGGER.debug("Background task cancelled: {}", task);
} else {
LOGGER.error("Background task failed", throwable);
}
}
} finally {
backgroundTasks.remove(task);
}
});
}

private void cleanupProducer(String taskId) {
// TODO the Python implementation waits for the producerRunnable
runningAgents.get(taskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;

Expand Down Expand Up @@ -113,4 +114,75 @@ public void testEnqueueDifferentEventTypes() throws Exception {
assertSame(event, dequeuedEvent);
}
}

/**
* Test close behavior sets flag and handles graceful close.
* Backported from Python test: test_close_sets_flag_and_handles_internal_queue_old_python
*/
@Test
public void testCloseGracefulSetsFlag() throws Exception {
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
eventQueue.enqueueEvent(event);

eventQueue.close(false); // Graceful close
assertTrue(eventQueue.isClosed());
}

/**
* Test immediate close behavior.
* Backported from Python test behavior
*/
@Test
public void testCloseImmediateClearsQueue() throws Exception {
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
eventQueue.enqueueEvent(event);

eventQueue.close(true); // Immediate close
assertTrue(eventQueue.isClosed());

// After immediate close, queue should be cleared
// Attempting to dequeue should return null or throw exception
try {
Event dequeuedEvent = eventQueue.dequeueEvent(-1);
// If we get here, the event should be null (queue was cleared)
assertNull(dequeuedEvent);
} catch (EventQueueClosedException e) {
// This is also acceptable - queue is closed
}
}

/**
* Test that close is idempotent.
* Backported from Python test: test_close_idempotent
*/
@Test
public void testCloseIdempotent() throws Exception {
eventQueue.close();
assertTrue(eventQueue.isClosed());

// Calling close again should not cause issues
eventQueue.close();
assertTrue(eventQueue.isClosed());

// Test with immediate close as well
EventQueue eventQueue2 = EventQueue.create();
eventQueue2.close(true);
assertTrue(eventQueue2.isClosed());

eventQueue2.close(true);
assertTrue(eventQueue2.isClosed());
}

/**
* Test that child queues are closed when parent closes.
*/
@Test
public void testCloseChildQueues() throws Exception {
EventQueue childQueue = eventQueue.tap();
assertTrue(childQueue != null);

eventQueue.close();
assertTrue(eventQueue.isClosed());
assertTrue(childQueue.isClosed());
}
}
Loading