Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -612,24 +612,24 @@ client.deleteTaskPushNotificationConfigurations(
new DeleteTaskPushNotificationConfigParams("task-1234", "config-4567", clientCallContext);
```

#### Resubscribe to a task
#### Subscribe to a task

```java
// Resubscribe to an ongoing task with id "task-1234" using configured consumers
// Subscribe to an ongoing task with id "task-1234" using configured consumers
TaskIdParams taskIdParams = new TaskIdParams("task-1234");
client.resubscribe(taskIdParams);
client.subscribeToTask(taskIdParams);

// Or resubscribe with custom consumers and error handler
// Or subscribe with custom consumers and error handler
List<BiConsumer<ClientEvent, AgentCard>> customConsumers = List.of(
(event, card) -> System.out.println("Resubscribe event: " + event)
(event, card) -> System.out.println("Subscribe event: " + event)
);
Consumer<Throwable> customErrorHandler = error ->
System.err.println("Resubscribe error: " + error.getMessage());
System.err.println("Subscribe error: " + error.getMessage());

client.resubscribe(taskIdParams, customConsumers, customErrorHandler);
client.subscribeToTask(taskIdParams, customConsumers, customErrorHandler);

// You can also optionally specify a ClientCallContext with call-specific config to use
client.resubscribe(taskIdParams, clientCallContext);
client.subscribeToTask(taskIdParams, clientCallContext);
```

#### Retrieve details about the server agent that this client agent is communicating with
Expand Down
22 changes: 11 additions & 11 deletions client/base/src/main/java/io/a2a/client/AbstractClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public abstract void deleteTaskPushNotificationConfigurations(
@Nullable ClientCallContext context) throws A2AClientException;

/**
* Resubscribe to a task's event stream.
* Subscribe to a task's event stream.
* This is only available if both the client and server support streaming.
* The configured client consumers will be used to handle messages, tasks,
* and update events received from the remote agent. The configured streaming
Expand All @@ -327,12 +327,12 @@ public abstract void deleteTaskPushNotificationConfigurations(
* @param request the parameters specifying which task's notification configs to delete
* @throws A2AClientException if resubscribing fails for any reason
*/
public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException {
resubscribe(request, consumers, streamingErrorHandler, null);
public void subscribeToTask(@NonNull TaskIdParams request) throws A2AClientException {
subscribeToTask(request, consumers, streamingErrorHandler, null);
}

/**
* Resubscribe to a task's event stream.
* Subscribe to a task's event stream.
* This is only available if both the client and server support streaming.
* The configured client consumers will be used to handle messages, tasks,
* and update events received from the remote agent. The configured streaming
Expand All @@ -342,13 +342,13 @@ public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException
* @param context optional client call context for the request
* @throws A2AClientException if resubscribing fails for any reason
*/
public void resubscribe(@NonNull TaskIdParams request,
public void subscribeToTask(@NonNull TaskIdParams request,
@Nullable ClientCallContext context) throws A2AClientException {
resubscribe(request, consumers, streamingErrorHandler, context);
subscribeToTask(request, consumers, streamingErrorHandler, context);
}

/**
* Resubscribe to a task's event stream.
* Subscribe to a task's event stream.
* This is only available if both the client and server support streaming.
* The specified client consumers will be used to handle messages, tasks, and
* update events received from the remote agent. The specified streaming error
Expand All @@ -359,14 +359,14 @@ public void resubscribe(@NonNull TaskIdParams request,
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
* @throws A2AClientException if resubscribing fails for any reason
*/
public void resubscribe(@NonNull TaskIdParams request,
public void subscribeToTask(@NonNull TaskIdParams request,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
resubscribe(request, consumers, streamingErrorHandler, null);
subscribeToTask(request, consumers, streamingErrorHandler, null);
}

/**
* Resubscribe to a task's event stream.
* Subscribe to a task's event stream.
* This is only available if both the client and server support streaming.
* The specified client consumers will be used to handle messages, tasks, and
* update events received from the remote agent. The specified streaming error
Expand All @@ -378,7 +378,7 @@ public void resubscribe(@NonNull TaskIdParams request,
* @param context optional client call context for the request
* @throws A2AClientException if resubscribing fails for any reason
*/
public abstract void resubscribe(@NonNull TaskIdParams request,
public abstract void subscribeToTask(@NonNull TaskIdParams request,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler,
@Nullable ClientCallContext context) throws A2AClientException;
Expand Down
16 changes: 8 additions & 8 deletions client/base/src/main/java/io/a2a/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
*
* // Later, reconnect and resume receiving events
* String taskId = "task-123"; // From original request
* client.resubscribe(
* client.subscribeToTask(
* new TaskIdParams(taskId),
* List.of((event, card) -> {
* // Process events from where we left off
Expand Down Expand Up @@ -529,7 +529,7 @@ public void deleteTaskPushNotificationConfigurations(
}

/**
* Resubscribe to an existing task to receive remaining events.
* Subscribe to an existing task to receive remaining events.
* <p>
* This method is useful when a client disconnects during a long-running task and wants to
* resume receiving events without starting a new task. The agent will deliver any events
Expand All @@ -551,27 +551,27 @@ public void deleteTaskPushNotificationConfigurations(
* // ... client1 disconnects ...
*
* // Later, reconnect (client2)
* client2.resubscribe(
* client2.subscribeToTask(
* new TaskIdParams(taskId),
* List.of((event, card) -> {
* if (event instanceof TaskUpdateEvent tue) {
* System.out.println("Resumed - status: " +
* tue.getTask().status().state());
* }
* }),
* throwable -> System.err.println("Resubscribe error: " + throwable),
* throwable -> System.err.println("Subscribe error: " + throwable),
* null
* );
* }</pre>
*
* @param request the task ID to resubscribe to
* @param request the task ID to subscribe to
* @param consumers the event consumers for processing events (required)
* @param streamingErrorHandler error handler for streaming errors (optional)
* @param context custom call context for request interceptors (optional)
* @throws A2AClientException if resubscription is not supported or if the task cannot be found
* @throws A2AClientException if subscription is not supported or if the task cannot be found
*/
@Override
public void resubscribe(@NonNull TaskIdParams request,
public void subscribeToTask(@NonNull TaskIdParams request,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler,
@Nullable ClientCallContext context) throws A2AClientException {
Expand All @@ -588,7 +588,7 @@ public void resubscribe(@NonNull TaskIdParams request,
overriddenErrorHandler.accept(e);
}
};
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
clientTransport.subscribeToTask(request, eventHandler, overriddenErrorHandler, context);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.a2a.client;

import static io.a2a.spec.AgentCard.CURRENT_PROTOCOL_VERSION;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockserver.model.HttpRequest.request;
Expand Down Expand Up @@ -90,7 +89,6 @@ public void setUp() {
.description("Test skill")
.tags(Collections.singletonList("test"))
.build()))
.protocolVersions(CURRENT_PROTOCOL_VERSION)
.supportedInterfaces(java.util.Arrays.asList(
new AgentInterface(TransportProtocol.JSONRPC.asString(), AGENT_URL),
new AgentInterface(TransportProtocol.HTTP_JSON.asString(), AGENT_URL),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.a2a.client;

import static io.a2a.spec.AgentCard.CURRENT_PROTOCOL_VERSION;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -41,7 +40,6 @@ public class ClientBuilderTest {
.tags(Collections.singletonList("hello world"))
.examples(List.of("hi", "hello world"))
.build()))
.protocolVersions(CURRENT_PROTOCOL_VERSION)
.supportedInterfaces(List.of(
new AgentInterface(TransportProtocol.JSONRPC.asString(), "http://localhost:9999")))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEv
public Task getTask(TaskQueryParams request, @Nullable ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
io.a2a.grpc.GetTaskRequest.Builder requestBuilder = io.a2a.grpc.GetTaskRequest.newBuilder();
requestBuilder.setName("tasks/" + request.id());
requestBuilder.setId(request.id());
if (request.historyLength() != null) {
requestBuilder.setHistoryLength(request.historyLength());
}
Expand All @@ -164,7 +164,7 @@ public Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context
checkNotNullParam("request", request);

io.a2a.grpc.CancelTaskRequest cancelTaskRequest = io.a2a.grpc.CancelTaskRequest.newBuilder()
.setName("tasks/" + request.id())
.setId(request.id())
.setTenant(resolveTenant(request.tenant()))
.build();
PayloadAndHeaders payloadAndHeaders = applyInterceptors(CANCEL_TASK_METHOD, cancelTaskRequest, agentCard, context);
Expand Down Expand Up @@ -230,13 +230,13 @@ public ListTasksResult listTasks(ListTasksParams request, @Nullable ClientCallCo

@Override
public TaskPushNotificationConfig createTaskPushNotificationConfiguration(TaskPushNotificationConfig request,
@Nullable ClientCallContext context) throws A2AClientException {
@Nullable ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);

String configId = request.pushNotificationConfig().id();
io.a2a.grpc.CreateTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.CreateTaskPushNotificationConfigRequest.newBuilder()
.setParent("tasks/" + request.taskId())
.setConfig(ToProto.taskPushNotificationConfig(request))
.setTaskId(request.taskId())
.setConfig(ToProto.taskPushNotificationConfig(request).getPushNotificationConfig())
.setConfigId(configId != null ? configId : request.taskId())
.setTenant(resolveTenant(request.tenant()))
.build();
Expand All @@ -251,14 +251,18 @@ public TaskPushNotificationConfig createTaskPushNotificationConfiguration(TaskPu
}

@Override
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(
GetTaskPushNotificationConfigParams request,
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request,
@Nullable ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
checkNotNullParam("taskId", request.id());
if(request.pushNotificationConfigId() == null) {
throw new IllegalArgumentException("Id must not be null");
}

io.a2a.grpc.GetTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.GetTaskPushNotificationConfigRequest.newBuilder()
.setName(getTaskPushNotificationConfigName(request))
.setTaskId(request.id())
.setTenant(resolveTenant(request.tenant()))
.setId(request.pushNotificationConfigId())
.build();
PayloadAndHeaders payloadAndHeaders = applyInterceptors(GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, grpcRequest, agentCard, context);

Expand All @@ -277,7 +281,7 @@ public ListTaskPushNotificationConfigResult listTaskPushNotificationConfiguratio
checkNotNullParam("request", request);

io.a2a.grpc.ListTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.ListTaskPushNotificationConfigRequest.newBuilder()
.setParent("tasks/" + request.id())
.setTaskId(request.id())
.setTenant(resolveTenant(request.tenant()))
.setPageSize(request.pageSize())
.setPageToken(request.pageToken())
Expand All @@ -300,7 +304,8 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
checkNotNullParam("request", request);

io.a2a.grpc.DeleteTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.DeleteTaskPushNotificationConfigRequest.newBuilder()
.setName(getTaskPushNotificationConfigName(request.id(), request.pushNotificationConfigId()))
.setTaskId(request.id())
.setId(request.pushNotificationConfigId())
.setTenant(resolveTenant(request.tenant()))
.build();
PayloadAndHeaders payloadAndHeaders = applyInterceptors(DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, grpcRequest, agentCard, context);
Expand All @@ -314,14 +319,14 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
}

@Override
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
public void subscribeToTask(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
checkNotNullParam("eventConsumer", eventConsumer);

io.a2a.grpc.SubscribeToTaskRequest grpcRequest = io.a2a.grpc.SubscribeToTaskRequest.newBuilder()
.setTenant(resolveTenant(request.tenant()))
.setName("tasks/" + request.id())
.setId(request.id())
.build();
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SUBSCRIBE_TO_TASK_METHOD, grpcRequest, agentCard, context);

Expand All @@ -331,12 +336,13 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
A2AServiceStub stubWithMetadata = createAsyncStubWithMetadata(context, payloadAndHeaders);
stubWithMetadata.subscribeToTask(grpcRequest, streamObserver);
} catch (StatusRuntimeException e) {
throw GrpcErrorMapper.mapGrpcError(e, "Failed to resubscribe task push notification config: ");
throw GrpcErrorMapper.mapGrpcError(e, "Failed to subscribe task push notification config: ");
}
}

/**
* Ensure tenant is set, using agent default if not provided in request
*
* @param request the initial request.
* @return the updated request with the tenant set.
*/
Expand Down Expand Up @@ -447,24 +453,6 @@ private A2AServiceStub createAsyncStubWithMetadata(@Nullable ClientCallContext c
return asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
}

private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams params) {
return getTaskPushNotificationConfigName(params.id(), params.pushNotificationConfigId());
}

private String getTaskPushNotificationConfigName(String taskId, @Nullable String pushNotificationConfigId) {
StringBuilder name = new StringBuilder();
name.append("tasks/");
name.append(taskId);
if (pushNotificationConfigId != null) {
name.append("/pushNotificationConfigs/");
name.append(pushNotificationConfigId);
}
//name.append("/pushNotificationConfigs/");
// Use taskId as default config ID if none provided
//name.append(pushNotificationConfigId != null ? pushNotificationConfigId : taskId);
return name.toString();
}

private PayloadAndHeaders applyInterceptors(String methodName, Object payload,
AgentCard agentCard, @Nullable ClientCallContext clientCallContext) {
PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
}

@Override
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
public void subscribeToTask(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
checkNotNullParam("eventConsumer", eventConsumer);
Expand Down
Loading