Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions backend/app/agent/approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
DENY (never execute). Users can respond with yes/always/no/never to
control both immediate and future behavior.

Batch plan approval: when a user request triggers multiple tools, the
system presents a single plan message grouping auto and pending steps.
The user approves or rejects the entire batch with one response.
Sequential approval: when a user request triggers multiple tools, each
tool that requires approval gets its own prompt. The user approves or
rejects each tool independently.
"""

from __future__ import annotations
Expand Down Expand Up @@ -366,7 +366,7 @@ async def request_approval(
self._pending[user_id] = pending

if prompt is None:
prompt = _format_approval_message(tool_name, description)
prompt = format_approval_message(tool_name, description)
try:
from backend.app.bus import OutboundMessage as OMsg

Expand Down Expand Up @@ -506,7 +506,7 @@ class ApprovalClassification(BaseModel):
return result


def _format_approval_message(tool_name: str, description: str) -> str:
def format_approval_message(tool_name: str, description: str) -> str:
"""Build a plain-text approval prompt for the user."""
return f"I'd like to: {description}\n\nReply yes or no (always/never to remember your choice)"

Expand Down
154 changes: 70 additions & 84 deletions backend/app/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from backend.app.agent.approval import (
ApprovalDecision,
PermissionLevel,
PlanStep,
format_plan_message,
format_approval_message,
get_approval_gate,
get_approval_store,
)
Expand Down Expand Up @@ -516,11 +515,11 @@ async def _execute_tool_round(

pre_validated.append((i, tool_obj, validated_args))

# -- Phase 2: batch approval then execute --------------------------
# -- Phase 2: approve then execute ------------------------------------
#
# Partition validated tools by permission level. Tools that need
# approval are batched into a single plan message so the user
# responds once (not once per tool).
# approval are prompted individually so the user can approve or
# deny each one independently.

_ToolEntry = tuple[int, Tool, dict[str, Any]]

Expand Down Expand Up @@ -563,101 +562,88 @@ async def _execute_tool_round(
approved_entries: list[_ToolEntry] = list(auto_entries)

if ask_entries:
auto_steps = [
PlanStep(
tool_name=t.name,
description=self._get_tool_permission(t, a)[2],
level=PermissionLevel.ALWAYS,
)
for _, t, a in auto_entries
]
ask_steps = [
PlanStep(tool_name=e[1].name, description=desc, level=PermissionLevel.ASK)
for e, _res, desc in ask_entries
]

plan_msg = format_plan_message("Here's what I need to do:", auto_steps, ask_steps)

# Persist the approval prompt to session history so it is
# visible in the web UI regardless of which channel
# originated the conversation.
if self._session_id:
await self._persist_approval_prompt(plan_msg)

if self._publish_outbound is not None and self._chat_id is not None:
# Publish approval prompt as SSE event for webchat clients.
# The publish_outbound path works for Telegram but is a no-op
# for webchat (WebChatChannel.send_text returns "").
if self._request_id:
from backend.app.bus import message_bus

await message_bus.publish_event(
self._request_id,
{"type": "approval_request", "content": plan_msg},
)
store = get_approval_store()
indexed_entries = list(enumerate(ask_entries))

gate = get_approval_gate()
decision = await gate.request_approval(
user_id=self.user.id,
tool_name=ask_entries[0][0][1].name,
description=plan_msg,
publish_outbound=self._publish_outbound,
channel=self._channel,
chat_id=self._chat_id,
prompt=plan_msg,
)
else:
decision = ApprovalDecision.DENIED
for pos, (entry, resource, description) in indexed_entries:
idx, tool_obj, v_args = entry
tc_req = parsed_calls[idx]

store = get_approval_store()
if decision in (ApprovalDecision.APPROVED, ApprovalDecision.ALWAYS_ALLOW):
approved_entries.extend(e for e, _res, _desc in ask_entries)
if decision == ApprovalDecision.ALWAYS_ALLOW:
for (_, tool_obj, _a), resource, _desc in ask_entries:
if self._publish_outbound is not None and self._chat_id is not None:
prompt = format_approval_message(tool_obj.name, description)

if self._session_id:
await self._persist_approval_prompt(prompt)

if self._request_id:
from backend.app.bus import message_bus

await message_bus.publish_event(
self._request_id,
{"type": "approval_request", "content": prompt},
)

gate = get_approval_gate()
decision = await gate.request_approval(
user_id=self.user.id,
tool_name=tool_obj.name,
description=description,
publish_outbound=self._publish_outbound,
channel=self._channel,
chat_id=self._chat_id,
prompt=prompt,
)
else:
decision = ApprovalDecision.DENIED

if decision in (ApprovalDecision.APPROVED, ApprovalDecision.ALWAYS_ALLOW):
approved_entries.append(entry)
if decision == ApprovalDecision.ALWAYS_ALLOW:
try:
store.set_permission(
self.user.id, tool_obj.name, PermissionLevel.ALWAYS, resource
)
except Exception:
logger.warning("Failed to persist ALWAYS for tool %s", tool_obj.name)
elif decision == ApprovalDecision.INTERRUPTED:
# User changed the subject. Don't persist any permission.
for (idx, _tool_obj, v_args), _resource, desc in ask_entries:
tc_req = parsed_calls[idx]
tool_tags = self._get_tool_tags(tc_req.name)
hint = _ERROR_KIND_HINTS[ToolErrorKind.INTERRUPTED]
msg = (
f"Tool request interrupted: the user moved on to a "
f'different topic instead of approving "{desc}". '
f"Do not proactively retry this tool; only call it "
f"again if the user explicitly asks.\n\n{hint}"
)
actions_taken.append(f"Interrupted: {tc_req.name}")
tool_call_records.append(
StoredToolInteraction(
tool_call_id=tc_req.id,
name=tc_req.name,
args=v_args,
result=msg,
is_error=True,
tags=set(tool_tags),

elif decision == ApprovalDecision.INTERRUPTED:
# User changed subject. Error this entry + all remaining.
for _p, (e, _r, d) in indexed_entries[pos:]:
i_rem, _t_rem, va_rem = e
tc_rem = parsed_calls[i_rem]
rem_tags = self._get_tool_tags(tc_rem.name)
hint = _ERROR_KIND_HINTS[ToolErrorKind.INTERRUPTED]
msg = (
f"Tool request interrupted: the user moved on to a "
f'different topic instead of approving "{d}". '
f"Do not proactively retry this tool; only call it "
f"again if the user explicitly asks.\n\n{hint}"
)
)
tool_results.append(
ToolResultMessage(tool_call_id=tc_req.id, content=msg, is_error=True)
)
else:
if decision == ApprovalDecision.ALWAYS_DENY:
for (_, tool_obj, _a), resource, _desc in ask_entries:
actions_taken.append(f"Interrupted: {tc_rem.name}")
tool_call_records.append(
StoredToolInteraction(
tool_call_id=tc_rem.id,
name=tc_rem.name,
args=va_rem,
result=msg,
is_error=True,
tags=set(rem_tags),
)
)
tool_results.append(
ToolResultMessage(tool_call_id=tc_rem.id, content=msg, is_error=True)
)
break

else: # DENIED / ALWAYS_DENY
if decision == ApprovalDecision.ALWAYS_DENY:
try:
store.set_permission(
self.user.id, tool_obj.name, PermissionLevel.DENY, resource
)
except Exception:
logger.warning("Failed to persist DENY for tool %s", tool_obj.name)

for (idx, _tool_obj, v_args), _resource, _desc in ask_entries:
tc_req = parsed_calls[idx]
tool_tags = self._get_tool_tags(tc_req.name)
hint = _ERROR_KIND_HINTS[ToolErrorKind.PERMISSION]
deny_msg = f"Error: permission denied for tool '{tc_req.name}'\n\n{hint}"
Expand Down
6 changes: 3 additions & 3 deletions tests/test_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
ApprovalPolicy,
ApprovalStore,
PermissionLevel,
_format_approval_message,
_parse_approval_response,
format_approval_message,
get_approval_gate,
get_approval_store,
reset_approval_gate,
Expand Down Expand Up @@ -234,13 +234,13 @@ def test_unrecognized_returns_none(self, text: str) -> None:


# ---------------------------------------------------------------------------
# _format_approval_message
# format_approval_message
# ---------------------------------------------------------------------------


class TestFormatApprovalMessage:
def test_output_format(self) -> None:
msg = _format_approval_message("web_fetch", "fetch content from https://example.com")
msg = format_approval_message("web_fetch", "fetch content from https://example.com")
assert "fetch content from https://example.com" in msg
assert "yes" in msg
assert "no" in msg
Expand Down
Loading
Loading