Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions agentkit/apps/a2a_app/a2a_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

import logging
import os
from typing import Callable, override

import uvicorn
import inspect

from typing import Callable, override
from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.apps import A2AStarletteApplication
Expand All @@ -26,8 +27,9 @@
from a2a.server.tasks.task_store import TaskStore
from a2a.types import AgentCard
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from starlette.requests import Request

from agentkit.apps.a2a_app.telemetry import telemetry
from agentkit.apps.base_app import BaseAgentkitApp
Expand All @@ -41,9 +43,7 @@ async def wrapper(*args, **kwargs):
context: RequestContext = args[1]
event_queue: EventQueue = args[2]

with telemetry.tracer.start_as_current_span(
name="a2a_invocation"
) as span:
with telemetry.tracer.start_as_current_span(name="a2a_invocation") as span:
exception = None
try:
result = await execute_func(
Expand Down Expand Up @@ -75,6 +75,7 @@ def __init__(self) -> None:

self._agent_executor: AgentExecutor | None = None
self._task_store: TaskStore | None = None
self._ping_func: Callable | None = None

def agent_executor(self, **kwargs) -> Callable:
"""Wrap an AgentExecutor class, init it, then bind it to the app instance."""
Expand All @@ -86,9 +87,7 @@ def wrapper(cls: type) -> type[AgentExecutor]:
)

if self._agent_executor:
raise RuntimeError(
"An executor is already bound to this app instance."
)
raise RuntimeError("An executor is already bound to this app instance.")

# Wrap the execute method for intercepting context and event_queue
cls.execute = _wrap_agent_executor_execute_func(cls.execute)
Expand Down Expand Up @@ -119,6 +118,50 @@ def wrapper(cls: type) -> type[TaskStore]:

return wrapper

def ping(self, func: Callable) -> Callable:
"""Register a zero-argument health check function and expose it via GET /ping.

The function must accept no arguments and should return either a string or a dict.
The response shape mirrors SimpleApp: {"status": <str|dict>}.
"""
# Ensure zero-argument function similar to SimpleApp
if len(list(inspect.signature(func).parameters.keys())) != 0:
raise AssertionError(
f"Health check function `{func.__name__}` should not receive any arguments."
)

self._ping_func = func
return func

def _format_ping_status(self, result: str | dict) -> dict:
# Align behavior with SimpleApp: always wrap into {"status": result}
if isinstance(result, (str, dict)):
return {"status": result}
logger.error(
f"Health check function {getattr(self._ping_func, '__name__', 'unknown')} must return `dict` or `str` type."
)
return {"status": "error", "message": "Invalid response type."}

async def ping_endpoint(self, request: Request) -> Response:
if not self._ping_func:
logger.error("Ping handler function is not set")
return Response(status_code=404)

try:
result = (
await self._ping_func()
if inspect.iscoroutinefunction(self._ping_func)
else self._ping_func()
)
payload = self._format_ping_status(result)
return JSONResponse(content=payload)
except Exception as e:
logger.exception("Ping handler function failed: %s", e)
return JSONResponse(
content={"status": "error", "message": str(e)},
status_code=500,
)

def add_env_detect_route(self, app: Starlette):
def is_agentkit_runtime() -> bool:
if os.getenv("RUNTIME_IAM_ROLE_TRN", ""):
Expand All @@ -136,6 +179,9 @@ def is_agentkit_runtime() -> bool:
)
app.routes.append(route)

def add_ping_route(self, app: Starlette):
app.add_route("/ping", self.ping_endpoint, methods=["GET"])

@override
def run(self, agent_card: AgentCard, host: str, port: int = 8000):
if not self._agent_executor:
Expand All @@ -155,6 +201,8 @@ def run(self, agent_card: AgentCard, host: str, port: int = 8000):
),
).build()

# Register routes in the same style
self.add_ping_route(a2a_app)
self.add_env_detect_route(a2a_app)

