63 changes: 47 additions & 16 deletions src/praisonai-agents/praisonaiagents/tools/duckdb_tools.py
@@ -13,6 +13,7 @@
from typing import List, Dict, Any, Optional, Union, TYPE_CHECKING
from importlib import util
import json
import re

if TYPE_CHECKING:
import duckdb
@@ -29,6 +30,25 @@ def __init__(self, database: str = ':memory:'):
"""
self.database = database
self._conn = None

def _validate_identifier(self, identifier: str) -> str:
"""Validate and quote a SQL identifier to prevent injection.

Args:
identifier: Table or column name to validate

Returns:
Quoted identifier safe for SQL

Raises:
ValueError: If identifier contains invalid characters
"""
# Only allow alphanumeric characters, underscores, and dots
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)?$', identifier):
raise ValueError(f"Invalid identifier: {identifier}")

# Quote the identifier to handle reserved words
return f'"{identifier}"'

def _get_duckdb(self) -> Optional['duckdb']:
"""Get duckdb module, installing if needed"""
@@ -125,39 +145,50 @@ def load_csv(
if conn is None:
return False

# Check if table exists
exists = conn.execute(f"""
# Check if table exists using parameterized query
exists = conn.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='{table_name}'
""").fetchone() is not None
WHERE type='table' AND name=?
""", [table_name]).fetchone() is not None
Comment on lines +149 to +152

⚠️ Potential issue

Fix table existence check for DuckDB.

The query uses sqlite_master, which is SQLite-specific, but this is a DuckDB tool; DuckDB uses different system tables.

Apply this fix for DuckDB compatibility:

-            exists = conn.execute("""
-                SELECT name FROM sqlite_master 
-                WHERE type='table' AND name=?
-            """, [table_name]).fetchone() is not None
+            exists = conn.execute("""
+                SELECT table_name FROM information_schema.tables 
+                WHERE table_name=?
+            """, [table_name]).fetchone() is not None

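For reference, a minimal standalone sketch of the suggested check against an in-memory DuckDB database (the duckdb package and the users table are assumptions for illustration):

import duckdb

conn = duckdb.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")

# DuckDB exposes its catalog via information_schema, so the parameterized
# existence check needs no string interpolation:
exists = conn.execute("""
    SELECT table_name FROM information_schema.tables
    WHERE table_name = ?
""", ["users"]).fetchone() is not None

print(exists)  # True
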
if exists:
if if_exists == 'fail':
raise ValueError(f"Table {table_name} already exists")
elif if_exists == 'replace':
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
# Validate and quote table name to prevent injection
safe_table = self._validate_identifier(table_name)
conn.execute(f"DROP TABLE IF EXISTS {safe_table}")
elif if_exists != 'append':
raise ValueError("if_exists must be 'fail', 'replace', or 'append'")

# Create table if needed
if not exists or if_exists == 'replace':
safe_table = self._validate_identifier(table_name)
if schema:
# Create table with schema
columns = ', '.join(f"{k} {v}" for k, v in schema.items())
conn.execute(f"CREATE TABLE {table_name} ({columns})")
# Create table with schema - validate column names
column_defs = []
for col_name, col_type in schema.items():
safe_col = self._validate_identifier(col_name)
# Validate column type to prevent injection
if not re.match(r'^[A-Z][A-Z0-9_]*(\([0-9,]+\))?$', col_type.upper()):
raise ValueError(f"Invalid column type: {col_type}")
Comment on lines +173 to +174

high

The regex for validating column types is too restrictive. It doesn't allow for spaces within the parentheses, which is common for types like DECIMAL(10, 2). This will cause a ValueError for valid schema definitions.

The regex should be updated to accommodate optional whitespace.

if not re.match(r'^[A-Z][A-Z0-9_]*(\([0-9,\s]+\))?$', col_type.upper()):
    raise ValueError(f"Invalid column type: {col_type}")

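A quick way to see the difference (a throwaway check; both patterns are copied from the diff and the suggestion above):

import re

original = r'^[A-Z][A-Z0-9_]*(\([0-9,]+\))?$'
relaxed = r'^[A-Z][A-Z0-9_]*(\([0-9,\s]+\))?$'

col_type = "DECIMAL(10, 2)"
print(bool(re.match(original, col_type.upper())))  # False -> load_csv raises ValueError
print(bool(re.match(relaxed, col_type.upper())))   # True  -> the type is accepted
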
column_defs.append(f"{safe_col} {col_type}")
columns = ', '.join(column_defs)
conn.execute(f"CREATE TABLE {safe_table} ({columns})")
else:
# Infer schema from CSV
# Infer schema from CSV - use parameterized query for filepath
conn.execute(f"""
CREATE TABLE {table_name} AS
SELECT * FROM read_csv_auto('{filepath}')
CREATE TABLE {safe_table} AS
SELECT * FROM read_csv_auto(?)
WHERE 1=0
""")
""", [filepath])

# Load data
# Load data - use validated table name and parameterized filepath
safe_table = self._validate_identifier(table_name)
conn.execute(f"""
INSERT INTO {table_name}
SELECT * FROM read_csv_auto('{filepath}')
""")
INSERT INTO {safe_table}
SELECT * FROM read_csv_auto(?)
""", [filepath])

return True

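As a side note on the new _validate_identifier helper, a small behavior sketch of its pattern (the identifiers are made up for illustration):

import re

pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)?$'

for identifier in ["orders", "analytics.orders", "order-items", 'users"; DROP TABLE users; --']:
    ok = bool(re.match(pattern, identifier))
    print(identifier, "->", f'"{identifier}"' if ok else "ValueError")

# orders and analytics.orders are quoted and returned; the hyphenated name and
# the injection attempt are both rejected.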
62 changes: 52 additions & 10 deletions src/praisonai-agents/praisonaiagents/tools/file_tools.py
@@ -21,6 +21,32 @@
class FileTools:
"""Tools for file operations including read, write, list, and information."""

@staticmethod
def _validate_path(filepath: str) -> str:
"""
Validate and normalize a file path to prevent path traversal attacks.

Args:
filepath: Path to validate

Returns:
str: Normalized absolute path

Raises:
ValueError: If path contains suspicious patterns
"""
# Normalize the path
normalized = os.path.normpath(filepath)
absolute = os.path.abspath(normalized)

# Check for suspicious patterns
if '..' in filepath or filepath.startswith('~'):
raise ValueError(f"Suspicious path pattern detected: {filepath}")
Comment on lines +42 to +44

⚠️ Potential issue

Fix path validation logic to prevent bypass.

The suspicious-pattern check is performed on the original filepath before normalization, which could miss path traversal attempts that only become apparent after normalization.

Apply this fix to check patterns after normalization:

-        # Check for suspicious patterns
-        if '..' in filepath or filepath.startswith('~'):
-            raise ValueError(f"Suspicious path pattern detected: {filepath}")
+        # Check for suspicious patterns in normalized path
+        if '..' in normalized or '~' in normalized:
+            raise ValueError(f"Suspicious path pattern detected: {filepath}")

# Additional check: ensure the resolved path doesn't escape expected boundaries
# This is a basic check - in production, you'd want to define allowed directories
return absolute
Comment on lines +38 to +48

high

The current path validation logic in _validate_path is an improvement but is not sufficient to prevent all path traversal attacks. An attacker can still access any file on the filesystem by providing an absolute path (e.g., /etc/passwd), as the check only looks for .. and ~ in the path string.

A more robust approach is to define a safe base directory and ensure that the resolved absolute path of any file operation is within that directory. This would prevent the tool from accessing unintended files outside its designated workspace.

agent_workspace = os.path.abspath(os.getcwd())

if os.path.isabs(filepath):
    raise ValueError(f"Absolute paths are not permitted: {filepath}")

full_path = os.path.normpath(os.path.join(agent_workspace, filepath))

if not full_path.startswith(agent_workspace):
    raise ValueError(f"Path traversal attempt detected: {filepath}")

return full_path

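A tiny reproduction of the gap described above (the path is illustrative): an absolute path contains no '..' and does not start with '~', so the current checks let it through.

import os

filepath = "/etc/passwd"
print('..' in filepath)                             # False
print(filepath.startswith('~'))                     # False
print(os.path.abspath(os.path.normpath(filepath)))  # /etc/passwd -- returned as "safe"
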
Comment on lines +46 to +48

🛠️ Refactor suggestion

Enhance boundary validation for production use.

The current implementation lacks enforcement of allowed directory boundaries, which is critical for preventing unauthorized file access.

Consider implementing a more robust boundary check:

+        # Define allowed base directories (configure as needed)
+        allowed_bases = [os.getcwd(), '/tmp', '/var/tmp']  # Example
+        
         # Additional check: ensure the resolved path doesn't escape expected boundaries
-        # This is a basic check - in production, you'd want to define allowed directories
+        if not any(absolute.startswith(base) for base in allowed_bases):
+            raise ValueError(f"Path outside allowed directories: {filepath}")
+            
         return absolute

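If stricter confinement is adopted, a sketch along these lines avoids the startswith() prefix pitfall (e.g. '/tmp' matching '/tmpfoo'); the choice of base directory is an assumption, not something this PR defines.

import os

def is_within_base(candidate: str, base: str) -> bool:
    """Return True if candidate resolves to a location inside base."""
    candidate = os.path.realpath(candidate)  # resolves symlinks and '..' segments
    base = os.path.realpath(base)
    return os.path.commonpath([candidate, base]) == base

print(is_within_base("notes/todo.txt", os.getcwd()))  # True
print(is_within_base("../outside.txt", os.getcwd()))  # False
print(is_within_base("/tmpfoo/x", "/tmp"))            # False, unlike a bare startswith() check
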
@staticmethod
def read_file(filepath: str, encoding: str = 'utf-8') -> str:
"""
@@ -34,7 +60,9 @@ def read_file(filepath: str, encoding: str = 'utf-8') -> str:
str: Content of the file
"""
try:
with open(filepath, 'r', encoding=encoding) as f:
# Validate path to prevent traversal attacks
safe_path = FileTools._validate_path(filepath)
with open(safe_path, 'r', encoding=encoding) as f:
return f.read()
except Exception as e:
error_msg = f"Error reading file {filepath}: {str(e)}"
@@ -56,9 +84,11 @@ def write_file(filepath: str, content: str, encoding: str = 'utf-8') -> bool:
bool: True if successful, False otherwise
"""
try:
# Validate path to prevent traversal attacks
safe_path = FileTools._validate_path(filepath)
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', encoding=encoding) as f:
os.makedirs(os.path.dirname(safe_path), exist_ok=True)
with open(safe_path, 'w', encoding=encoding) as f:
f.write(content)
return True
except Exception as e:
@@ -79,7 +109,9 @@ def list_files(directory: str, pattern: Optional[str] = None) -> List[Dict[str,
List[Dict]: List of file information dictionaries
"""
try:
path = Path(directory)
# Validate directory path
safe_dir = FileTools._validate_path(directory)
path = Path(safe_dir)
if pattern:
files = path.glob(pattern)
else:
@@ -114,7 +146,9 @@ def get_file_info(filepath: str) -> Dict[str, Union[str, int]]:
Dict: File information including size, dates, etc.
"""
try:
path = Path(filepath)
# Validate file path
safe_path = FileTools._validate_path(filepath)
path = Path(safe_path)
if not path.exists():
return {'error': f'File not found: {filepath}'}

@@ -149,9 +183,12 @@ def copy_file(src: str, dst: str) -> bool:
bool: True if successful, False otherwise
"""
try:
# Validate paths to prevent traversal attacks
safe_src = FileTools._validate_path(src)
safe_dst = FileTools._validate_path(dst)
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
os.makedirs(os.path.dirname(safe_dst), exist_ok=True)
shutil.copy2(safe_src, safe_dst)
return True
except Exception as e:
error_msg = f"Error copying file from {src} to {dst}: {str(e)}"
@@ -172,9 +209,12 @@ def move_file(src: str, dst: str) -> bool:
bool: True if successful, False otherwise
"""
try:
# Validate paths to prevent traversal attacks
safe_src = FileTools._validate_path(src)
safe_dst = FileTools._validate_path(dst)
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.move(src, dst)
os.makedirs(os.path.dirname(safe_dst), exist_ok=True)
shutil.move(safe_src, safe_dst)
return True
except Exception as e:
error_msg = f"Error moving file from {src} to {dst}: {str(e)}"
@@ -194,7 +234,9 @@ def delete_file(filepath: str) -> bool:
bool: True if successful, False otherwise
"""
try:
os.remove(filepath)
# Validate path to prevent traversal attacks
safe_path = FileTools._validate_path(filepath)
os.remove(safe_path)
return True
except Exception as e:
error_msg = f"Error deleting file {filepath}: {str(e)}"
88 changes: 84 additions & 4 deletions src/praisonai-agents/praisonaiagents/tools/python_tools.py
@@ -46,20 +46,100 @@ def execute_code(
timeout: int = 30,
max_output_size: int = 10000
) -> Dict[str, Any]:
"""Execute Python code safely."""
"""Execute Python code safely with restricted builtins."""
try:
# Set up execution environment
# Create safe builtins - restricted set of functions
safe_builtins = {
# Basic functions
'print': print,
'len': len,
'range': range,
'enumerate': enumerate,
'zip': zip,
'map': map,
'filter': filter,
'sum': sum,
'min': min,
'max': max,
'abs': abs,
'round': round,
'sorted': sorted,
'reversed': reversed,
'any': any,
'all': all,
# Type constructors
'int': int,
'float': float,
'str': str,
'bool': bool,
'list': list,
'tuple': tuple,
'dict': dict,
'set': set,
# Math functions
'pow': pow,
'divmod': divmod,
# Exceptions
'Exception': Exception,
'ValueError': ValueError,
'TypeError': TypeError,
'KeyError': KeyError,
'IndexError': IndexError,
'RuntimeError': RuntimeError,
# Other safe functions
'isinstance': isinstance,
'type': type,
'hasattr': hasattr,
'getattr': getattr,
'setattr': setattr,
'dir': dir,
'help': help,
# Disable dangerous functions
'__import__': None,
'eval': None,
'exec': None,
'compile': None,
'open': None,
'input': None,
'globals': None,
'locals': None,
'vars': None,
}

# Set up execution environment with safe builtins
if globals_dict is None:
globals_dict = {'__builtins__': __builtins__}
globals_dict = {'__builtins__': safe_builtins}
else:
# Override builtins in provided globals
globals_dict['__builtins__'] = safe_builtins

if locals_dict is None:
locals_dict = {}

# Security check: validate code doesn't contain dangerous patterns
dangerous_patterns = [
'__import__', 'import ', 'from ', 'exec', 'eval',
'compile', 'open(', 'file(', 'input(', 'raw_input',
'__subclasses__', '__bases__', '__globals__', '__code__',
'__class__', 'globals(', 'locals(', 'vars('
]

code_lower = code.lower()
for pattern in dangerous_patterns:
if pattern.lower() in code_lower:
return {
'result': None,
'stdout': '',
'stderr': f'Security Error: Code contains restricted pattern: {pattern}',
'success': False
}
Comment on lines +120 to +135

🛠️ Refactor suggestion

Improve dangerous pattern detection accuracy.

The current pattern detection has some issues that could lead to false positives or bypasses:

  1. Mixed case sensitivity handling
  2. Overly broad patterns like 'from ' that could block legitimate code
  3. Missing some potential bypass patterns

Consider this more precise pattern detection:

-            dangerous_patterns = [
-                '__import__', 'import ', 'from ', 'exec', 'eval', 
-                'compile', 'open(', 'file(', 'input(', 'raw_input',
-                '__subclasses__', '__bases__', '__globals__', '__code__',
-                '__class__', 'globals(', 'locals(', 'vars('
-            ]
-            
-            code_lower = code.lower()
-            for pattern in dangerous_patterns:
-                if pattern.lower() in code_lower:
+            # Use more precise regex patterns to reduce false positives
+            dangerous_patterns = [
+                r'\b__import__\b',
+                r'\bimport\s+\w+',
+                r'\bfrom\s+\w+\s+import\b',
+                r'\bexec\s*\(',
+                r'\beval\s*\(',
+                r'\bcompile\s*\(',
+                r'\bopen\s*\(',
+                r'\bglobals\s*\(',
+                r'\blocals\s*\(',
+                r'__\w+__'  # Dunder attributes
+            ]
+            
+            import re
+            for pattern in dangerous_patterns:
+                if re.search(pattern, code, re.IGNORECASE):

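To make the false-positive concern concrete, a small sketch with a made-up snippet: the substring check trips on a harmless string literal, while the word-boundary regex only fires on a real import.

import re

benign = 'title = "Results from 2024 survey"'
malicious = 'from os import system'

regex = r'\bfrom\s+\w+\s+import\b'

print('from ' in benign.lower())                         # True  -- benign code rejected
print(bool(re.search(regex, benign, re.IGNORECASE)))     # False -- regex lets it pass
print(bool(re.search(regex, malicious, re.IGNORECASE)))  # True  -- real import still caught
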
# Capture output
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()

try:
# Compile code
# Compile code with restricted mode
compiled_code = compile(code, '<string>', 'exec')

# Execute with output capture
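As a rough illustration of what the restricted __builtins__ mapping buys, here is a trimmed-down stand-in for the safe_builtins dict above (not the actual tool code):

safe_builtins = {'print': print, 'len': len, 'open': None, '__import__': None}

exec("print(len('abc'))", {'__builtins__': safe_builtins})  # 3 -- whitelisted calls work

try:
    exec("open('secrets.txt')", {'__builtins__': safe_builtins})
except TypeError as err:
    print("blocked:", err)  # 'NoneType' object is not callable
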
19 changes: 8 additions & 11 deletions src/praisonai-agents/praisonaiagents/tools/shell_tools.py
@@ -40,7 +40,6 @@ def execute_command(
command: str,
cwd: Optional[str] = None,
timeout: int = 30,
shell: bool = False,
env: Optional[Dict[str, str]] = None,
max_output_size: int = 10000
) -> Dict[str, Union[str, int, bool]]:
@@ -50,22 +49,20 @@
command: Command to execute
cwd: Working directory
timeout: Maximum execution time in seconds
shell: Whether to run command in shell
env: Environment variables
max_output_size: Maximum output size in bytes

Returns:
Dictionary with execution results
"""
try:
# Split command if not using shell
if not shell:
# Use shlex.split with appropriate posix flag
if platform.system() == 'Windows':
# Use shlex with posix=False for Windows to handle quotes properly
command = shlex.split(command, posix=False)
else:
command = shlex.split(command)
# Always split command for safety (no shell execution)
# Use shlex.split with appropriate posix flag
if platform.system() == 'Windows':
# Use shlex with posix=False for Windows to handle quotes properly
command = shlex.split(command, posix=False)
else:
command = shlex.split(command)

# Set up process environment
process_env = os.environ.copy()
@@ -79,7 +76,7 @@
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
shell=shell,
shell=False, # Always use shell=False for security
env=process_env,
text=True
)