8 changes: 8 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]

### Added
- WebSocket connection lifecycle hardening: `AtomicUsize` slot reservation before upgrade handshake eliminates TOCTOU between capacity check and `DashMap` insertion; 30s ping / 90s pong-timeout keepalive; binary frame rejection with close code 1003; graceful disconnect with 1s write-task drain window to ensure close frame delivery per RFC 6455 (#936)
- Bearer token authentication middleware for ACP HTTP and WebSocket transports (`auth.rs`): constant-time token comparison via `subtle::ConstantTimeEq`, configurable via `acp.auth_bearer_token` / `ZEPH_ACP_AUTH_TOKEN` env var; no-auth open mode when token is unset (#936)
- Agent discovery manifest endpoint `GET /.well-known/acp.json`: returns agent name, version, supported transports, and authentication type; publicly accessible (exempt from bearer auth), controlled by `acp.discovery_enabled` (default `true`) / `ZEPH_ACP_DISCOVERY_ENABLED` env var (#936)
- `AcpConfig` fields: `auth_bearer_token: Option<String>`, `discovery_enabled: bool` (#936)
- `--acp-auth-token` CLI flag for runtime bearer token injection (#936)
- `zeph-core`: `LoopbackEvent::ToolStart { tool_name, tool_call_id }` variant emitted before tool execution; `LoopbackEvent::ToolOutput` extended with `tool_call_id` and `is_error` fields (#926)
- `zeph-core`: `Channel::send_tool_start` method; `LoopbackChannel` emits `ToolStart` events; tool UUIDs generated per execution and threaded through the pipeline (#926)
- `zeph-acp`: ACP tool call lifecycle now emits `SessionUpdate::ToolCall(InProgress)` before execution and `SessionUpdate::ToolCallUpdate(Completed|Failed)` with content after, per protocol spec G5 (#926)
Expand All @@ -32,6 +37,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- `AgentCapabilities` in `initialize()` now advertises `PromptCapabilities` with `image=true` and `embedded_context=true`, reflecting actual Image and Resource content block support (#917)
- ACP: `AgentCapabilities` in `initialize` response now advertises `config_options` and `ext_methods` support via meta fields (#930)

### Security
- `AcpConfig` now uses a custom `impl std::fmt::Debug` that redacts `auth_bearer_token` as `[REDACTED]`, consistent with `A2aServerConfig` and `TelegramConfig` (#936)

## [0.12.1] - 2026-02-25

### Security
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -29,7 +29,7 @@ candle-nn = { version = "0.9", default-features = false }
candle-transformers = { version = "0.9", default-features = false }
chrono = { version = "0.4", default-features = false, features = ["std"] }
dashmap = "6.1"
-clap = { version = "4.5", features = ["derive"] }
+clap = { version = "4.5", features = ["derive", "env"] }
criterion = "0.8"
cron = "0.15"
crossterm = "0.29"
54 changes: 54 additions & 0 deletions README.md
Expand Up @@ -84,6 +84,60 @@ zeph acp --http :8080 # HTTP+SSE (shared/remote)
zeph acp --ws :8080 # WebSocket
```

### WebSocket transport hardening

The WebSocket transport enforces the following limits to guard against capacity races, idle connections, and unsupported input:

| Property | Value |
|---|---|
| Max concurrent sessions | Configurable; enforced with atomic slot reservation (eliminates TOCTOU race) |
| Keepalive | 30 s ping interval / 90 s pong timeout — idle connections are closed |
| Max message size | 1 MiB |
| Binary frames | Rejected with close code `1003 Unsupported Data` (text-only protocol) |
| Disconnect drain | Write task given 1 s to deliver the RFC 6455 close frame before the socket is dropped |
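
The keepalive row can be read as a small decision rule evaluated on each timer tick. The sketch below is illustrative only: `KeepaliveAction` and `keepalive_action` are hypothetical names, and it assumes the 90 s timeout is measured from the last pong received, which may differ from how the server tracks it.

```rust
use std::time::{Duration, Instant};

// Intervals taken from the table above.
const PING_INTERVAL: Duration = Duration::from_secs(30);
const PONG_TIMEOUT: Duration = Duration::from_secs(90);

/// What a keepalive loop should do on a tick (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum KeepaliveAction {
    /// The peer is responsive and no ping is due yet.
    Wait,
    /// At least one ping interval has passed since the last ping.
    SendPing,
    /// No pong arrived within the timeout; close the connection.
    Close,
}

/// Decide the next action from the time of the last ping sent and the last pong received.
/// Assumption: the timeout is measured from the last pong.
fn keepalive_action(now: Instant, last_ping: Instant, last_pong: Instant) -> KeepaliveAction {
    if now.duration_since(last_pong) >= PONG_TIMEOUT {
        KeepaliveAction::Close
    } else if now.duration_since(last_ping) >= PING_INTERVAL {
        KeepaliveAction::SendPing
    } else {
        KeepaliveAction::Wait
    }
}

fn main() {
    let start = Instant::now();
    let secs = Duration::from_secs;

    // 10 s after the last ping and pong: nothing to do yet.
    println!("{:?}", keepalive_action(start + secs(10), start, start));
    // 30 s after the last ping: a new ping is due.
    println!("{:?}", keepalive_action(start + secs(30), start, start));
    // Pings kept going out, but the last pong is 90 s old: give up.
    println!("{:?}", keepalive_action(start + secs(90), start + secs(60), start));
}
```

With these values a peer can miss up to two consecutive pings before the connection is closed on the third interval.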

### Bearer token authentication

Protect the `/acp` (HTTP+SSE) and `/acp/ws` (WebSocket) endpoints with a static bearer token. The discovery endpoint is always exempt.

```toml
# config.toml
[acp]
auth_bearer_token = "your-secret-token"
```

| Method | Value |
|---|---|
| Config key | `acp.auth_bearer_token` |
| Environment variable | `ZEPH_ACP_AUTH_TOKEN` |
| CLI flag | `--acp-auth-token <token>` |

Requests to guarded routes without a valid `Authorization: Bearer <token>` header receive `401 Unauthorized`. When no token is configured, the server runs in open mode and enforces no authentication. The stdio transport is never affected by this setting.
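
The check described above can be sketched with the standard library alone. This is not the middleware itself: `is_authorized` and `bytes_eq_no_early_exit` are hypothetical helpers, and the hand-written comparison only stands in for `subtle::ConstantTimeEq`, since the compiler gives no constant-time guarantee for code like this.

```rust
/// Compare two byte slices without returning early on the first mismatch.
/// Illustrative stand-in for `subtle::ConstantTimeEq`; not a vetted
/// constant-time implementation.
fn bytes_eq_no_early_exit(a: &[u8], b: &[u8]) -> bool {
    // The length is not treated as secret here.
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        // Accumulate differences instead of branching on each byte.
        diff |= x ^ y;
    }
    diff == 0
}

/// Decide whether a request may pass (hypothetical helper).
/// `expected == None` models open mode, where no token is configured.
fn is_authorized(authorization: Option<&str>, expected: Option<&str>) -> bool {
    let Some(expected) = expected else {
        return true; // open mode: nothing to check
    };
    authorization
        // The prefix match is case-sensitive and requires exactly one space.
        .and_then(|value| value.strip_prefix("Bearer "))
        .is_some_and(|provided| bytes_eq_no_early_exit(provided.as_bytes(), expected.as_bytes()))
}

fn main() {
    let configured = Some("your-secret-token");
    let headers = [
        Some("Bearer your-secret-token"),
        Some("Bearer wrong-token"),
        Some("Basic dXNlcjpwYXNz"),
        None,
    ];
    for header in headers {
        let status = if is_authorized(header, configured) { 200 } else { 401 };
        println!("{header:?} -> {status}");
    }
}
```

A missing header, a different scheme, and a wrong token all take the same rejection path, so the response does not reveal which part of the check failed.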

> [!TIP]
> Always set `auth_bearer_token` when exposing the ACP server on a network interface. For local-only stdio or single-user setups no token is required.

> [!CAUTION]
> Open mode (no token configured) is suitable only for trusted local use. Any process on the same host can issue agent commands without authentication.

### Agent discovery

Zeph publishes a machine-readable agent manifest at `GET /.well-known/acp.json` that ACP-compatible clients use for capability discovery. The manifest includes the agent name, version, supported transports, and authentication type.

```toml
# config.toml
[acp]
discovery_enabled = true # default: true
```

| Method | Value |
|---|---|
| Config key | `acp.discovery_enabled` |
| Environment variable | `ZEPH_ACP_DISCOVERY_ENABLED=false` to disable |

> [!NOTE]
> The discovery endpoint is intentionally unauthenticated so clients can discover the agent and its auth requirements before presenting credentials. Do not include sensitive data in the manifest. Set `discovery_enabled = false` if the endpoint must not be publicly reachable.
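
A client can use the manifest to decide whether to attach credentials before its first request. The sketch below assumes the manifest has already been fetched and parsed; `Manifest`, `AuthRequirement`, and `authorization_header` are hypothetical client-side names, and the field layout in the comment follows the handler in `discovery.rs`.

```rust
// Shape of the manifest as produced by the discovery handler:
//   "transports": { "http_sse": { "url": "/acp" }, "websocket": { "url": "/acp/ws" } },
//   "authentication": { "type": "bearer" }   // or null when no token is configured

/// Authentication requirement advertised by the agent (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthRequirement {
    /// `"authentication": null`
    Open,
    /// `"authentication": { "type": "bearer" }`
    Bearer,
}

/// The subset of the manifest this sketch needs (hypothetical type).
struct Manifest {
    http_sse_url: String,
    websocket_url: String,
    authentication: AuthRequirement,
}

/// Return the `Authorization` header value to send, if any.
/// Fails when the agent requires a bearer token and the client has none.
fn authorization_header(manifest: &Manifest, token: Option<&str>) -> Result<Option<String>, String> {
    match (manifest.authentication, token) {
        (AuthRequirement::Open, _) => Ok(None),
        (AuthRequirement::Bearer, Some(t)) => Ok(Some(format!("Bearer {t}"))),
        (AuthRequirement::Bearer, None) => {
            Err("agent requires a bearer token, but none is configured".to_string())
        }
    }
}

fn main() {
    let manifest = Manifest {
        http_sse_url: "/acp".to_string(),
        websocket_url: "/acp/ws".to_string(),
        authentication: AuthRequirement::Bearer,
    };

    match authorization_header(&manifest, Some("your-secret-token")) {
        Ok(Some(value)) => println!("POST {} with Authorization: {value}", manifest.http_sse_url),
        Ok(None) => println!("POST {} without credentials", manifest.http_sse_url),
        Err(reason) => eprintln!("cannot connect: {reason}"),
    }
    println!("WebSocket endpoint: {}", manifest.websocket_url);
}
```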

[ACP setup guide →](https://bug-ops.github.io/zeph/advanced/acp.html)

## TUI Demo
4 changes: 3 additions & 1 deletion crates/zeph-acp/Cargo.toml
Expand Up @@ -13,7 +13,7 @@ description = "ACP (Agent Client Protocol) server for IDE embedding"
readme = "README.md"

[features]
-acp-http = ["dep:axum", "dep:dashmap", "dep:async-stream", "dep:tower-http"]
+acp-http = ["dep:axum", "dep:dashmap", "dep:async-stream", "dep:tower-http", "dep:subtle", "dep:tower"]
unstable-session-list = ["agent-client-protocol/unstable_session_list"]
unstable-session-fork = ["agent-client-protocol/unstable_session_fork"]
unstable-session-resume = ["agent-client-protocol/unstable_session_resume"]
Expand Down Expand Up @@ -42,6 +42,8 @@ zeph-tools.workspace = true
axum = { workspace = true, features = ["ws"], optional = true }
async-stream = { version = "0.3", optional = true }
dashmap = { workspace = true, optional = true }
subtle = { workspace = true, optional = true }
tower = { workspace = true, features = ["util"], optional = true }
tower-http = { workspace = true, features = ["cors", "limit"], optional = true }

[dev-dependencies]
79 changes: 79 additions & 0 deletions crates/zeph-acp/src/transport/auth.rs
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use axum::body::Body;
use axum::http::{Request, Response, StatusCode, header};
use axum::response::IntoResponse;
use subtle::ConstantTimeEq as _;
use tower::{Layer, Service};

/// Tower middleware layer that validates `Authorization: Bearer <token>` headers
/// using constant-time comparison to prevent timing attacks.
#[derive(Clone)]
pub(crate) struct BearerAuthLayer {
    token: String,
}

impl BearerAuthLayer {
    pub(crate) fn new(token: impl Into<String>) -> Self {
        Self {
            token: token.into(),
        }
    }
}

impl<S> Layer<S> for BearerAuthLayer {
    type Service = BearerAuthMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        BearerAuthMiddleware {
            inner,
            token: self.token.clone(),
        }
    }
}

/// Middleware service that enforces bearer token authentication.
#[derive(Clone)]
pub(crate) struct BearerAuthMiddleware<S> {
    inner: S,
    token: String,
}

impl<S> Service<Request<Body>> for BearerAuthMiddleware<S>
where
    S: Service<Request<Body>, Response = Response<Body>> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    type Response = Response<Body>;
    type Error = S::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let expected = self.token.clone();

        let authorized = req
            .headers()
            .get(header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.strip_prefix("Bearer "))
            // Constant-time comparison to prevent timing attacks.
            .is_some_and(|provided| provided.as_bytes().ct_eq(expected.as_bytes()).into());

        if authorized {
            let fut = self.inner.call(req);
            Box::pin(fut)
        } else {
            Box::pin(async move { Ok(StatusCode::UNAUTHORIZED.into_response()) })
        }
    }
}
35 changes: 35 additions & 0 deletions crates/zeph-acp/src/transport/discovery.rs
@@ -0,0 +1,35 @@
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use axum::Json;
use axum::extract::State;
use axum::response::IntoResponse;
use serde_json::{Value, json};

use crate::transport::http::AcpHttpState;

/// `GET /.well-known/acp.json` — publicly accessible agent discovery manifest.
///
/// Returns a JSON document describing the agent's identity, supported transports,
/// and authentication requirements. This endpoint is never behind auth middleware.
pub async fn discovery_handler(State(state): State<AcpHttpState>) -> impl IntoResponse {
    let auth = if state.server_config.auth_bearer_token.is_some() {
        json!({ "type": "bearer" })
    } else {
        Value::Null
    };

    let manifest = json!({
        "name": state.server_config.agent_name,
        "version": state.server_config.agent_version,
        "protocol": "acp",
        "protocol_version": "0.9",
        "transports": {
            "http_sse": { "url": "/acp" },
            "websocket": { "url": "/acp/ws" }
        },
        "authentication": auth
    });

    Json(manifest)
}
41 changes: 40 additions & 1 deletion crates/zeph-acp/src/transport/http.rs
Expand Up @@ -4,7 +4,7 @@
#[cfg(feature = "acp-http")]
use std::sync::Arc;
#[cfg(feature = "acp-http")]
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[cfg(feature = "acp-http")]
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -65,6 +65,10 @@ pub struct AcpHttpState {
    pub(crate) connections: Arc<DashMap<String, Arc<ConnectionHandle>>>,
    pub spawner: SendAgentSpawner,
    pub server_config: Arc<AcpServerConfig>,
    /// Atomic counter for active WebSocket sessions.
    /// Used to atomically reserve a slot before the upgrade handshake, eliminating TOCTOU
    /// between the capacity check and the actual `DashMap` insertion.
    pub(crate) active_ws: Arc<AtomicUsize>,
}

#[cfg(feature = "acp-http")]
Expand All @@ -74,9 +78,44 @@ impl AcpHttpState {
            connections: Arc::new(DashMap::new()),
            spawner,
            server_config: Arc::new(server_config),
            active_ws: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Try to atomically reserve a WebSocket session slot.
    ///
    /// Returns `true` and increments the counter if a slot is available.
    /// Returns `false` if `max_sessions` is already reached, without modifying the counter.
    pub(crate) fn try_reserve_ws_slot(&self) -> bool {
        let max = self.server_config.max_sessions;
        // Saturating loop: attempt CAS until either we claim a slot or find it full.
        let mut current = self.active_ws.load(Ordering::Relaxed);
        loop {
            if current >= max {
                return false;
            }
            match self.active_ws.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }

    /// Release a previously reserved WebSocket session slot.
    pub(crate) fn release_ws_slot(&self) {
        self.active_ws.fetch_sub(1, Ordering::AcqRel);
    }

    /// Remove a connection from the session map immediately (e.g. on WebSocket disconnect).
    pub(crate) fn remove_connection(&self, id: &str) {
        self.connections.remove(id);
    }

    /// Spawn a background task that reaps idle connections every 60 seconds.
    pub fn start_reaper(&self) {
        let connections = Arc::clone(&self.connections);
13 changes: 13 additions & 0 deletions crates/zeph-acp/src/transport/mod.rs
Expand Up @@ -6,7 +6,11 @@ use std::rc::Rc;

use agent_client_protocol as acp;

#[cfg(feature = "acp-http")]
pub mod auth;
pub mod bridge;
#[cfg(feature = "acp-http")]
pub mod discovery;
pub mod http;
pub mod router;
pub mod stdio;
Expand Down Expand Up @@ -39,6 +43,11 @@ pub struct AcpServerConfig {
    pub available_models: Vec<String>,
    /// Optional shared MCP manager for `ext_method` add/remove/list.
    pub mcp_manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
    /// Bearer token for HTTP and WebSocket transport authentication.
    /// When `Some`, all /acp and /acp/ws requests must include the token.
    pub auth_bearer_token: Option<String>,
    /// Whether to serve the /.well-known/acp.json discovery manifest.
    pub discovery_enabled: bool,
}

impl Clone for AcpServerConfig {
Expand All @@ -52,6 +61,8 @@ impl Clone for AcpServerConfig {
            provider_factory: self.provider_factory.clone(),
            available_models: self.available_models.clone(),
            mcp_manager: self.mcp_manager.clone(),
            auth_bearer_token: self.auth_bearer_token.clone(),
            discovery_enabled: self.discovery_enabled,
        }
    }
}
Expand All @@ -67,6 +78,8 @@ impl Default for AcpServerConfig {
            provider_factory: None,
            available_models: Vec::new(),
            mcp_manager: None,
            auth_bearer_token: None,
            discovery_enabled: true,
        }
    }
}
25 changes: 22 additions & 3 deletions crates/zeph-acp/src/transport/router.rs
Expand Up @@ -10,6 +10,10 @@ use axum::routing::{get, post};
#[cfg(feature = "acp-http")]
use tower_http::cors::CorsLayer;

#[cfg(feature = "acp-http")]
use crate::transport::auth::BearerAuthLayer;
#[cfg(feature = "acp-http")]
use crate::transport::discovery::discovery_handler;
#[cfg(feature = "acp-http")]
use crate::transport::http::{AcpHttpState, get_handler, post_handler};
#[cfg(feature = "acp-http")]
Expand All @@ -25,16 +29,31 @@ const MAX_BODY_BYTES: usize = 1_048_576;
/// - `POST /acp` — JSON-RPC request body (≤ 1 MiB), SSE response stream
/// - `GET /acp` — SSE notification reconnect (requires `Acp-Session-Id`)
/// - `GET /acp/ws` — WebSocket upgrade
/// - `GET /.well-known/acp.json` — discovery manifest (always public, no auth)
///
/// Security layers applied:
/// - `DefaultBodyLimit::max(1_048_576)` — rejects oversized POST bodies
/// - `CorsLayer` with empty origin list — denies all cross-origin requests
/// - `BearerAuthLayer` — applied to /acp routes when `auth_bearer_token` is `Some`
#[cfg(feature = "acp-http")]
pub fn acp_router(state: AcpHttpState) -> Router {
-    Router::new()
+    let acp_routes = Router::new()
         .route("/acp", post(post_handler).get(get_handler))
         .route("/acp/ws", get(ws_upgrade_handler))
         .layer(DefaultBodyLimit::max(MAX_BODY_BYTES))
-        .layer(CorsLayer::new())
-        .with_state(state)
+        .layer(CorsLayer::new());
+
+    let acp_routes = if let Some(token) = state.server_config.auth_bearer_token.clone() {
+        acp_routes.layer(BearerAuthLayer::new(token))
+    } else {
+        acp_routes
+    };
+
+    let mut router = Router::new().merge(acp_routes);
+
+    if state.server_config.discovery_enabled {
+        router = router.route("/.well-known/acp.json", get(discovery_handler));
+    }
+
+    router.with_state(state)
}