Skip to content

Conversation

@OhYee
Copy link
Contributor

@OhYee OhYee commented Dec 9, 2025

重构服务器协议架构,引入抽象协议处理器基类。新增 AG-UI 协议完整实现,包括事件类型定义和处理器。扩展 AgentRequest 模型支持原始请求访问和生命周期钩子注入。更新 OpenAI 协议适配器以兼容新钩子系统。修改服务器默认配置同时启用 OpenAI 和 AG-UI 双协议。

BREAKING CHANGE: 协议处理器接口重构,parse_request 现在接收 Request 对象并返回上下文。AgentRequest 结构变更增加 raw_headers、raw_body 和 hooks 字段。AgentResult 类型扩展支持 AgentEvent 对象。

Change-Id: I8527db7539fa62ce39e80e28068a98a0b2db3ba3

Thank you for creating a pull request to contribute to Serverless Devs agentrun-sdk-python code! Before you open the request please answer the following questions to help it be more easily integrated. Please check the boxes "[ ]" with "[x]" when done too.
Please select one of the PR types below to complete


Fix bugs

Bug detail

The specific manifestation of the bug or the associated issue.

Pull request tasks

  • Add test cases for the changes
  • Passed the CI test

Update docs

Reason for update

Why do you need to update your documentation?

Pull request tasks

  • Update Chinese documentation
  • Update English documentation

Add contributor

Contributed content

  • Code
  • Document

Content detail

if content_type == 'code' || content_type == 'document':
    please tell us `PR url`,like: https://github.com/Serverless-Devs/agentrun-sdk-python/pull/1
else:
    please describe your contribution in detail

Others

Reason for update

Why do you need to update your documentation?

重构服务器协议架构,引入抽象协议处理器基类。新增 AG-UI 协议完整实现,包括事件类型定义和处理器。扩展 AgentRequest 模型支持原始请求访问和生命周期钩子注入。更新 OpenAI 协议适配器以兼容新钩子系统。修改服务器默认配置同时启用 OpenAI 和 AG-UI 双协议。

BREAKING CHANGE: 协议处理器接口重构,parse_request 现在接收 Request 对象并返回上下文。AgentRequest 结构变更增加 raw_headers、raw_body 和 hooks 字段。AgentResult 类型扩展支持 AgentEvent 对象。

Change-Id: I8527db7539fa62ce39e80e28068a98a0b2db3ba3
Signed-off-by: OhYee <oyohyee@oyohyee.com>
Copilot AI review requested due to automatic review settings December 9, 2025 03:14
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a comprehensive lifecycle hooks system and AG-UI protocol support to the AgentRun Server, marking a significant architectural enhancement. The implementation enables real-time event streaming for agent execution stages (steps, tool calls, text messages, state changes) through a protocol-agnostic hooks abstraction. The refactoring maintains backward compatibility at the API level while introducing breaking changes to the protocol handler interface.

Key Changes:

  • Introduced AgentLifecycleHooks abstract base class with protocol-specific implementations (OpenAI, AG-UI)
  • Added complete AG-UI protocol handler with full event stream support
  • Extended AgentRequest with raw_headers, raw_body, and hooks fields for enhanced request context
  • Refactored BaseProtocolHandler with create_hooks() and modified parse_request() to return tuples
  • Updated default server configuration to enable both OpenAI and AG-UI protocols simultaneously

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
agentrun/server/agui_protocol.py New complete AG-UI protocol implementation with event types, lifecycle hooks, and SSE streaming (834 lines)
agentrun/server/model.py Added AgentLifecycleHooks ABC, AgentEvent class, and extended AgentRequest with hooks/raw request fields
agentrun/server/openai_protocol.py Refactored to implement BaseProtocolHandler, added OpenAILifecycleHooks, improved streaming with thread pool execution
agentrun/server/protocol.py Introduced BaseProtocolHandler with create_hooks() and modified parse_request() signature (breaking change)
agentrun/server/server.py Updated default protocols to include both OpenAI and AG-UI, improved documentation examples
agentrun/server/__init__.py Critical bug: Documentation shows incorrect hook usage patterns with non-existent emit_* methods and wrong async patterns
examples/quick_start_sync.py Comprehensive sync example demonstrating lifecycle hooks and tool call events (234 lines)
examples/quick_start_async.py Async variant of quick start example with proper async/await patterns (241 lines)
examples/a.py LangChain integration example, but filename is non-descriptive (should be renamed)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 35 to 43
... async for event in hooks.on_step_start("processing"):
... yield event
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... async for event in hooks.on_step_finish("processing"):
... yield event
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async example shows incorrect usage pattern. The on_* methods return Optional[AgentEvent], not async iterators. They should be used directly with yield, not with async for.

