Skip to content

feat: Implement PushNotifications as per the 1.0 spec#622

Open
kabir wants to merge 2 commits intoa2aproject:mainfrom
kabir:event-queue-bus-improve-pnc
Open

feat: Implement PushNotifications as per the 1.0 spec#622
kabir wants to merge 2 commits intoa2aproject:mainfrom
kabir:event-queue-bus-improve-pnc

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Feb 4, 2026

Fixes #490
Fixes: #594

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the event processing and persistence architecture to align with the A2A 1.0 specification, particularly for streaming and push notifications. The introduction of a centralized event bus and processor ensures that all client-visible events and notifications reflect a durably stored state, enhancing consistency and reliability. Resource management for streaming connections has been improved with automatic client-side closing and server-side cancellation on disconnects. These changes, along with a more robust threading model, contribute to a more scalable and compliant system.

Highlights

  • Centralized Event Processing: Introduced a new architecture with a MainEventBus and MainEventBusProcessor to centralize event handling. This ensures all events are persisted to the TaskStore before being distributed to client-facing queues and before push notifications are sent, eliminating race conditions and ensuring data consistency.
  • Reliable Push Notifications: Push notifications are now sent asynchronously by the MainEventBusProcessor after successful persistence. They now support all StreamingEventKind types (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) wrapped in the A2A 1.0 StreamResponse format.
  • Improved Streaming Event Handling: Client-side SSE event listeners (JSON-RPC and REST) now automatically close connections when a Task reaches a final state. Server-side, client disconnects (HTTP/SSE closeHandler, gRPC CancellationListener) are detected, triggering cancellation of the EventConsumer polling loop to free up resources.
  • Enhanced Queue Management: The EventQueue has been refactored into MainQueue (for central event management) and ChildQueue (for client-specific consumption). This new design ensures proper event distribution, graceful handling of client disconnects, and correct lifecycle management for both active and finalized tasks.
  • Optimized Threading Model: A dedicated EventConsumerExecutor (cached thread pool) is introduced for EventConsumer polling, separating I/O-bound polling from CPU-bound agent execution. The main AsyncExecutor now uses a bounded queue and allows core thread timeouts, preventing thread pool exhaustion and improving resource utilization.
  • A2A Spec 3.1.6 Compliance: Resubscription logic now explicitly sends the current Task state as the first event in the stream, aligning with the A2A Protocol Specification 3.1.6.
