Skip to content

Commit 1c3c7a8

Browse files
authored
Merge pull request #23 from FuzzingLabs/feature/python-sast-workflow
feat: Add Python SAST workflow (Issue #5)
2 parents 6e4241a + 66e797a commit 1c3c7a8

File tree

11 files changed

+1553
-2
lines changed

11 files changed

+1553
-2
lines changed

backend/toolbox/modules/analyzer/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@
1010
# Additional attribution and requirements are provided in the NOTICE file.
1111

1212
from .security_analyzer import SecurityAnalyzer
13+
from .bandit_analyzer import BanditAnalyzer
14+
from .mypy_analyzer import MypyAnalyzer
1315

14-
__all__ = ["SecurityAnalyzer"]
16+
__all__ = ["SecurityAnalyzer", "BanditAnalyzer", "MypyAnalyzer"]
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
"""
2+
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
3+
"""
4+
5+
# Copyright (c) 2025 FuzzingLabs
6+
#
7+
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
8+
# at the root of this repository for details.
9+
#
10+
# After the Change Date (four years from publication), this version of the
11+
# Licensed Work will be made available under the Apache License, Version 2.0.
12+
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Additional attribution and requirements are provided in the NOTICE file.
15+
16+
import asyncio
17+
import json
18+
import logging
19+
import time
20+
from pathlib import Path
21+
from typing import Dict, Any, List
22+
23+
try:
24+
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
25+
except ImportError:
26+
try:
27+
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
28+
except ImportError:
29+
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
30+
31+
logger = logging.getLogger(__name__)
32+
33+
34+
class BanditAnalyzer(BaseModule):
35+
"""
36+
Analyzes Python code for security issues using Bandit.
37+
38+
This module:
39+
- Runs Bandit security linter on Python files
40+
- Detects common security issues (SQL injection, hardcoded secrets, etc.)
41+
- Reports findings with severity levels
42+
"""
43+
44+
# Severity mapping from Bandit levels to our standard
45+
SEVERITY_MAP = {
46+
"LOW": "low",
47+
"MEDIUM": "medium",
48+
"HIGH": "high"
49+
}
50+
51+
def get_metadata(self) -> ModuleMetadata:
52+
"""Get module metadata"""
53+
return ModuleMetadata(
54+
name="bandit_analyzer",
55+
version="1.0.0",
56+
description="Analyzes Python code for security issues using Bandit",
57+
author="FuzzForge Team",
58+
category="analyzer",
59+
tags=["python", "security", "bandit", "sast"],
60+
input_schema={
61+
"severity_level": {
62+
"type": "string",
63+
"enum": ["low", "medium", "high"],
64+
"description": "Minimum severity level to report",
65+
"default": "low"
66+
},
67+
"confidence_level": {
68+
"type": "string",
69+
"enum": ["low", "medium", "high"],
70+
"description": "Minimum confidence level to report",
71+
"default": "medium"
72+
},
73+
"exclude_tests": {
74+
"type": "boolean",
75+
"description": "Exclude test files from analysis",
76+
"default": True
77+
},
78+
"skip_ids": {
79+
"type": "array",
80+
"items": {"type": "string"},
81+
"description": "List of Bandit test IDs to skip",
82+
"default": []
83+
}
84+
},
85+
output_schema={
86+
"findings": {
87+
"type": "array",
88+
"description": "List of security issues found by Bandit"
89+
}
90+
},
91+
requires_workspace=True
92+
)
93+
94+
def validate_config(self, config: Dict[str, Any]) -> bool:
95+
"""Validate module configuration"""
96+
severity = config.get("severity_level", "low")
97+
if severity not in ["low", "medium", "high"]:
98+
raise ValueError("severity_level must be one of: low, medium, high")
99+
100+
confidence = config.get("confidence_level", "medium")
101+
if confidence not in ["low", "medium", "high"]:
102+
raise ValueError("confidence_level must be one of: low, medium, high")
103+
104+
skip_ids = config.get("skip_ids", [])
105+
if not isinstance(skip_ids, list):
106+
raise ValueError("skip_ids must be a list")
107+
108+
return True
109+
110+
async def _run_bandit(
111+
self,
112+
workspace: Path,
113+
severity_level: str,
114+
confidence_level: str,
115+
exclude_tests: bool,
116+
skip_ids: List[str]
117+
) -> Dict[str, Any]:
118+
"""
119+
Run Bandit on the workspace.
120+
121+
Args:
122+
workspace: Path to workspace
123+
severity_level: Minimum severity to report
124+
confidence_level: Minimum confidence to report
125+
exclude_tests: Whether to exclude test files
126+
skip_ids: List of test IDs to skip
127+
128+
Returns:
129+
Bandit JSON output as dict
130+
"""
131+
try:
132+
# Build bandit command
133+
cmd = [
134+
"bandit",
135+
"-r", str(workspace),
136+
"-f", "json",
137+
"-ll", # Report all findings (we'll filter later)
138+
]
139+
140+
# Add exclude patterns for test files
141+
if exclude_tests:
142+
cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])
143+
144+
# Add skip IDs if specified
145+
if skip_ids:
146+
cmd.extend(["-s", ",".join(skip_ids)])
147+
148+
logger.info(f"Running Bandit on: {workspace}")
149+
process = await asyncio.create_subprocess_exec(
150+
*cmd,
151+
stdout=asyncio.subprocess.PIPE,
152+
stderr=asyncio.subprocess.PIPE
153+
)
154+
155+
stdout, stderr = await process.communicate()
156+
157+
# Bandit returns non-zero if issues found, which is expected
158+
if process.returncode not in [0, 1]:
159+
logger.error(f"Bandit failed: {stderr.decode()}")
160+
return {"results": []}
161+
162+
# Parse JSON output
163+
result = json.loads(stdout.decode())
164+
return result
165+
166+
except Exception as e:
167+
logger.error(f"Error running Bandit: {e}")
168+
return {"results": []}
169+
170+
def _should_include_finding(
171+
self,
172+
issue: Dict[str, Any],
173+
min_severity: str,
174+
min_confidence: str
175+
) -> bool:
176+
"""
177+
Determine if a Bandit issue should be included based on severity/confidence.
178+
179+
Args:
180+
issue: Bandit issue dict
181+
min_severity: Minimum severity threshold
182+
min_confidence: Minimum confidence threshold
183+
184+
Returns:
185+
True if issue should be included
186+
"""
187+
severity_order = ["low", "medium", "high"]
188+
issue_severity = issue.get("issue_severity", "LOW").lower()
189+
issue_confidence = issue.get("issue_confidence", "LOW").lower()
190+
191+
severity_meets_threshold = severity_order.index(issue_severity) >= severity_order.index(min_severity)
192+
confidence_meets_threshold = severity_order.index(issue_confidence) >= severity_order.index(min_confidence)
193+
194+
return severity_meets_threshold and confidence_meets_threshold
195+
196+
def _convert_to_findings(
197+
self,
198+
bandit_result: Dict[str, Any],
199+
workspace: Path,
200+
min_severity: str,
201+
min_confidence: str
202+
) -> List[ModuleFinding]:
203+
"""
204+
Convert Bandit results to ModuleFindings.
205+
206+
Args:
207+
bandit_result: Bandit JSON output
208+
workspace: Workspace path for relative paths
209+
min_severity: Minimum severity to include
210+
min_confidence: Minimum confidence to include
211+
212+
Returns:
213+
List of ModuleFindings
214+
"""
215+
findings = []
216+
217+
for issue in bandit_result.get("results", []):
218+
# Filter by severity and confidence
219+
if not self._should_include_finding(issue, min_severity, min_confidence):
220+
continue
221+
222+
# Extract issue details
223+
test_id = issue.get("test_id", "B000")
224+
test_name = issue.get("test_name", "unknown")
225+
issue_text = issue.get("issue_text", "No description")
226+
severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")
227+
228+
# File location
229+
filename = issue.get("filename", "")
230+
line_number = issue.get("line_number", 0)
231+
code = issue.get("code", "")
232+
233+
# Try to get relative path
234+
try:
235+
file_path = Path(filename)
236+
rel_path = file_path.relative_to(workspace)
237+
except (ValueError, TypeError):
238+
rel_path = Path(filename).name
239+
240+
# Create finding
241+
finding = self.create_finding(
242+
title=f"{test_name} ({test_id})",
243+
description=issue_text,
244+
severity=severity,
245+
category="security-issue",
246+
file_path=str(rel_path),
247+
line_start=line_number,
248+
line_end=line_number,
249+
code_snippet=code.strip() if code else None,
250+
recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
251+
metadata={
252+
"test_id": test_id,
253+
"test_name": test_name,
254+
"confidence": issue.get("issue_confidence", "LOW").lower(),
255+
"cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
256+
"more_info": issue.get("more_info", "")
257+
}
258+
)
259+
findings.append(finding)
260+
261+
return findings
262+
263+
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
264+
"""
265+
Execute the Bandit analyzer module.
266+
267+
Args:
268+
config: Module configuration
269+
workspace: Path to workspace
270+
271+
Returns:
272+
ModuleResult with security findings
273+
"""
274+
start_time = time.time()
275+
metadata = self.get_metadata()
276+
277+
# Validate inputs
278+
self.validate_config(config)
279+
self.validate_workspace(workspace)
280+
281+
# Get configuration
282+
severity_level = config.get("severity_level", "low")
283+
confidence_level = config.get("confidence_level", "medium")
284+
exclude_tests = config.get("exclude_tests", True)
285+
skip_ids = config.get("skip_ids", [])
286+
287+
# Run Bandit
288+
logger.info("Starting Bandit analysis...")
289+
bandit_result = await self._run_bandit(
290+
workspace,
291+
severity_level,
292+
confidence_level,
293+
exclude_tests,
294+
skip_ids
295+
)
296+
297+
# Convert to findings
298+
findings = self._convert_to_findings(
299+
bandit_result,
300+
workspace,
301+
severity_level,
302+
confidence_level
303+
)
304+
305+
# Calculate summary
306+
severity_counts = {}
307+
for finding in findings:
308+
sev = finding.severity
309+
severity_counts[sev] = severity_counts.get(sev, 0) + 1
310+
311+
execution_time = time.time() - start_time
312+
313+
return ModuleResult(
314+
module=metadata.name,
315+
version=metadata.version,
316+
status="success",
317+
execution_time=execution_time,
318+
findings=findings,
319+
summary={
320+
"total_issues": len(findings),
321+
"by_severity": severity_counts,
322+
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
323+
},
324+
metadata={
325+
"bandit_version": bandit_result.get("generated_at", "unknown"),
326+
"metrics": bandit_result.get("metrics", {})
327+
}
328+
)

0 commit comments

Comments
 (0)