Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions python/packages/kagent-adk/src/kagent/adk/_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
from fastapi.responses import PlainTextResponse
from google.adk.agents import BaseAgent
from google.adk.apps import App
from google.adk.artifacts import InMemoryArtifactService
from google.adk.plugins import BasePlugin
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts import InMemoryArtifactService

from google.genai import types

from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore
Expand Down
13 changes: 13 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/artifacts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Public API of the kagent artifacts package.

Re-exports the artifact lifecycle toolset/tools and the session
working-directory helpers so callers can import them from one place.
"""

from .artifacts_toolset import ArtifactsToolset
from .return_artifacts_tool import ReturnArtifactsTool
from .session_path import clear_session_cache, get_session_path, initialize_session_path
from .stage_artifacts_tool import StageArtifactsTool

# Explicit public surface of the package.
__all__ = [
    "ArtifactsToolset",
    "ReturnArtifactsTool",
    "StageArtifactsTool",
    "get_session_path",
    "initialize_session_path",
    "clear_session_cache",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from __future__ import annotations

import logging
from typing import List, Optional

try:
from typing_extensions import override
except ImportError:
from typing import override

from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools import BaseTool
from google.adk.tools.base_toolset import BaseToolset

from .return_artifacts_tool import ReturnArtifactsTool
from .stage_artifacts_tool import StageArtifactsTool

logger = logging.getLogger("kagent_adk." + __name__)


class ArtifactsToolset(BaseToolset):
    """Toolset exposing the two tools of the artifact lifecycle.

    Provides:
    1. StageArtifactsTool - copies artifacts from the artifact service into the
       session working directory so processing tools can read them from disk.
    2. ReturnArtifactsTool - uploads files generated in the working directory
       back to the artifact service for user download.

    Typical file-based flow:
    - A user uploads files via the frontend, which stores them as artifacts.
    - StageArtifactsTool materializes those artifacts in the working directory.
    - Processing tools (bash, skills, etc.) operate on the on-disk files.
    - ReturnArtifactsTool publishes generated outputs back as artifacts.
    - The user downloads the results via the frontend.

    The toolset has no dependency on skills and can back any processing
    workflow.
    """

    def __init__(self):
        """Create the toolset with one instance of each lifecycle tool."""
        super().__init__()

        # Instantiated once here and handed out on every get_tools() call.
        self.stage_artifacts_tool = StageArtifactsTool()
        self.return_artifacts_tool = ReturnArtifactsTool()

    @override
    async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> List[BaseTool]:
        """Return the staging tool followed by the return tool.

        Returns:
            List containing StageArtifactsTool and ReturnArtifactsTool.
        """
        tools: List[BaseTool] = [self.stage_artifacts_tool]
        tools.append(self.return_artifacts_tool)
        return tools
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Tool for returning generated files from working directory to artifact service."""

from __future__ import annotations

import logging
import mimetypes
from pathlib import Path
from typing import Any, Dict, List

from google.adk.tools import BaseTool, ToolContext
from google.genai import types
from typing_extensions import override

from .session_path import get_session_path
from .stage_artifacts_tool import MAX_ARTIFACT_SIZE_BYTES

logger = logging.getLogger("kagent_adk." + __name__)