Changelog
  • client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java
    • Added client-side auto-closing of SSE channels for Task objects in final states.
  • client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java
    • Implemented client-side auto-closing of SSE channels for Task objects in final states.
  • examples/cloud-deployment/scripts/deploy.sh
    • Improved PostgreSQL deployment script to wait for pod creation before readiness.
  • extras/common/src/main/java/io/a2a/extras/common/events/TaskFinalizedEvent.java
    • Modified TaskFinalizedEvent to include the full Task object.
  • extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java
    • Minor formatting adjustment.
  • extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java
    • Updated tests to verify StreamingEventKind events for push notifications.
  • extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java
    • Adapted to handle StreamingEventKind events for push notifications.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedEventQueueItem.java
    • Added isTaskEvent() method for specific event type checking.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
    • Integrated MainEventBus for event submission.
    • Ensured TaskStatusUpdateEvent is sent before QueueClosedEvent for finalized tasks in replicated environments.
    • Improved handling of replicated events for inactive tasks.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
    • Updated tests to align with the new MainEventBusProcessor and asynchronous event handling.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Added new test utility class for EventQueue and MainEventBusProcessor.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-1/src/main/resources/application.properties
    • Added debug logging for task-related components.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-2/src/main/resources/application.properties
    • Added debug logging for task-related components.
  • extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java
    • Updated multi-instance tests to filter initial TaskEvent on resubscribe and added log dumping on failure.
  • extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java
    • Enforced A2A spec 3.1.6 for resubscribe events in tests.
  • extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
    • Modified save method to prevent firing TaskFinalizedEvent for replicated events.
  • extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java
    • Updated taskStore.save calls to include isReplicated parameter.
  • reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
    • Refactored SSE handling to use SseFormatter and MultiSseSupport for client disconnect detection.
  • reference/jsonrpc/src/test/resources/application.properties
    • Added debug logging for event processing and request handling.
  • reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java
    • Refactored SSE handling to use SseFormatter and MultiSseSupport with specific headers for buffering control.
  • server-common/src/main/java/io/a2a/server/ServerCallContext.java
    • Added eventConsumerCancelCallback for transport-layer client disconnect handling.
  • server-common/src/main/java/io/a2a/server/events/EventConsumer.java
    • Implemented client disconnect cancellation and a grace period for replicated events.
  • server-common/src/main/java/io/a2a/server/events/EventQueue.java
    • Refactored into MainQueue and ChildQueue for hierarchical event management and asynchronous distribution.
  • server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
    • Updated constructors and builder methods to integrate MainEventBus.
  • server-common/src/main/java/io/a2a/server/events/MainEventBus.java
    • Introduced central event bus for asynchronous, ordered event processing.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
    • Added record for MainEventBus event context.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
    • Implemented background processor for MainEventBus to ensure "persist before distribute" logic.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java
    • Added callback interface for testing MainEventBusProcessor.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
    • Added CDI initializer for eager startup of MainEventBusProcessor.
  • server-common/src/main/java/io/a2a/server/events/QueueManager.java
    • Updated interface to enforce MainEventBus integration in builder methods.
  • server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
    • Integrated MainEventBusProcessor and EventConsumerExecutor.
    • Refactored onMessageSend and onMessageSendStream for new event processing flow and client disconnect handling.
    • Removed background task tracking.
    • Ensured A2A spec 3.1.6 compliance for resubscribe.
  • server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
    • Updated to send StreamingEventKind events wrapped in StreamResponse format.
  • server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
    • Updated save method signature.
  • server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
    • Updated interface to accept StreamingEventKind for push notifications.
  • server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
    • Updated to use eventConsumerExecutor and removed direct TaskManager.process() calls.
  • server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
    • Modified process and saveTaskEvent to return isFinal status and accept isReplicated.
  • server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
    • Updated save method signature.
  • server-common/src/main/java/io/a2a/server/util/async/AsyncExecutorProducer.java
    • Configured ThreadPoolExecutor with a bounded queue and core thread timeout for better resource management.
  • server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java
    • Added dedicated cached thread pool for EventConsumer polling.
  • server-common/src/main/java/io/a2a/server/util/sse/SseFormatter.java
    • Added utility for formatting SSE events.
  • server-common/src/main/java/io/a2a/server/util/sse/package-info.java
    • Added package-info for SSE utilities.
  • server-common/src/main/resources/META-INF/a2a-defaults.properties
    • Added a2a.executor.queue-capacity property.
  • server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
    • Updated tests for new asynchronous event processing.
  • server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
    • Updated tests for new asynchronous event processing and MainQueue/ChildQueue structure.
  • server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Updated test utility for EventQueue builder and MainEventBusProcessor.
  • server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
    • Updated tests for new MainEventBus integration.
  • server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
    • Updated test setup for MainEventBusProcessor and TestHttpClient to handle StreamingEventKind.
  • server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
    • Removed test file.
  • server-common/src/test/java/io/a2a/server/tasks/InMemoryTaskStoreTest.java
    • Removed test file.
  • server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
    • Updated tests to verify StreamingEventKind push notifications.
  • server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
    • Updated tests for new asynchronous event processing and EventConsumerExecutor.
  • server-common/src/test/java/io/a2a/server/tasks/TaskManagerTest.java
    • Updated tests for isReplicated parameter and isFinal return.
  • server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
    • Updated tests for new asynchronous event processing.
  • tck/src/main/resources/application.properties
    • Added debug logging for thread statistics.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
    • Updated tests to enforce A2A spec 3.1.6 for resubscribe events.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
    • Updated saveTask to include isReplicated parameter.
  • transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
    • Implemented client disconnect detection for gRPC streams and EventConsumer cancellation.
  • transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
    • Updated tests for isReplicated parameter and A2A spec 3.1.6 compliance.
  • transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
    • Updated tests for isReplicated parameter, asynchronous event processing, and A2A spec 3.1.6 compliance.
  • transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
    • Added fine-grained logging for streaming event processing.
  • transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
    • Updated tests for isReplicated parameter.
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-executed architectural refactoring of the server's event processing and queueing system. The introduction of a centralized MainEventBus and a dedicated MainEventBusProcessor is a major improvement, enforcing a "persist-before-visibility" pattern that enhances robustness and eliminates potential race conditions. The changes also address critical issues like resource leaks on client disconnects and thread pool exhaustion under load, and improve compliance with the A2A specification for push notifications and event streaming. The code quality is high, with clear documentation for the new architecture. I have a couple of minor suggestions regarding code duplication in the client and a potential inconsistency in the replicated event handling logic, but overall this is an excellent set of changes.

@kabir kabir force-pushed the event-queue-bus-improve-pnc branch from 62e99c5 to 9414bfb Compare February 4, 2026 17:43
@kabir
Copy link
Collaborator Author

kabir commented Feb 4, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the push notification mechanism to align with the 1.0 spec, allowing any StreamingEventKind (like Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent) to be sent as a notification, instead of just Task snapshots. The changes are well-implemented across the PushNotificationSender interface and its base implementation, ensuring events are serialized correctly into the StreamResponse format. The tests have been comprehensively updated to validate the new behavior for all event types, and documentation has been improved to reflect these changes. The overall implementation is solid and enhances the push notification feature significantly. I have one minor suggestion to improve the extensibility of the BasePushNotificationSender.

@kabir kabir force-pushed the event-queue-bus-improve-pnc branch from 9414bfb to 23c844d Compare February 5, 2026 11:41
@kabir
Copy link
Collaborator Author

kabir commented Feb 5, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively updates the push notification mechanism to align with the 1.0 specification by sending StreamingEventKind events instead of just Task snapshots. The changes are applied consistently across the interface, implementation, and test files. The test suite has been commendably expanded to cover the new event types, and backward compatibility for existing tests has been thoughtfully handled. I've included a couple of suggestions to enhance maintainability and fix a potential issue in a test helper.

@kabir kabir force-pushed the event-queue-bus-improve-pnc branch 2 times, most recently from 0634bbb to a877f14 Compare February 5, 2026 13:05
@kabir kabir closed this Feb 5, 2026
@kabir kabir deleted the event-queue-bus-improve-pnc branch February 5, 2026 13:49
@kabir kabir restored the event-queue-bus-improve-pnc branch February 5, 2026 13:52
@kabir kabir reopened this Feb 5, 2026
@kabir kabir force-pushed the event-queue-bus-improve-pnc branch from a877f14 to 8af8834 Compare February 5, 2026 13:53
@kabir kabir force-pushed the event-queue-bus-improve-pnc branch from 8af8834 to 6cc3737 Compare February 5, 2026 17:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

1 participant