The correct async pattern should be:

async def invoke_agent(request: AgentRequest):
    hooks = request.hooks
    # Direct yield of event (may be None)
    yield hooks.on_step_start("processing")
    yield "Hello, world!"
    yield hooks.on_step_finish("processing")
Suggested change
... async for event in hooks.on_step_start("processing"):
... yield event
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... async for event in hooks.on_step_finish("processing"):
... yield event
... yield hooks.on_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... yield hooks.on_step_finish("processing")

Copilot uses AI. Check for mistakes.
当前时间的字符串表示
"""
from datetime import datetime
import time
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant import: time is imported twice - once at the module level (line 25) and again inside the get_current_time function (line 47). Remove the inner import on line 47 since time is already available from the module-level import.

Suggested change
import time

Copilot uses AI. Check for mistakes.
examples/a.py Outdated
@@ -0,0 +1,166 @@
"""AgentRun Server + LangChain Agent 示例
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file is named a.py which is not descriptive. Consider renaming to something more meaningful like langchain_agent_example.py or quick_start_langchain.py to clearly indicate its purpose and content.

Copilot uses AI. Check for mistakes.
Comment on lines 1 to 834
"""AG-UI (Agent-User Interaction Protocol) 协议实现
AG-UI 是一种开源、轻量级、基于事件的协议,用于标准化 AI Agent 与前端应用之间的交互。
参考: https://docs.ag-ui.com/
基于 Router 的设计:
- 协议自己创建 FastAPI Router
- 定义所有端点和处理逻辑
- Server 只需挂载 Router
生命周期钩子:
- AG-UI 完整支持所有生命周期事件
- 每个钩子映射到对应的 AG-UI 事件类型
"""

from enum import Enum
import json
import time
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Optional,
TYPE_CHECKING,
Union,
)
import uuid

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from .model import (
AgentEvent,
AgentLifecycleHooks,
AgentRequest,
AgentResponse,
AgentResult,
AgentRunResult,
Message,
MessageRole,
)
from .protocol import BaseProtocolHandler

if TYPE_CHECKING:
from .invoker import AgentInvoker


# ============================================================================
# AG-UI 事件类型定义
# ============================================================================


class AGUIEventType(str, Enum):
"""AG-UI 事件类型
参考: https://docs.ag-ui.com/concepts/events
"""

# Lifecycle Events (生命周期事件)
RUN_STARTED = "RUN_STARTED"
RUN_FINISHED = "RUN_FINISHED"
RUN_ERROR = "RUN_ERROR"
STEP_STARTED = "STEP_STARTED"
STEP_FINISHED = "STEP_FINISHED"

# Text Message Events (文本消息事件)
TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
TEXT_MESSAGE_END = "TEXT_MESSAGE_END"

# Tool Call Events (工具调用事件)
TOOL_CALL_START = "TOOL_CALL_START"
TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
TOOL_CALL_END = "TOOL_CALL_END"
TOOL_CALL_RESULT = "TOOL_CALL_RESULT"

# State Events (状态事件)
STATE_SNAPSHOT = "STATE_SNAPSHOT"
STATE_DELTA = "STATE_DELTA"

# Message Events (消息事件)
MESSAGES_SNAPSHOT = "MESSAGES_SNAPSHOT"

# Special Events (特殊事件)
RAW = "RAW"
CUSTOM = "CUSTOM"


class AGUIRole(str, Enum):
"""AG-UI 消息角色"""

USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
TOOL = "tool"


# ============================================================================
# AG-UI 事件模型
# ============================================================================


class AGUIBaseEvent(BaseModel):
"""AG-UI 基础事件"""

type: AGUIEventType
timestamp: Optional[int] = Field(
default_factory=lambda: int(time.time() * 1000)
)
rawEvent: Optional[Dict[str, Any]] = None


class AGUIRunStartedEvent(AGUIBaseEvent):
"""运行开始事件"""

type: AGUIEventType = AGUIEventType.RUN_STARTED
threadId: Optional[str] = None
runId: Optional[str] = None


class AGUIRunFinishedEvent(AGUIBaseEvent):
"""运行结束事件"""

type: AGUIEventType = AGUIEventType.RUN_FINISHED
threadId: Optional[str] = None
runId: Optional[str] = None


