A small, modular agent framework for building LLM-powered applications in Python.
Inspired by smolagents and Pi — borrowing the minimal-abstraction philosophy from the former and the conversational agent loop from the latter.
Beta — TinyAgent is usable but not production-ready. APIs may change between minor versions.
TinyAgent provides a lightweight foundation for creating conversational AI agents with tool use capabilities. It features:
- Streaming-first architecture: All LLM interactions support streaming responses
- Tool execution: Define and execute tools with structured outputs
- Event-driven: Subscribe to agent events for real-time UI updates
- Provider agnostic: Works with any OpenAI-compatible /chat/completions endpoint (OpenRouter, OpenAI, Chutes, local servers)
- Prompt caching: Reduce token costs and latency with Anthropic-style cache breakpoints
- Dual provider paths: Pure-Python or optional Rust binding via PyO3 for native-speed streaming
- Type-safe: Full type hints throughout
import asyncio
from tinyagent import Agent, AgentOptions, OpenRouterModel, stream_openrouter

# Create an agent
agent = Agent(
    AgentOptions(
        stream_fn=stream_openrouter,
        session_id="my-session",
    )
)

# Configure
agent.set_system_prompt("You are a helpful assistant.")
agent.set_model(OpenRouterModel(id="anthropic/claude-3.5-sonnet"))

# Optional: use any OpenAI-compatible /chat/completions endpoint
# agent.set_model(OpenRouterModel(id="gpt-4o-mini", base_url="https://api.openai.com/v1/chat/completions"))

# Simple prompt
async def main():
    response = await agent.prompt_text("What is the capital of France?")
    print(response)

asyncio.run(main())

Install from PyPI:

pip install tiny-agent-os

The Agent class is the main entry point. It manages:
- Conversation state (messages, tools, system prompt)
- Streaming responses
- Tool execution
- Event subscription
Messages follow a typed dictionary structure:
- UserMessage: Input from the user
- AssistantMessage: Response from the LLM
- ToolResultMessage: Result from tool execution
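As a rough sketch, each message is a plain dict with a role and a list of content blocks. The exact TypedDict fields live in agent_types.py and may differ slightly; this example mirrors the message dicts shown later in this README.

# Hypothetical example of the user message shape.
user_message = {
    "role": "user",
    "content": [{"type": "text", "text": "What is the capital of France?"}],
}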
Tools are functions the LLM can call:
from tinyagent import AgentTool, AgentToolResult

async def calculate_sum(tool_call_id: str, args: dict, signal, on_update) -> AgentToolResult:
    result = args["a"] + args["b"]
    return AgentToolResult(
        content=[{"type": "text", "text": str(result)}]
    )

tool = AgentTool(
    name="sum",
    description="Add two numbers",
    parameters={
        "type": "object",
        "properties": {
            "a": {"type": "number"},
            "b": {"type": "number"},
        },
        "required": ["a", "b"],
    },
    execute=calculate_sum,
)
agent.set_tools([tool])

The agent emits events during execution:
- AgentStartEvent/AgentEndEvent: Agent run lifecycle
- TurnStartEvent/TurnEndEvent: Single turn lifecycle
- MessageStartEvent/MessageUpdateEvent/MessageEndEvent: Message streaming
- ToolExecutionStartEvent/ToolExecutionUpdateEvent/ToolExecutionEndEvent: Tool execution
Subscribe to events:
def on_event(event):
    print(f"Event: {event.type}")

unsubscribe = agent.subscribe(on_event)

TinyAgent supports Anthropic-style prompt caching to reduce costs on multi-turn conversations. Enable it when creating the agent:
agent = Agent(
    AgentOptions(
        stream_fn=stream_openrouter,
        session_id="my-session",
        enable_prompt_caching=True,
    )
)

Cache breakpoints are automatically placed on user message content blocks so the prompt prefix stays cached across turns. See Prompt Caching for details.
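Under the hood, an Anthropic-style breakpoint is a cache_control marker attached to a content block in the request. The block below is only a rough illustration of that wire format; TinyAgent adds the marker for you, and the exact placement is an internal detail of caching.py.

# Illustrative only: roughly what a cached content block looks like in an
# Anthropic-style request body. TinyAgent inserts this automatically when
# enable_prompt_caching=True.
cached_block = {
    "type": "text",
    "text": "Earlier conversation content...",
    "cache_control": {"type": "ephemeral"},
}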
TinyAgent ships with an optional Rust-based LLM provider located in
bindings/alchemy_llm_py/. It wraps the alchemy-llm
Rust crate and exposes it to Python via PyO3, giving you native-speed
OpenAI-compatible streaming without leaving the Python process.
The pure-Python providers (openrouter_provider.py, proxy.py) work fine, but the Rust
binding gives you:
- Lower per-token overhead -- SSE parsing, JSON deserialization, and event dispatch all happen in compiled Rust with a multi-threaded Tokio runtime.
- Unified provider abstraction -- alchemy-llm normalizes differences across providers (OpenRouter, Anthropic, custom endpoints) behind a single streaming interface.
- Full event fidelity -- text deltas, thinking deltas, tool call deltas, and terminal events are all surfaced as typed Python dicts.
Python (async)                  Rust (Tokio)
─────────────────               ─────────────────────────
stream_alchemy_*()    ──>       alchemy_llm::stream()
                                  │
AlchemyStreamResponse             ├─ SSE parse + deserialize
  .__anext__()        <──         ├─ event_to_py_value()
  (asyncio.to_thread)             └─ mpsc channel -> Python
- Python calls openai_completions_stream(model, context, options), which is a #[pyfunction].
- The Rust side builds an alchemy-llm request, opens an SSE stream on a shared Tokio runtime, and sends events through an mpsc channel.
- Python reads events by calling the blocking next_event() method via asyncio.to_thread, making it async-compatible without busy-waiting.
- A terminal done or error event signals the end of the stream. The final AssistantMessage dict is available via result(). A sketch of this consumption loop follows below.
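The sketch below pulls these steps together by consuming the Rust stream directly. It is illustrative only: in practice stream_alchemy_openai_completions wraps this loop for you, and the consume helper name is made up for the example.

import asyncio
from alchemy_llm_py import openai_completions_stream

async def consume(model: dict, context: dict, options: dict) -> dict:
    # Opens the SSE stream on the shared Tokio runtime and returns an
    # OpenAICompletionsStream handle.
    stream = openai_completions_stream(model, context, options)
    while True:
        # next_event() blocks, so run it in a worker thread to stay async-friendly.
        event = await asyncio.to_thread(stream.next_event)
        if event is None:
            break  # stream exhausted after the terminal done/error event
        print(event)  # text, thinking, and tool-call deltas as plain dicts
    # Final AssistantMessage dict, available once the stream has terminated.
    return await asyncio.to_thread(stream.result)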
Requires a Rust toolchain (1.70+) and maturin.
pip install maturin
cd bindings/alchemy_llm_py
maturin develop # debug build, installs into current venv
maturin develop --release   # optimized build

Two functions are exposed from the alchemy_llm_py module:
| Function | Description |
|---|---|
| collect_openai_completions(model, context, options?) | Blocking. Consumes the entire stream and returns {"events": [...], "final_message": {...}}. Useful for one-shot calls. |
| openai_completions_stream(model, context, options?) | Returns an OpenAICompletionsStream handle for incremental consumption. |
The OpenAICompletionsStream handle has two methods:
| Method | Description |
|---|---|
| next_event() | Blocking. Returns the next event dict, or None when the stream ends. |
| result() | Blocking. Returns the final assistant message dict. |
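For a one-shot call, collect_openai_completions is simpler: it drains the stream and hands back everything at once. A minimal sketch, using the model, context, and options dicts described next:

from alchemy_llm_py import collect_openai_completions

# Blocking: consumes the entire stream before returning.
result = collect_openai_completions(model, context, options)
events = result["events"]                # every streamed event, in order
final_message = result["final_message"]  # assembled assistant message dict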
All three arguments are plain Python dicts:
model = {
    "id": "anthropic/claude-3.5-sonnet",
    "base_url": "https://openrouter.ai/api/v1/chat/completions",
    "provider": "openrouter",        # optional
    "headers": {"X-Custom": "val"},  # optional
    "reasoning": False,              # optional
    "context_window": 128000,        # optional
    "max_tokens": 4096,              # optional
}

context = {
    "system_prompt": "You are helpful.",
    "messages": [
        {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
    ],
    "tools": [  # optional
        {"name": "sum", "description": "Add numbers", "parameters": {...}}
    ],
}

options = {
    "api_key": "sk-...",   # optional
    "temperature": 0.7,    # optional
    "max_tokens": 1024,    # optional
}

You don't need to call the Rust binding directly. Use the alchemy_provider module:
from tinyagent import Agent, AgentOptions
from tinyagent.alchemy_provider import OpenAICompatModel, stream_alchemy_openai_completions

agent = Agent(
    AgentOptions(
        stream_fn=stream_alchemy_openai_completions,
        session_id="my-session",
    )
)

agent.set_model(
    OpenAICompatModel(
        id="anthropic/claude-3.5-sonnet",
        base_url="https://openrouter.ai/api/v1/chat/completions",
    )
)

Current limitations:

- Only OpenAI-compatible /chat/completions streaming is supported.
- Image blocks are not yet supported (text and thinking blocks work).
- next_event() is blocking and runs in a thread via asyncio.to_thread -- this adds slight overhead compared to a native async generator, but keeps the GIL released during the Rust work.
- Architecture: System design and component interactions
- API Reference: Detailed module documentation
- Prompt Caching: Cache breakpoints, cost savings, and provider requirements
- OpenAI-Compatible Endpoints: Using OpenRouterModel.base_url with OpenRouter, OpenAI, Chutes, and local compatible backends
- Usage Semantics: Unified message["usage"] schema across Python and Rust provider paths
- Changelog: Release history
tinyagent/
├── agent.py                   # Agent class
├── agent_loop.py              # Core agent execution loop
├── agent_tool_execution.py    # Tool execution helpers
├── agent_types.py             # Type definitions
├── caching.py                 # Prompt caching utilities
├── openrouter_provider.py     # OpenRouter integration
├── alchemy_provider.py        # Rust-based provider (PyO3)
├── proxy.py                   # Proxy server integration
└── proxy_event_handlers.py    # Proxy event parsing