class ReturnArtifactsTool(BaseTool):
    """Save generated files from working directory to artifact service for user download.

    This tool enables users to download outputs generated during processing.
    Files are saved to the artifact service where they can be retrieved by the frontend.
    """

    def __init__(self):
        """Register the tool name and the LLM-facing usage description."""
        super().__init__(
            name="return_artifacts",
            description=(
                "Save generated files from the working directory to the artifact service, "
                "making them available for user download.\n\n"
                "WORKFLOW:\n"
                "1. Generate output files in the 'outputs/' directory\n"
                "2. Use this tool to save those files to the artifact service\n"
                "3. Users can then download the files via the frontend\n\n"
                "USAGE EXAMPLE:\n"
                "- bash('python scripts/analyze.py > outputs/report.txt')\n"
                "- return_artifacts(file_paths=['outputs/report.txt'])\n"
                "  Returns: 'Saved 1 file(s): report.txt (v0, 15.2 KB)'\n\n"
                "PARAMETERS:\n"
                "- file_paths: List of relative paths from working directory (required)\n"
                "- artifact_names: Optional custom names for artifacts (default: use filename)\n\n"
                "BEST PRACTICES:\n"
                "- Generate outputs in 'outputs/' directory for clarity\n"
                "- Use descriptive filenames (they become artifact names)\n"
                "- Return all outputs at once for efficiency"
            ),
        )

    def _get_declaration(self) -> types.FunctionDeclaration | None:
        """Build the function-calling schema advertised to the model.

        Returns:
            A FunctionDeclaration with a required ``file_paths`` string array
            and an optional ``artifact_names`` string array.
        """
        return types.FunctionDeclaration(
            name=self.name,
            description=self.description,
            parameters=types.Schema(
                type=types.Type.OBJECT,
                properties={
                    "file_paths": types.Schema(
                        type=types.Type.ARRAY,
                        description=(
                            "List of relative file paths from the working directory to save as artifacts. "
                            "Example: ['outputs/report.pdf', 'outputs/data.csv']. "
                            "Files must exist in the working directory and be within size limits."
                        ),
                        items=types.Schema(type=types.Type.STRING),
                    ),
                    "artifact_names": types.Schema(
                        type=types.Type.ARRAY,
                        description=(
                            "Optional custom names for the artifacts. "
                            "If not provided, the filename will be used. "
                            "Must match the length of file_paths if provided."
                        ),
                        items=types.Schema(type=types.Type.STRING),
                    ),
                },
                required=["file_paths"],
            ),
        )

    @override
    async def run_async(self, *, args: Dict[str, Any], tool_context: ToolContext) -> str:
        """Validate, read, and upload the requested files as artifacts.

        Args:
            args: Tool arguments; expects "file_paths" (list of str, required)
                and "artifact_names" (list of str, optional, same length).
            tool_context: ADK tool context giving access to the session and
                the artifact service.

        Returns:
            A human-readable summary of saved artifacts, or an error string.
            Invalid individual files (outside the working dir, missing, or too
            large) are skipped with a warning rather than failing the batch.
        """
        file_paths: List[str] = args.get("file_paths", [])
        artifact_names: List[str] = args.get("artifact_names", [])

        if not file_paths:
            return "Error: No file paths provided."

        if artifact_names and len(artifact_names) != len(file_paths):
            return "Error: artifact_names length must match file_paths length."

        # NOTE(review): reaches into a private ToolContext attribute; there is no
        # public accessor for the artifact service here — confirm on ADK upgrades.
        if not tool_context._invocation_context.artifact_service:
            return "Error: Artifact service is not available in this context."

        try:
            working_dir = get_session_path(session_id=tool_context.session.id)

            saved_artifacts = []
            for idx, rel_path in enumerate(file_paths):
                file_path = (working_dir / rel_path).resolve()

                # Security: Ensure file is within working directory
                if not file_path.is_relative_to(working_dir):
                    logger.warning(f"Skipping file outside working directory: {rel_path}")
                    continue

                # Require a regular file: a directory or special file would make
                # read_bytes() raise and abort the whole batch via the outer except.
                if not file_path.is_file():
                    logger.warning(f"File not found or not a regular file: {rel_path}")
                    continue

                # Check file size
                file_size = file_path.stat().st_size
                if file_size > MAX_ARTIFACT_SIZE_BYTES:
                    size_mb = file_size / (1024 * 1024)
                    logger.warning(f"File too large: {rel_path} ({size_mb:.1f} MB)")
                    continue

                # Custom name wins; otherwise fall back to the on-disk filename.
                artifact_name = artifact_names[idx] if artifact_names else file_path.name

                # Read file data and detect MIME type
                file_data = file_path.read_bytes()
                mime_type = self._detect_mime_type(file_path)

                # Create artifact Part
                artifact_part = types.Part.from_bytes(data=file_data, mime_type=mime_type)

                # Save to artifact service
                version = await tool_context.save_artifact(
                    filename=artifact_name,
                    artifact=artifact_part,
                )

                size_kb = file_size / 1024
                saved_artifacts.append(f"{artifact_name} (v{version}, {size_kb:.1f} KB)")
                logger.info(f"Saved artifact: {artifact_name} v{version} ({size_kb:.1f} KB)")

            if not saved_artifacts:
                return "No valid files were saved as artifacts."

            return f"Saved {len(saved_artifacts)} file(s) for download:\n" + "\n".join(
                f" • {artifact}" for artifact in saved_artifacts
            )

        except Exception as e:
            logger.error("Error returning artifacts: %s", e, exc_info=True)
            return f"An error occurred while returning artifacts: {e}"

    def _detect_mime_type(self, file_path: Path) -> str:
        """Detect MIME type from file extension.

        Args:
            file_path: Path to the file

        Returns:
            MIME type string, defaults to 'application/octet-stream' if unknown
        """
        mime_type, _ = mimetypes.guess_type(str(file_path))
        return mime_type or "application/octet-stream"