class AGUIRunErrorEvent(AGUIBaseEvent):
"""运行错误事件"""

type: AGUIEventType = AGUIEventType.RUN_ERROR
message: str
code: Optional[str] = None


class AGUIStepStartedEvent(AGUIBaseEvent):
"""步骤开始事件"""

type: AGUIEventType = AGUIEventType.STEP_STARTED
stepName: Optional[str] = None


class AGUIStepFinishedEvent(AGUIBaseEvent):
"""步骤结束事件"""

type: AGUIEventType = AGUIEventType.STEP_FINISHED
stepName: Optional[str] = None


class AGUITextMessageStartEvent(AGUIBaseEvent):
"""文本消息开始事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_START
messageId: str
role: AGUIRole = AGUIRole.ASSISTANT


class AGUITextMessageContentEvent(AGUIBaseEvent):
"""文本消息内容事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_CONTENT
messageId: str
delta: str


class AGUITextMessageEndEvent(AGUIBaseEvent):
"""文本消息结束事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_END
messageId: str


class AGUIToolCallStartEvent(AGUIBaseEvent):
"""工具调用开始事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_START
toolCallId: str
toolCallName: str
parentMessageId: Optional[str] = None


class AGUIToolCallArgsEvent(AGUIBaseEvent):
"""工具调用参数事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_ARGS
toolCallId: str
delta: str


class AGUIToolCallEndEvent(AGUIBaseEvent):
"""工具调用结束事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_END
toolCallId: str


class AGUIToolCallResultEvent(AGUIBaseEvent):
"""工具调用结果事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_RESULT
toolCallId: str
result: str


class AGUIStateSnapshotEvent(AGUIBaseEvent):
"""状态快照事件"""

type: AGUIEventType = AGUIEventType.STATE_SNAPSHOT
snapshot: Dict[str, Any]


class AGUIStateDeltaEvent(AGUIBaseEvent):
"""状态增量事件"""

type: AGUIEventType = AGUIEventType.STATE_DELTA
delta: List[Dict[str, Any]] # JSON Patch 格式


class AGUIMessage(BaseModel):
"""AG-UI 消息格式"""

id: str
role: AGUIRole
content: Optional[str] = None
name: Optional[str] = None
toolCalls: Optional[List[Dict[str, Any]]] = None
toolCallId: Optional[str] = None


class AGUIMessagesSnapshotEvent(AGUIBaseEvent):
"""消息快照事件"""

type: AGUIEventType = AGUIEventType.MESSAGES_SNAPSHOT
messages: List[AGUIMessage]


class AGUIRawEvent(AGUIBaseEvent):
"""原始事件"""

type: AGUIEventType = AGUIEventType.RAW
event: Dict[str, Any]


class AGUICustomEvent(AGUIBaseEvent):
"""自定义事件"""

type: AGUIEventType = AGUIEventType.CUSTOM
name: str
value: Any


# 事件联合类型
AGUIEvent = Union[
AGUIRunStartedEvent,
AGUIRunFinishedEvent,
AGUIRunErrorEvent,
AGUIStepStartedEvent,
AGUIStepFinishedEvent,
AGUITextMessageStartEvent,
AGUITextMessageContentEvent,
AGUITextMessageEndEvent,
AGUIToolCallStartEvent,
AGUIToolCallArgsEvent,
AGUIToolCallEndEvent,
AGUIToolCallResultEvent,
AGUIStateSnapshotEvent,
AGUIStateDeltaEvent,
AGUIMessagesSnapshotEvent,
AGUIRawEvent,
AGUICustomEvent,
]


# ============================================================================
# AG-UI 请求模型
# ============================================================================


class AGUIRunAgentInput(BaseModel):
"""AG-UI 运行 Agent 请求"""

threadId: Optional[str] = None
runId: Optional[str] = None
messages: List[Dict[str, Any]] = Field(default_factory=list)
tools: Optional[List[Dict[str, Any]]] = None
context: Optional[List[Dict[str, Any]]] = None
forwardedProps: Optional[Dict[str, Any]] = None


# ============================================================================
# AG-UI 协议生命周期钩子实现
# ============================================================================


class AGUILifecycleHooks(AgentLifecycleHooks):
"""AG-UI 协议的生命周期钩子实现
AG-UI 完整支持所有生命周期事件,每个钩子映射到对应的 AG-UI 事件类型。
所有 on_* 方法直接返回 AgentEvent,可以直接 yield。
Example:
>>> def invoke_agent(request):
... hooks = request.hooks
... yield hooks.on_step_start("processing")
... yield hooks.on_tool_call_start(id="call_1", name="get_time")
... yield hooks.on_tool_call_args(id="call_1", args={"tz": "UTC"})
... result = get_time()
... yield hooks.on_tool_call_result(id="call_1", result=result)
... yield hooks.on_tool_call_end(id="call_1")
... yield f"时间: {result}"
... yield hooks.on_step_finish("processing")
"""

def __init__(self, context: Dict[str, Any]):
"""初始化钩子
Args:
context: 运行上下文,包含 threadId, runId 等
"""
self.context = context
self.thread_id = context.get("threadId", str(uuid.uuid4()))
self.run_id = context.get("runId", str(uuid.uuid4()))

def _create_event(self, event: AGUIBaseEvent) -> AgentEvent:
"""创建 AgentEvent
Args:
event: AG-UI 事件对象
Returns:
AgentEvent 对象
"""
json_str = event.model_dump_json(exclude_none=True)
raw_sse = f"data: {json_str}\n\n"
return AgentEvent(
event_type=event.type.value
if hasattr(event.type, "value")
else str(event.type),
data=event.model_dump(exclude_none=True),
raw_sse=raw_sse,
)

# =========================================================================
# 生命周期事件方法 (on_*) - 直接返回 AgentEvent
# =========================================================================

def on_run_start(self) -> AgentEvent:
"""发送 RUN_STARTED 事件"""
return self._create_event(
AGUIRunStartedEvent(threadId=self.thread_id, runId=self.run_id)
)

def on_run_finish(self) -> AgentEvent:
"""发送 RUN_FINISHED 事件"""
return self._create_event(
AGUIRunFinishedEvent(threadId=self.thread_id, runId=self.run_id)
)

def on_run_error(
self, error: str, code: Optional[str] = None
) -> AgentEvent:
"""发送 RUN_ERROR 事件"""
return self._create_event(AGUIRunErrorEvent(message=error, code=code))

def on_step_start(self, step_name: Optional[str] = None) -> AgentEvent:
"""发送 STEP_STARTED 事件"""
return self._create_event(AGUIStepStartedEvent(stepName=step_name))

def on_step_finish(self, step_name: Optional[str] = None) -> AgentEvent:
"""发送 STEP_FINISHED 事件"""
return self._create_event(AGUIStepFinishedEvent(stepName=step_name))

def on_text_message_start(
self, message_id: str, role: str = "assistant"
) -> AgentEvent:
"""发送 TEXT_MESSAGE_START 事件"""
try:
agui_role = AGUIRole(role)
except ValueError:
agui_role = AGUIRole.ASSISTANT
return self._create_event(
AGUITextMessageStartEvent(messageId=message_id, role=agui_role)
)

def on_text_message_content(
self, message_id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TEXT_MESSAGE_CONTENT 事件"""
if not delta:
return None
return self._create_event(
AGUITextMessageContentEvent(messageId=message_id, delta=delta)
)

