Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
// during the consumption loop itself.
kind = etai.eventType();

// Store push notification config for newly created tasks (mirrors streaming logic)
// Only for NEW tasks - existing tasks are handled by initMessageSend()
if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) {
LOGGER.debug("Storing push notification config for new task {}", createdTask.getId());
pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig());
}

if (blocking && interruptedOrNonBlocking) {
// For blocking calls: ensure all events are processed before returning
// Order of operations is critical to avoid circular dependency:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,12 +20,14 @@
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
import io.a2a.server.tasks.InMemoryTaskStore;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.MessageSendConfiguration;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
Expand Down Expand Up @@ -794,6 +798,160 @@ void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception {
returnedTask.getArtifacts().size());
}

/**
* Test that pushNotificationConfig from SendMessageConfiguration is stored for NEW tasks
* in non-streaming (blocking) mode. This reproduces the bug from issue #84.
*
* Expected behavior:
* 1. Client sends message with pushNotificationConfig in SendMessageConfiguration
* 2. Agent creates a new task
* 3. pushNotificationConfig should be stored in PushNotificationConfigStore
* 4. Config should be retrievable via getInfo()
*/
@Test
@Timeout(10)
void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exception {
String taskId = "push-config-blocking-new-task";
String contextId = "push-config-ctx";

// Create test config store
InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();

// Re-create request handler with pushConfigStore
requestHandler = DefaultRequestHandler.create(
agentExecutor,
taskStore,
queueManager,
pushConfigStore, // Add push config store
null, // pushSender
Executors.newCachedThreadPool()
);

// Create push notification config
PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
.id("config-1")
.url("https://example.com/webhook")
.token("test-token-123")
.build();

// Create message with pushNotificationConfig
Message message = new Message.Builder()
.messageId("msg-push-config")
.role(Message.Role.USER)
.parts(new TextPart("test message"))
.taskId(taskId)
.contextId(contextId)
.build();

MessageSendConfiguration config = new MessageSendConfiguration.Builder()
.blocking(true)
.pushNotificationConfig(pushConfig)
.build();

MessageSendParams params = new MessageSendParams(message, config, null);

// Agent creates a new task
agentExecutor.setExecuteCallback((context, queue) -> {
TaskUpdater updater = new TaskUpdater(context, queue);
updater.submit(); // Creates new task in SUBMITTED state
updater.complete();
});

// Call blocking onMessageSend
Object result = requestHandler.onMessageSend(params, serverCallContext);

// Verify result is a task
assertTrue(result instanceof Task, "Result should be a Task");
Task returnedTask = (Task) result;
assertEquals(taskId, returnedTask.getId());

// THE KEY ASSERTION: Verify pushNotificationConfig was stored
List<PushNotificationConfig> storedConfigs = pushConfigStore.getInfo(taskId);
assertNotNull(storedConfigs, "Push notification config should be stored for new task");
assertEquals(1, storedConfigs.size(),
"Should have exactly 1 push config stored");
assertEquals("config-1", storedConfigs.get(0).id());
assertEquals("https://example.com/webhook", storedConfigs.get(0).url());
}

/**
* Test that pushNotificationConfig is stored for EXISTING tasks.
* This verifies the initMessageSend logic works correctly.
*/
@Test
@Timeout(10)
void testMessageStoresPushNotificationConfigForExistingTask() throws Exception {
String taskId = "push-config-existing-task";
String contextId = "push-config-existing-ctx";

// Create test config store
InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();

// Re-create request handler with pushConfigStore
requestHandler = DefaultRequestHandler.create(
agentExecutor,
taskStore,
queueManager,
pushConfigStore, // Add push config store
null, // pushSender
Executors.newCachedThreadPool()
);

// Create EXISTING task in store
Task existingTask = new Task.Builder()
.id(taskId)
.contextId(contextId)
.status(new TaskStatus(TaskState.WORKING))
.build();
taskStore.save(existingTask);

// Create push notification config
PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
.id("config-existing-1")
.url("https://example.com/existing-webhook")
.token("existing-token-789")
.build();

Message message = new Message.Builder()
.messageId("msg-push-existing")
.role(Message.Role.USER)
.parts(new TextPart("update existing task"))
.taskId(taskId)
.contextId(contextId)
.build();

MessageSendConfiguration config = new MessageSendConfiguration.Builder()
.blocking(true)
.pushNotificationConfig(pushConfig)
.build();

MessageSendParams params = new MessageSendParams(message, config, null);

// Agent updates the existing task
agentExecutor.setExecuteCallback((context, queue) -> {
TaskUpdater updater = new TaskUpdater(context, queue);
updater.addArtifact(
List.of(new TextPart("update artifact", null)),
"artifact-1", "Update", null);
updater.complete();
});

// Call blocking onMessageSend
Object result = requestHandler.onMessageSend(params, serverCallContext);

// Verify result
assertTrue(result instanceof Task, "Result should be a Task");

// Verify pushNotificationConfig was stored (initMessageSend path)
List<PushNotificationConfig> storedConfigs = pushConfigStore.getInfo(taskId);
assertNotNull(storedConfigs,
"Push notification config should be stored for existing task");
assertEquals(1, storedConfigs.size(),
"Should have exactly 1 push config stored");
assertEquals("config-existing-1", storedConfigs.get(0).id());
assertEquals("https://example.com/existing-webhook", storedConfigs.get(0).url());
}

/**
* Simple test agent executor that allows controlling execution timing
*/
Expand Down