uvicorn.run(a2a_app, host=host, port=port)
16 changes: 4 additions & 12 deletions agentkit/apps/mcp_app/mcp_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def tool(self, func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
# with tracer.start_as_current_span("tool") as span:
with telemetry.tracer.start_as_current_span(
name="tool"
) as span:
with telemetry.tracer.start_as_current_span(name="tool") as span:
exception = None
try:
result = await func(*args, **kwargs)
Expand Down Expand Up @@ -70,9 +68,7 @@ async def async_wrapper(*args, **kwargs) -> Any:
@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
# with tracer.start_as_current_span("tool") as span:
with telemetry.tracer.start_as_current_span(
name="tool"
) as span:
with telemetry.tracer.start_as_current_span(name="tool") as span:
exception = None
try:
result = func(*args, **kwargs)
Expand Down Expand Up @@ -100,9 +96,7 @@ def agent_as_a_tool(self, func: Callable) -> Callable:

@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
with telemetry.tracer.start_as_current_span(
name="tool"
) as span:
with telemetry.tracer.start_as_current_span(name="tool") as span:
exception = None
try:
result = await func(*args, **kwargs)
Expand All @@ -126,9 +120,7 @@ async def async_wrapper(*args, **kwargs) -> Any:

@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
with telemetry.tracer.start_as_current_span(
name="tool"
) as span:
with telemetry.tracer.start_as_current_span(name="tool") as span:
exception = None
try:
result = func(*args, **kwargs)
Expand Down
9 changes: 6 additions & 3 deletions agentkit/toolkit/resources/samples/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@
class MyAgentExecutor(A2aAgentExecutor):
pass

@a2a_app.ping
def ping() -> str:
return "pong!"

if __name__ == "__main__":
from a2a.types import AgentCard, AgentProvider, AgentSkill, AgentCapabilities

agent_card = AgentCard(
capabilities=AgentCapabilities(streaming=True), # 启用流式
capabilities=AgentCapabilities(streaming=True),
description=agent.description,
name=agent.name,
defaultInputModes=["text"],
defaultOutputModes=["text"],
default_input_modes=["text"],
default_output_modes=["text"],
provider=AgentProvider(organization="veadk", url=""),
skills=[AgentSkill(id="0", name="chat", description="Chat", tags=["chat"])],
url="http://0.0.0.0:8000",
Expand Down
37 changes: 31 additions & 6 deletions agentkit/toolkit/runners/ve_agentkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,28 @@ def status(self, config: VeAgentkitRunnerConfig) -> StatusResult:
},
timeout=10,
)
ping_status = ping_response.status_code == 200
if ping_response.status_code == 200:
ping_status = True
elif ping_response.status_code in (404, 405):
# Fallback: try /health for SimpleApp compatibility
try:
health_response = requests.get(
urljoin(public_endpoint, "health"),
headers={
"Authorization": f"Bearer {runner_config.runtime_apikey}"
},
timeout=10,
)
if health_response.status_code == 200:
ping_status = True
else:
ping_status = None # Endpoint reachable but health route not available
except Exception:
# Endpoint reachable (ping returned 404/405), but health check failed
ping_status = None
else:
# Non-200 status indicates server responded but not healthy
ping_status = False
except Exception as e:
logger.error(f"Failed to check endpoint connectivity: {str(e)}")
ping_status = False
Expand All @@ -265,11 +286,15 @@ def status(self, config: VeAgentkitRunnerConfig) -> StatusResult:
status=status,
endpoint_url=public_endpoint,
service_id=runner_config.runtime_id,
health="healthy"
if ping_status
else "unhealthy"
if ping_status is False
else None,
health=(
"healthy"
if ping_status is True
else "unhealthy"
if ping_status is False
else "unknown"
if ping_status is None
else None
),
metadata={
"runtime_id": runner_config.runtime_id,
"runtime_name": runtime.name
Expand Down