def on_text_message_end(self, message_id: str) -> AgentEvent:
"""发送 TEXT_MESSAGE_END 事件"""
return self._create_event(AGUITextMessageEndEvent(messageId=message_id))

def on_tool_call_start(
self,
id: str,
name: str,
parent_message_id: Optional[str] = None,
) -> AgentEvent:
"""发送 TOOL_CALL_START 事件"""
return self._create_event(
AGUIToolCallStartEvent(
toolCallId=id,
toolCallName=name,
parentMessageId=parent_message_id,
)
)

def on_tool_call_args_delta(
self, id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TOOL_CALL_ARGS 事件(增量)"""
if not delta:
return None
return self._create_event(
AGUIToolCallArgsEvent(toolCallId=id, delta=delta)
)

def on_tool_call_args(
self, id: str, args: Union[str, Dict[str, Any]]
) -> AgentEvent:
"""发送完整的 TOOL_CALL_ARGS 事件"""
if isinstance(args, dict):
args = json.dumps(args, ensure_ascii=False)
return self._create_event(
AGUIToolCallArgsEvent(toolCallId=id, delta=args)
)

def on_tool_call_result_delta(
self, id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TOOL_CALL_RESULT 事件(增量)"""
if not delta:
return None
return self._create_event(
AGUIToolCallResultEvent(toolCallId=id, result=delta)
)

def on_tool_call_result(self, id: str, result: str) -> AgentEvent:
"""发送 TOOL_CALL_RESULT 事件"""
return self._create_event(
AGUIToolCallResultEvent(toolCallId=id, result=result)
)

def on_tool_call_end(self, id: str) -> AgentEvent:
"""发送 TOOL_CALL_END 事件"""
return self._create_event(AGUIToolCallEndEvent(toolCallId=id))

def on_state_snapshot(self, snapshot: Dict[str, Any]) -> AgentEvent:
"""发送 STATE_SNAPSHOT 事件"""
return self._create_event(AGUIStateSnapshotEvent(snapshot=snapshot))

def on_state_delta(self, delta: List[Dict[str, Any]]) -> AgentEvent:
"""发送 STATE_DELTA 事件"""
return self._create_event(AGUIStateDeltaEvent(delta=delta))

def on_custom_event(self, name: str, value: Any) -> AgentEvent:
"""发送 CUSTOM 事件"""
return self._create_event(AGUICustomEvent(name=name, value=value))


# ============================================================================
# AG-UI 协议处理器
# ============================================================================


class AGUIProtocolHandler(BaseProtocolHandler):
"""AG-UI 协议处理器
实现 AG-UI (Agent-User Interaction Protocol) 兼容接口
参考: https://docs.ag-ui.com/
特点:
- 基于事件的流式通信
- 完整支持所有生命周期事件
- 支持状态同步
- 支持工具调用
Example:
>>> from agentrun.server import AgentRunServer, AGUIProtocolHandler
>>>
>>> server = AgentRunServer(
... invoke_agent=my_agent,
... protocols=[AGUIProtocolHandler()]
... )
>>> server.start(port=8000)
# 可访问: POST http://localhost:8000/agui/v1/run
"""

def get_prefix(self) -> str:
"""AG-UI 协议建议使用 /agui/v1 前缀"""
return "/agui/v1"

def create_hooks(self, context: Dict[str, Any]) -> AgentLifecycleHooks:
"""创建 AG-UI 协议的生命周期钩子"""
return AGUILifecycleHooks(context)

def as_fastapi_router(self, agent_invoker: "AgentInvoker") -> APIRouter:
"""创建 AG-UI 协议的 FastAPI Router"""
router = APIRouter()

@router.post("/run")
async def run_agent(request: Request):
"""AG-UI 运行 Agent 端点
接收 AG-UI 格式的请求,返回 SSE 事件流。
"""
# SSE 响应头,禁用缓冲
sse_headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 nginx 缓冲
}