106 changes: 106 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/artifacts/session_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Per-session working-directory setup and lookup for kagent artifact tools."""

import logging
import tempfile
from pathlib import Path

logger = logging.getLogger("kagent_adk." + __name__)

# Cache of initialized session paths to avoid re-creating symlinks.
# Maps session_id -> resolved session root; entries are removed only via
# clear_session_cache(), so long-lived processes should clear finished sessions.
_session_path_cache: dict[str, Path] = {}


def initialize_session_path(session_id: str, skills_directory: str) -> Path:
    """Initialize a session's working directory with skills symlink.

    Called by SkillsPlugin.before_agent_callback() so the session layout exists
    before any tool runs. Creates the working subdirectories and a symlink to
    the shared skills directory, then caches the resolved root.

    Directory structure:
        /tmp/kagent/{session_id}/
        ├── skills/  -> symlink to skills_directory (read-only shared skills)
        ├── uploads/ -> staged user files (temporary)
        └── outputs/ -> generated files for return

    Args:
        session_id: The unique ID of the current session.
        skills_directory: Path to the shared skills directory.

    Returns:
        The resolved path to the session's root directory.
    """
    # Already initialized — hand back the cached resolved root.
    cached = _session_path_cache.get(session_id)
    if cached is not None:
        return cached

    # NOTE(review): session_id is used verbatim as a path component; assumes it
    # is a plain identifier (e.g. a UUID) with no separators — confirm upstream.
    session_root = Path(tempfile.gettempdir()) / "kagent" / session_id

    # Working subdirectories (idempotent).
    for subdir in ("uploads", "outputs"):
        (session_root / subdir).mkdir(parents=True, exist_ok=True)

    # Link the shared skills directory into the session, if present.
    skills_source = Path(skills_directory)
    skills_link = session_root / "skills"
    if skills_source.exists() and not skills_link.exists():
        try:
            skills_link.symlink_to(skills_source)
            logger.debug(f"Created symlink: {skills_link} -> {skills_source}")
        except FileExistsError:
            # Symlink already exists (race condition from concurrent session setup)
            pass
        except Exception as e:
            # Log but don't fail - skills can still be accessed via absolute path
            logger.warning(f"Failed to create skills symlink for session {session_id}: {e}")

    # Cache the resolved root and return it.
    resolved = session_root.resolve()
    _session_path_cache[session_id] = resolved
    return resolved


def get_session_path(session_id: str) -> Path:
    """Get the working directory path for a session.

    Looks up the cached session path created by SkillsPlugin. When the session
    was never initialized (plugin not installed), falls back to
    auto-initialization with the default /skills directory.

    Tools should call this to obtain their working directory; initialization
    normally happens earlier via the plugin's before_agent_callback() hook.

    Args:
        session_id: The unique ID of the current session.

    Returns:
        The resolved path to the session's root directory.

    Note:
        If session is not initialized, automatically initializes with /skills.
        For custom skills directories, ensure SkillsPlugin is installed.
    """
    # Fast path: SkillsPlugin (or a previous call) already set this session up.
    try:
        return _session_path_cache[session_id]
    except KeyError:
        pass

    # Slow path: auto-initialize with the default skills location.
    logger.warning(
        f"Session {session_id} not initialized by SkillsPlugin. "
        f"Auto-initializing with default /skills. "
        f"Install SkillsPlugin for custom skills directories."
    )
    return initialize_session_path(session_id, "/skills")


def clear_session_cache(session_id: str | None = None) -> None:
    """Clear cached session path(s).

    Args:
        session_id: Specific session to clear. If None, clears all cached sessions.
    """
    # Test `is not None`, not truthiness: a falsy-but-present id (e.g. "")
    # must pop only that key, not silently wipe every cached session.
    if session_id is not None:
        _session_path_cache.pop(session_id, None)
    else:
        _session_path_cache.clear()
Loading
Loading