Efficient lightweight Python Agent Workflow Engine
高效轻量的 Python 智能体工作流引擎
Support Sync/Async Nodes, Branching Loops, Visual Flowcharts | Build Agent Task Flows Fast
支持同步/异步节点、分支循环、可视化流程图 | 快速搭建 Agent 任务流
💻 Code → 📊 Log → 🎨 Flowchart - Complete development agent workflow visualization in minutes
💻 代码 → 📊 日志 → 🎨 流程图 - 快速开发复杂的智能体工作流可视化
from typing import TypedDict
from agnflow import Node, Flow
# Define state
class State(TypedDict):
message: str
# Define nodes
start = Node(
name="start",
exec=lambda s: ("process", {"message": "Hello"}) # Instantiate Node, specify exec
)
class ProcessNode(Node): # Inherit Node, override exec
def exec(state: State):
# Route to end node and update message state
return "end", {"message": s["message"] + " World"}
process = ProcessNode() # No name specified, automatically gets variable name "process"
end = Node(exec=print) # Print state
# Create workflow
flow = Flow(start >> process >> end)
# Run workflow
state: State = {"message": ""}
flow.run(state) # Output: {'message': 'Hello World'}🤖 Static Web Chat Room Experience - Visit Backend Interface http://127.0.0.1:8000/en
from agnflow.chatbot.server import Server
server = Server()
server.run()🧠 Structured reasoning with step-by-step thinking process | 💻 Intelligent code generation with detailed explanations
flow.render_mermaid(saved_file="flow.png") # Directly generate image| Complex Connection | Supervisor Agent | Runtime Management |
|---|---|---|
n1 >> [n2 >> n3, n3 >> n4] >> n5 |
s1[n1, n2, n3] >> n4 |
flow += new_nodeflow -= old_node |
![]() |
![]() |
![]() |
# Add/Remove nodes at runtime
flow += new_node
flow -= old_node
# Symmetric connect/disconnect syntax
a >> b >> c # Connect nodes
a - b - c # Symmetrically disconnect- Sync/Async Mixed:
n = Node(exec=sync_func, aexec=async_func) - Branch/Loop:
n1 >> [n2, n3] >> n1(n1 points to n2 and n3, n2 and n3 point to n1) - Swarm Agent:
s = Swarm(); s[n1, n2, n3] >> n4(n1, n2, n3 are fully connected inside s) - Parallel Flow:
pf = ParallelFlow(); pf[n1, n2, n3](Execute child nodes concurrently) - Human Review: CLI/API intervention with
hitl
pip install agnflowfrom typing import TypedDict
from agnflow import Node, Flow
# 定义状态
class State(TypedDict):
message: str
# 定义节点
start = Node(
name="start",
exec=lambda s: ("process", {"message": "Hello"}) # 实例化 Node,指定 exec
)
class ProcessNode(Node): # 继承 Node,重写 exec
def exec(state: State):
# 路由到 end 节点,并且更新 message 状态,
return "end", {"message": s["message"] + " World"}
process = ProcessNode() # 没有指定 name 属性,自动获取实例化变量名 "process"
end = Node(exec=print) # 打印 state
# 创建工作流
flow = Flow(start >> process >> end)
# 运行工作流
state: State = {"message": ""}
flow.run(state) # 输出: {'message': 'Hello World'}2. 🤖 静态 Web 聊天室体验 - 访问后端接口 http://127.0.0.1:8000/zh
from agnflow.chatbot.server import Server
server = Server()
server.run()flow.render_mermaid(saved_file="flow.png") # 直接生成图片| 复杂连接 | 监督者智能体 | 运行期管理 |
|---|---|---|
n1 >> [n2 >> n3, n3 >> n4] >> n5 |
s1[n1, n2, n3] >> n4 |
flow += new_nodeflow -= old_node |
![]() |
![]() |
![]() |
# 运行期增删节点
flow += new_node
flow -= old_node
# 对称的连接/断开语法
a >> b >> c # 建立连接
a - b - c # 对称断开- 同步/异步混合:
n = Node(exec=sync_func, aexec=async_func) - 分支/循环:
n1 >> [n2, n3] >> n1(n1 指向 n2 和 n3,n2 和 n3 指向 n1) - 蜂群智能体:
s = Swarm(); s[n1, n2, n3] >> n4(s 内部的 n1,n2,n3 全互连) - 并行工作流:
pf = ParallelFlow(); pf[n1, n2, n3](并发执行多个子节点) - 人工审核: CLI/API 介入
hitl
pip install agnflow访问我们的文档获取:
Visit our documentation for:
- 详细教程和示例 Detailed tutorials and examples
- API 参考 API reference
- 最佳实践 Best practices
- 高级用法 Advanced usage
我们欢迎贡献!请随时提交 Pull Request。
We welcome contributions! Please feel free to submit a Pull Request.
本项目基于 MIT 许可证 - 查看 LICENSE 文件了解详情。
This project is licensed under the MIT License - see the LICENSE file for details.
如果这个项目对你有帮助,请给它一个 ⭐️ Star!
Your support is my motivation to keep improving 💪
你的支持是我持续改进的动力 💪