try:
# 1. 解析请求
request_data = await request.json()
agent_request, context = await self.parse_request(
request, request_data
)

# 2. 调用 Agent
agent_result = await agent_invoker.invoke(agent_request)

# 3. 格式化为 AG-UI 事件流
event_stream = self.format_response(
agent_result, agent_request, context
)

# 4. 返回 SSE 流
return StreamingResponse(
event_stream,
media_type="text/event-stream",
headers=sse_headers,
)

except ValueError as e:
# 返回错误事件流
return StreamingResponse(
self._error_stream(str(e)),
media_type="text/event-stream",
headers=sse_headers,
)
except Exception as e:
return StreamingResponse(
self._error_stream(f"Internal error: {str(e)}"),
media_type="text/event-stream",
headers=sse_headers,
)

@router.get("/health")
async def health_check():
"""健康检查端点"""
return {"status": "ok", "protocol": "ag-ui", "version": "1.0"}

return router

async def parse_request(
self,
request: Request,
request_data: Dict[str, Any],
) -> tuple[AgentRequest, Dict[str, Any]]:
"""解析 AG-UI 格式的请求
Args:
request: FastAPI Request 对象
request_data: HTTP 请求体 JSON 数据
Returns:
tuple: (AgentRequest, context)
Raises:
ValueError: 请求格式不正确
"""
# 创建上下文
context = {
"threadId": request_data.get("threadId") or str(uuid.uuid4()),
"runId": request_data.get("runId") or str(uuid.uuid4()),
}

# 创建钩子
hooks = self.create_hooks(context)

# 解析消息列表
messages = []
raw_messages = request_data.get("messages", [])

for msg_data in raw_messages:
if not isinstance(msg_data, dict):
continue

role_str = msg_data.get("role", "user")
try:
role = MessageRole(role_str)
except ValueError:
role = MessageRole.USER

messages.append(
Message(
role=role,
content=msg_data.get("content"),
name=msg_data.get("name"),
tool_calls=msg_data.get("toolCalls"),
tool_call_id=msg_data.get("toolCallId"),
)
)

