Merged
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]

### Added
- `LoopbackChannel` / `LoopbackHandle` / `LoopbackEvent` in zeph-core — headless channel for daemon mode, pairs with a handle that exposes `input_tx` / `output_rx` for programmatic agent I/O
- `ProcessorEvent` enum in zeph-a2a server — streaming event type replacing synchronous `ProcessResult`; `TaskProcessor::process` now accepts an `mpsc::Sender<ProcessorEvent>` and returns `Result<(), A2aError>`
- `--daemon` CLI flag (feature `daemon+a2a`) — bootstraps a full agent + A2A JSON-RPC server under `DaemonSupervisor` with PID file lifecycle and Ctrl-C graceful shutdown
- `--connect <URL>` CLI flag (feature `tui+a2a`) — connects the TUI to a remote daemon via A2A SSE, mapping `TaskEvent` to `AgentEvent` in real time
- Command palette daemon commands: `daemon:connect`, `daemon:disconnect`, `daemon:status`
- Command palette action commands: `app:quit` (shortcut `q`), `app:help` (shortcut `?`), `session:new`, `app:theme`
- Fuzzy-matching for command palette — character-level gap-penalty scoring replaces substring filter; `daemon_command_registry()` merged into `filter_commands`
- `TuiCommand::ToggleTheme` variant in command palette (placeholder — theme switching not yet implemented)
- `--init` wizard daemon step — prompts for A2A server host, port, and auth token; writes `config.a2a.*`
- `Embeddable` trait and `EmbeddingRegistry<T>` in zeph-memory — generic Qdrant sync/search extracted from duplicated code in QdrantSkillMatcher and McpToolRegistry (~350 lines removed)
- MCP server command allowlist validation — only permitted commands (npx, uvx, node, python3, python, docker, deno, bun) can spawn child processes; configurable via `mcp.allowed_commands`
- MCP env var blocklist — blocks 21 dangerous variables (LD_PRELOAD, DYLD_*, NODE_OPTIONS, PYTHONPATH, JAVA_TOOL_OPTIONS, etc.) and BASH_FUNC_* prefix from MCP server processes
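The command allowlist above can be sketched as a basename membership check. This is an illustrative stand-in, not the zeph-mcp implementation; `is_command_allowed` and `DEFAULT_ALLOWED` are assumed names, and only the list of permitted commands is taken from the changelog entry:

```rust
use std::path::Path;

/// Commands permitted to spawn MCP server child processes,
/// mirroring the changelog entry above.
const DEFAULT_ALLOWED: &[&str] = &[
    "npx", "uvx", "node", "python3", "python", "docker", "deno", "bun",
];

/// Returns true when the command's basename is on the allowlist.
/// Note this only illustrates the membership check; a real guard
/// would also canonicalize the path before comparing.
fn is_command_allowed(cmd: &str, allowed: &[&str]) -> bool {
    Path::new(cmd)
        .file_name()
        .and_then(|n| n.to_str())
        .is_some_and(|name| allowed.contains(&name))
}

fn main() {
    assert!(is_command_allowed("npx", DEFAULT_ALLOWED));
    assert!(is_command_allowed("/usr/bin/python3", DEFAULT_ALLOWED));
    assert!(!is_command_allowed("bash", DEFAULT_ALLOWED));
}
```

The actual list is configurable via `mcp.allowed_commands`, so the constant here stands in for whatever the config resolves to.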
@@ -51,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

### Fixed
- False positive: "sudoku" no longer matched by "sudo" blocked pattern (word-boundary matching)
- PID file creation uses `OpenOptions::create_new(true)` (O_CREAT|O_EXCL) to prevent TOCTOU symlink attacks
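The TOCTOU fix above relies on `create_new(true)`, which maps to `O_CREAT | O_EXCL` and fails if the path already exists, including as an attacker-planted symlink. A minimal sketch of the pattern (the `write_pid_file` helper is an assumption, not the daemon's actual code):

```rust
use std::fs::OpenOptions;
use std::io::Write;

/// Create the PID file atomically: with create_new (O_CREAT|O_EXCL)
/// the open fails when the path already exists, so a pre-planted
/// symlink cannot redirect the write elsewhere.
fn write_pid_file(path: &str, pid: u32) -> std::io::Result<()> {
    let mut f = OpenOptions::new()
        .write(true)
        .create_new(true) // refuse any pre-existing path
        .open(path)?;
    writeln!(f, "{pid}")
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("zeph-demo.pid");
    let path = path.to_str().unwrap().to_string();
    let _ = std::fs::remove_file(&path);
    write_pid_file(&path, std::process::id())?;
    // A second attempt must fail: the file now exists.
    assert!(write_pid_file(&path, std::process::id()).is_err());
    std::fs::remove_file(&path)
}
```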

## [0.11.2] - 2026-02-19

1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -136,6 +136,7 @@ stt = ["zeph-llm/stt", "dep:reqwest"]
anyhow.workspace = true
clap.workspace = true
dialoguer.workspace = true
futures.workspace = true
toml.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util.workspace = true
22 changes: 21 additions & 1 deletion README.md
@@ -87,6 +87,8 @@ zeph Run the agent (default)
zeph init Interactive configuration wizard
zeph init -o path.toml Write generated config to a specific path
zeph --tui Run with TUI dashboard
zeph --daemon Run as headless background daemon with A2A endpoint
zeph --connect <url> Connect TUI to a running daemon via SSE
zeph --config <path> Use a custom config file
zeph --vault <backend> Secrets backend: env or age
zeph --vault-key <path> Path to age identity key file
@@ -288,6 +290,24 @@ cargo build --release --features tui
./target/release/zeph --tui
```

### Daemon Mode

Run Zeph as a headless background agent. The daemon exposes an A2A endpoint for programmatic interaction and accepts remote TUI connections via SSE streaming.

```bash
# Start the daemon (requires daemon + a2a features)
cargo run --release --features daemon,a2a -- --daemon

# Connect a TUI to the running daemon (requires tui + a2a features)
cargo run --release --features tui,a2a -- --connect http://localhost:3000
```

The daemon uses a `LoopbackChannel` for headless agent I/O, allowing programmatic message exchange without a terminal or chat adapter.
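The loopback pattern can be sketched with a pair of channels: one side owns the agent-facing ends, the handle owns the opposite ends. The field names mirror the zeph-core API (`input_tx` / `output_rx`), but the types below are illustrative std stand-ins, not the actual async implementation:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Agent-facing side: the agent reads user input, writes its output.
struct LoopbackChannel {
    input_rx: Receiver<String>,
    output_tx: Sender<String>,
}

/// Caller-facing handle: push input in, pull agent output out.
struct LoopbackHandle {
    input_tx: Sender<String>,
    output_rx: Receiver<String>,
}

/// Wire two channels into a channel/handle pair.
fn loopback() -> (LoopbackChannel, LoopbackHandle) {
    let (input_tx, input_rx) = channel();
    let (output_tx, output_rx) = channel();
    (
        LoopbackChannel { input_rx, output_tx },
        LoopbackHandle { input_tx, output_rx },
    )
}

fn main() {
    let (chan, handle) = loopback();
    handle.input_tx.send("ping".to_string()).unwrap();
    // A trivial "agent": echo each input back on the output side.
    let msg = chan.input_rx.recv().unwrap();
    chan.output_tx.send(format!("echo: {msg}")).unwrap();
    assert_eq!(handle.output_rx.recv().unwrap(), "echo: ping");
}
```

The real daemon wires `LoopbackChannel` into the agent loop and hands `LoopbackHandle` to the A2A task processor, so messages flow without a terminal or chat adapter.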

### Command Palette

Press `Ctrl+P` in the TUI to open the command palette. It offers 14+ commands with fuzzy matching and keybinding hints for quick access to agent actions: file picker, skill stats, metrics toggle, theme switching, and more.
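The fuzzy matching mentioned above can be approximated by a subsequence scan that rewards consecutive matches and penalizes gaps. This is an illustrative scorer under those assumptions, not the palette's actual `filter_commands` algorithm:

```rust
/// Score `query` as a case-insensitive subsequence of `candidate`.
/// Each matched character earns a base score, adjacent matches earn
/// a bonus, and skipped candidate characters between matches cost a
/// gap penalty. Returns None when `query` is not a subsequence.
fn fuzzy_score(query: &str, candidate: &str) -> Option<i32> {
    let cand: Vec<char> = candidate.to_lowercase().chars().collect();
    let mut score = 0;
    let mut pos = 0usize;
    let mut prev_match: Option<usize> = None;
    for qc in query.to_lowercase().chars() {
        // Advance to the next occurrence of qc, or bail out.
        let found = cand[pos..].iter().position(|&c| c == qc)? + pos;
        score += 10; // base score per matched character
        match prev_match {
            Some(p) if found == p + 1 => score += 5, // adjacency bonus
            Some(p) => score -= (found - p - 1) as i32, // gap penalty
            None => {}
        }
        prev_match = Some(found);
        pos = found + 1;
    }
    Some(score)
}

fn main() {
    assert!(fuzzy_score("status", "daemon:status").is_some());
    assert!(fuzzy_score("xyz", "daemon:status").is_none());
    // Consecutive matches outrank the same letters spread across gaps.
    let tight = fuzzy_score("stat", "status").unwrap();
    let loose = fuzzy_score("stat", "s-t-a-t").unwrap();
    assert!(tight > loose);
}
```

Sorting candidates by descending score then yields the ranked palette list; non-matches (`None`) are filtered out entirely, which is what replaces the old substring filter.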

[TUI guide →](https://bug-ops.github.io/zeph/guide/tui.html)

### TUI Testing
@@ -381,7 +401,7 @@ Always compiled in: `openai`, `compatible`, `orchestrator`, `router`, `self-learn
| `a2a` | Agent-to-agent protocol |
| `index` | AST-based code indexing |
| `gateway` | HTTP webhook ingestion |
| `daemon` | Component supervisor |
| `daemon` | Headless background agent with A2A endpoint and remote TUI support |
| `pdf` | PDF document loading for RAG |
| `stt` | Speech-to-text via OpenAI Whisper API |
| `scheduler` | Cron-based periodic tasks; auto-update check runs daily at 09:00 |
3 changes: 2 additions & 1 deletion crates/zeph-a2a/src/lib.rs
@@ -13,6 +13,7 @@ pub use card::AgentCardBuilder;
pub use client::{A2aClient, TaskEvent, TaskEventStream};
pub use discovery::AgentRegistry;
pub use error::A2aError;
pub use jsonrpc::SendMessageParams;
#[cfg(feature = "server")]
pub use server::{A2aServer, ProcessResult, TaskManager, TaskProcessor};
pub use server::{A2aServer, ProcessorEvent, TaskManager, TaskProcessor};
pub use types::*;
243 changes: 186 additions & 57 deletions crates/zeph-a2a/src/server/handlers.rs
@@ -15,7 +15,7 @@ use crate::jsonrpc::{
};
use crate::types::{TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent};

use super::state::{AppState, CancelError, now_rfc3339};
use super::state::{AppState, CancelError, ProcessorEvent, now_rfc3339};

const ERR_METHOD_NOT_FOUND: i32 = -32601;
const ERR_INVALID_PARAMS: i32 = -32602;
@@ -104,33 +104,51 @@ async fn handle_send_message(
.update_status(&task.id, TaskState::Working, None)
.await;

match state
let (event_tx, mut event_rx) = mpsc::channel::<ProcessorEvent>(32);
let proc_future = state
.processor
.process(task.id.clone(), params.message)
.await
{
Ok(result) => {
state
.task_manager
.append_history(&task.id, result.response)
.await;
for artifact in result.artifacts {
state.task_manager.add_artifact(&task.id, artifact).await;
.process(task.id.clone(), params.message, event_tx);

let proc_handle = tokio::spawn(proc_future);

let mut accumulated = String::new();
while let Some(event) = event_rx.recv().await {
match event {
ProcessorEvent::ArtifactChunk { text, .. } => {
accumulated.push_str(&text);
}
state
.task_manager
.update_status(&task.id, TaskState::Completed, None)
.await;
ProcessorEvent::StatusUpdate { .. } => {}
}
Err(e) => {
}

let final_state = match proc_handle.await {
Ok(Ok(())) => TaskState::Completed,
Ok(Err(e)) => {
tracing::error!(task_id = %task.id, "task processing failed: {e}");
state
.task_manager
.update_status(&task.id, TaskState::Failed, None)
.await;
TaskState::Failed
}
Err(e) => {
tracing::error!(task_id = %task.id, "task processor panicked: {e}");
TaskState::Failed
}
};

if final_state == TaskState::Completed && !accumulated.is_empty() {
use crate::types::{Artifact, Part};
let artifact = Artifact {
artifact_id: format!("{}-artifact", task.id),
name: None,
parts: vec![Part::text(accumulated)],
metadata: None,
};
state.task_manager.add_artifact(&task.id, artifact).await;
}

state
.task_manager
.update_status(&task.id, final_state, None)
.await;

match state.task_manager.get_task(&task.id, None).await {
Some(t) => success_response(id, t),
None => error_response(id, ERR_INTERNAL, "task vanished during processing"),
@@ -273,54 +291,86 @@ async fn stream_task(state: AppState, message: crate::types::Message, tx: mpsc::
))
.await;

match state.processor.process(task_id.clone(), message).await {
Ok(result) => {
state
.task_manager
.append_history(&task_id, result.response)
.await;

for artifact in result.artifacts {
let (event_tx, mut event_rx) = mpsc::channel::<ProcessorEvent>(32);
let proc_future = state.processor.process(task_id.clone(), message, event_tx);

let proc_handle = tokio::spawn(proc_future);

let mut accumulated = String::new();
while let Some(event) = event_rx.recv().await {
match event {
ProcessorEvent::ArtifactChunk { text, is_final } => {
accumulated.push_str(&text);
let artifact = crate::types::Artifact {
artifact_id: uuid::Uuid::new_v4().to_string(),
name: None,
parts: vec![crate::types::Part::text(text)],
metadata: None,
};
let evt = TaskArtifactUpdateEvent {
kind: "artifact-update".into(),
task_id: task_id.clone(),
context_id: context_id.clone(),
artifact: artifact.clone(),
is_final: false,
artifact,
is_final,
};
let _ = tx.send(sse_rpc_event(&evt)).await;
state.task_manager.add_artifact(&task_id, artifact).await;
}
ProcessorEvent::StatusUpdate {
state: task_state,
is_final,
} => {
state
.task_manager
.update_status(&task_id, task_state, None)
.await;
let _ = tx
.send(status_event(
&task_id,
context_id.as_ref(),
task_state,
is_final,
))
.await;
}
}
}

state
.task_manager
.update_status(&task_id, TaskState::Completed, None)
.await;
let _ = tx
.send(status_event(
&task_id,
context_id.as_ref(),
TaskState::Completed,
true,
))
.await;
let final_state = match proc_handle.await {
Ok(Ok(())) => TaskState::Completed,
Ok(Err(e)) => {
tracing::error!(task_id = %task_id, "stream task processing failed: {e}");
TaskState::Failed
}
Err(e) => {
tracing::error!(task_id = %task_id, "stream task processing failed: {e}");
state
.task_manager
.update_status(&task_id, TaskState::Failed, None)
.await;
let _ = tx
.send(status_event(
&task_id,
context_id.as_ref(),
TaskState::Failed,
true,
))
.await;
tracing::error!(task_id = %task_id, "stream task processor panicked: {e}");
TaskState::Failed
}
};

if final_state == TaskState::Completed && !accumulated.is_empty() {
use crate::types::{Artifact, Part};
let artifact = Artifact {
artifact_id: format!("{task_id}-artifact"),
name: None,
parts: vec![Part::text(accumulated)],
metadata: None,
};
state.task_manager.add_artifact(&task_id, artifact).await;
}

state
.task_manager
.update_status(&task_id, final_state, None)
.await;
let _ = tx
.send(status_event(
&task_id,
context_id.as_ref(),
final_state,
true,
))
.await;
}

#[cfg(test)]
@@ -410,4 +460,83 @@ mod tests {
assert_eq!(msg, "invalid parameters");
assert!(!msg.contains("invalid type"), "serde details must not leak");
}

// Multi-chunk ArtifactChunk accumulation test

struct MultiChunkProcessor;

impl super::super::state::TaskProcessor for MultiChunkProcessor {
fn process(
&self,
_task_id: String,
_message: crate::types::Message,
event_tx: tokio::sync::mpsc::Sender<super::super::state::ProcessorEvent>,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), crate::error::A2aError>> + Send>,
> {
Box::pin(async move {
let _ = event_tx
.send(super::super::state::ProcessorEvent::ArtifactChunk {
text: "chunk1".into(),
is_final: false,
})
.await;
let _ = event_tx
.send(super::super::state::ProcessorEvent::ArtifactChunk {
text: " chunk2".into(),
is_final: false,
})
.await;
let _ = event_tx
.send(super::super::state::ProcessorEvent::ArtifactChunk {
text: " chunk3".into(),
is_final: true,
})
.await;
let _ = event_tx
.send(super::super::state::ProcessorEvent::StatusUpdate {
state: crate::types::TaskState::Completed,
is_final: true,
})
.await;
Ok(())
})
}
}

fn multi_chunk_state() -> super::super::state::AppState {
use std::sync::Arc;
super::super::state::AppState {
card: super::super::testing::test_card(),
task_manager: super::super::state::TaskManager::new(),
processor: Arc::new(MultiChunkProcessor),
}
}

#[tokio::test]
async fn multi_chunk_accumulation_produces_joined_artifact() {
let app = build_router_with_config(multi_chunk_state(), None, 0);
let msg = crate::types::Message {
role: crate::types::Role::User,
parts: vec![crate::types::Part::Text {
text: "hello".into(),
metadata: None,
}],
message_id: Some("m1".into()),
task_id: None,
context_id: None,
metadata: None,
};
let req = make_rpc_request(
"message/send",
serde_json::json!({ "message": serde_json::to_value(&msg).unwrap() }),
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), 200);
let body = get_rpc_body(resp).await;
// Result should contain artifact with all chunks joined
let artifacts = &body["result"]["artifacts"];
let text = artifacts[0]["parts"][0]["text"].as_str().unwrap_or("");
assert_eq!(text, "chunk1 chunk2 chunk3");
}
}