# 提取原始请求头
raw_headers = dict(request.headers)

# 构建 AgentRequest
agent_request = AgentRequest(
messages=messages,
stream=True, # AG-UI 总是流式
tools=request_data.get("tools"),
raw_headers=raw_headers,
raw_body=request_data,
hooks=hooks,
)

# 保存额外参数
agent_request.extra = {
"threadId": context["threadId"],
"runId": context["runId"],
"context": request_data.get("context"),
"forwardedProps": request_data.get("forwardedProps"),
}

return agent_request, context

async def format_response(
self,
result: AgentResult,
request: AgentRequest,
context: Dict[str, Any],
) -> AsyncIterator[str]:
"""格式化响应为 AG-UI 事件流
Agent 可以 yield 三种类型的内容:
1. 普通字符串 - 会被包装成 TEXT_MESSAGE_CONTENT 事件
2. AgentEvent - 直接输出其 raw_sse
3. None - 忽略
Args:
result: Agent 执行结果
request: 原始请求
context: 运行上下文
Yields:
SSE 格式的事件数据
"""
hooks = request.hooks
message_id = str(uuid.uuid4())
text_message_started = False

# 1. 发送 RUN_STARTED 事件
if hooks:
event = hooks.on_run_start()
if event and event.raw_sse:
yield event.raw_sse

try:
# 2. 处理 Agent 结果
content = self._extract_content(result)

# 3. 流式发送内容
if self._is_iterator(content):
async for chunk in self._iterate_content(content):
if chunk is None:
continue

# 检查是否是 AgentEvent
if isinstance(chunk, AgentEvent):
if chunk.raw_sse:
yield chunk.raw_sse
elif isinstance(chunk, str) and chunk:
# 普通文本内容,包装成 TEXT_MESSAGE_CONTENT
if not text_message_started and hooks:
# 延迟发送 TEXT_MESSAGE_START,只在有文本内容时才发送
event = hooks.on_text_message_start(message_id)
if event and event.raw_sse:
yield event.raw_sse
text_message_started = True

if hooks:
event = hooks.on_text_message_content(
message_id, chunk
)
if event and event.raw_sse:
yield event.raw_sse
else:
# 非迭代器内容
if isinstance(content, AgentEvent):
if content.raw_sse:
yield content.raw_sse
elif content:
content_str = str(content)
if hooks:
event = hooks.on_text_message_start(message_id)
if event and event.raw_sse:
yield event.raw_sse
text_message_started = True
event = hooks.on_text_message_content(
message_id, content_str
)
if event and event.raw_sse:
yield event.raw_sse

# 4. 发送 TEXT_MESSAGE_END 事件(如果有文本消息)
if text_message_started and hooks:
event = hooks.on_text_message_end(message_id)
if event and event.raw_sse:
yield event.raw_sse

# 5. 发送 RUN_FINISHED 事件
if hooks:
event = hooks.on_run_finish()
if event and event.raw_sse:
yield event.raw_sse

except Exception as e:
# 发送错误事件
if hooks:
event = hooks.on_run_error(str(e), "AGENT_ERROR")
if event and event.raw_sse:
yield event.raw_sse

async def _error_stream(self, message: str) -> AsyncIterator[str]:
"""生成错误事件流"""
context = {
"threadId": str(uuid.uuid4()),
"runId": str(uuid.uuid4()),
}
hooks = self.create_hooks(context)

event = hooks.on_run_start()
if event and event.raw_sse:
yield event.raw_sse
event = hooks.on_run_error(message, "REQUEST_ERROR")
if event and event.raw_sse:
yield event.raw_sse

def _extract_content(self, result: AgentResult) -> Any:
"""从结果中提取内容"""
if isinstance(result, AgentRunResult):
return result.content
if isinstance(result, AgentResponse):
return result.content
if isinstance(result, str):
return result
return result

async def _iterate_content(
self, content: Union[Iterator, AsyncIterator]
) -> AsyncIterator:
"""统一迭代同步和异步迭代器
支持迭代包含字符串或 AgentEvent 的迭代器。
对于同步迭代器,每次 next() 调用都在线程池中执行,避免阻塞事件循环。
"""
import asyncio

if hasattr(content, "__aiter__"):
# 异步迭代器
async for chunk in content: # type: ignore
yield chunk
else:
# 同步迭代器 - 在线程池中迭代,避免阻塞
loop = asyncio.get_event_loop()
iterator = iter(content) # type: ignore

while True:
try:
# 在线程池中执行 next(),避免 time.sleep 阻塞事件循环
chunk = await loop.run_in_executor(None, next, iterator)
yield chunk
except StopIteration:
break


# ============================================================================
# 辅助函数 - 用于用户自定义 AG-UI 事件
# ============================================================================


def create_agui_event(event_type: AGUIEventType, **kwargs) -> AGUIBaseEvent:
"""创建 AG-UI 事件的辅助函数
Args:
event_type: 事件类型
**kwargs: 事件参数
Returns:
对应类型的事件对象
Example:
>>> event = create_agui_event(
... AGUIEventType.TEXT_MESSAGE_CONTENT,
... messageId="msg-123",
... delta="Hello"
... )
"""
event_classes = {
AGUIEventType.RUN_STARTED: AGUIRunStartedEvent,
AGUIEventType.RUN_FINISHED: AGUIRunFinishedEvent,
AGUIEventType.RUN_ERROR: AGUIRunErrorEvent,
AGUIEventType.STEP_STARTED: AGUIStepStartedEvent,
AGUIEventType.STEP_FINISHED: AGUIStepFinishedEvent,
AGUIEventType.TEXT_MESSAGE_START: AGUITextMessageStartEvent,
AGUIEventType.TEXT_MESSAGE_CONTENT: AGUITextMessageContentEvent,
AGUIEventType.TEXT_MESSAGE_END: AGUITextMessageEndEvent,
AGUIEventType.TOOL_CALL_START: AGUIToolCallStartEvent,
AGUIEventType.TOOL_CALL_ARGS: AGUIToolCallArgsEvent,
AGUIEventType.TOOL_CALL_END: AGUIToolCallEndEvent,
AGUIEventType.TOOL_CALL_RESULT: AGUIToolCallResultEvent,
AGUIEventType.STATE_SNAPSHOT: AGUIStateSnapshotEvent,
AGUIEventType.STATE_DELTA: AGUIStateDeltaEvent,
AGUIEventType.MESSAGES_SNAPSHOT: AGUIMessagesSnapshotEvent,
AGUIEventType.RAW: AGUIRawEvent,
AGUIEventType.CUSTOM: AGUICustomEvent,
}

event_class = event_classes.get(event_type, AGUIBaseEvent)
return event_class(type=event_type, **kwargs)
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new AG-UI protocol handler (AGUIProtocolHandler) and lifecycle hooks system lack test coverage. This is a substantial new feature that should have comprehensive unit tests covering:

  • Request parsing and validation
  • Event generation for all hook methods
  • Response formatting
  • Error handling
  • SSE stream generation

Consider adding tests in tests/unittests/server/ directory similar to the existing test structure for other modules.

Copilot uses AI. Check for mistakes.
Comment on lines 20 to 28
... # 发送步骤开始事件 (使用 emit_* 同步方法)
... yield hooks.emit_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.emit_step_finish("processing")
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation example references non-existent methods emit_step_start() and emit_step_finish(). Based on the actual implementation in agentrun/server/model.py, the lifecycle hooks only provide on_* methods that return Optional[AgentEvent].

The correct usage pattern should be:

def invoke_agent(request: AgentRequest):
    hooks = request.hooks
    # Direct yield of event (may be None)
    yield hooks.on_step_start("processing")
    yield "Hello, "
    yield "world!"
    yield hooks.on_step_finish("processing")
Suggested change
... # 发送步骤开始事件 (使用 emit_* 同步方法)
... yield hooks.emit_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.emit_step_finish("processing")
... # 发送步骤开始事件 (使用 on_* 同步方法)
... yield hooks.on_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.on_step_finish("processing")

Copilot uses AI. Check for mistakes.
- context: 协议特定的上下文信息
"""
# 提取原始请求头
raw_headers = dict(request.headers)
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable raw_headers is not used.

Suggested change
raw_headers = dict(request.headers)

Copilot uses AI. Check for mistakes.
"""

import asyncio
from typing import Any, AsyncIterator, Callable, Dict, Optional
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Optional' is not used.
Import of 'Any' is not used.

Suggested change
from typing import Any, AsyncIterator, Callable, Dict, Optional
from typing import AsyncIterator, Callable, Dict

Copilot uses AI. Check for mistakes.
"""

import time
from typing import Any, Callable, Dict, Iterator, List, Optional
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'List' is not used.
Import of 'Optional' is not used.
Import of 'Any' is not used.

Suggested change
from typing import Any, Callable, Dict, Iterator, List, Optional
from typing import Callable, Dict, Iterator

Copilot uses AI. Check for mistakes.
当前时间的字符串表示
"""
# 模拟异步 I/O 操作
import asyncio
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import of module asyncio is redundant, as it was previously imported on line 26.

Suggested change
import asyncio

Copilot uses AI. Check for mistakes.
…ator handlers and lifecycle hooks

This commit introduces comprehensive AGUI protocol support with lifecycle hooks for both LangChain and LangGraph integrations. The changes include:

1. Added convert function exports to both LangChain and LangGraph integration modules with usage examples
2. Refactored model adapter to use default_headers instead of async_client for ChatOpenAI
3. Enhanced server invoker to handle both coroutine and async generator functions
4. Updated protocol handlers to support async response formatting and safe iterator handling
5. Refined AgentRequest model to be protocol-agnostic with core fields
6. Added CORS middleware configuration to AgentRunServer
7. Updated quick start example to use async streaming with event conversion

The changes enable proper streaming support, tool call event handling, and improved async performance for both LangChain and LangGraph integrations.

feat(langchain|langgraph): 添加 AGUI 协议支持和异步生成器处理器以及生命周期钩子

此提交为 LangChain 和 LangGraph 集成引入了全面的 AGUI 协议支持和生命周期钩子。变更包括:

1. 为 LangChain 和 LangGraph 集成模块添加了 convert 函数导出以及使用示例
2. 重构模型适配器,使用 default_headers 替代 async_client 用于 ChatOpenAI
3. 增强服务器调用器以处理协程和异步生成器函数
4. 更新协议处理器以支持异步响应格式化和安全的迭代器处理
5. 精简 AgentRequest 模型以实现协议无关性并使用核心字段
6. 为 AgentRunServer 添加 CORS 中间件配置
7. 更新快速开始示例以使用带事件转换的异步流

这些变更启用了适当的流式传输支持、工具调用事件处理以及改进的异步性能。

Change-Id: I941d1d797b930243282555b5a6db0e6d420f3691
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…mented out obsolete tool start handler

    - Remove unnecessary line breaks and reformat function calls for better readability
    - Comment out on_tool_start event handler that was causing duplicate tool call events
    - Adjust code indentation and formatting in _get_tool_chunks and _get_tool_calls functions

  修复代码格式并注释掉过时的工具开始处理器

    - 移除不必要的换行并重新格式化函数调用以提高可读性
    - 注释掉导致工具调用事件重复的 on_tool_start 事件处理器
    - 调整 _get_tool_chunks 和 _get_tool_calls 函数中的代码缩进和格式

Change-Id: Id90475c30ecc0134dae7ef2d1b97ef20986018a1
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…esult events

This change replaces the previous hook-based system with a standardized event-driven architecture using AgentResult. The new system provides consistent event handling across all protocols (AG-UI and OpenAI) and simplifies the API for end users.

The key changes include:
- Removing AgentLifecycleHooks and all on_* hook methods
- Introducing EventType enum with comprehensive event types
- Using AgentResult as the standard return type for all events
- Updating protocol handlers to transform AgentResult into protocol-specific formats
- Simplifying the server model and request/response handling

Additionally, the LangChain/LangGraph integration now uses the new to_agui_events function which supports multiple streaming formats including astream_events, stream, and astream with different modes.

Existing convert functions are preserved for backward compatibility but aliased to to_agui_events.

This change provides a more consistent and extensible foundation for agent event handling.

refactor(server): 从生命周期钩子迁移到标准化的 AgentResult 事件

此更改将以前基于钩子的系统替换为使用 AgentResult 的标准化事件驱动架构。新系统为所有协议(AG-UI 和 OpenAI)提供一致的事件处理,并简化了最终用户的 API。

主要变更包括:
- 移除 AgentLifecycleHooks 和所有 on_* 钩子方法
- 引入包含全面事件类型的 EventType 枚举
- 使用 AgentResult 作为所有事件的标准返回类型
- 更新协议处理器以将 AgentResult 转换为协议特定格式
- 简化服务器模型和请求/响应处理

此外,LangChain/LangGraph 集成现在使用新的 to_agui_events 函数,该函数支持多种流式传输格式,包括 astream_events、stream 和不同模式的 astream。

为了向后兼容保留了现有的 convert 函数,但别名为 to_agui_events。

此更改提供了更一致和可扩展的代理事件处理基础。

Change-Id: Ie9f3aad829e03a7f8437cc605317876edee4ae49
Signed-off-by: OhYee <oyohyee@oyohyee.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants