run.py
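"""Run the multi-agent security-review pipeline over the sample files in
code_samples/ and write each result as Markdown to model_outputs/."""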
from pathlib import Path

from dotenv import load_dotenv
from openai import OpenAI

from agents.code_analyzer import analyze_code
from agents.input_guardrail import input_guardrail
from agents.response_calibration_agent import review_security_response, process_review
from agents.vuln_fixer import suggest_secure_fixes
from schemas.analysis import CodeAnalysisResponse, CalibrationResponse, CalibratedCodeAnalysisResponse
from schemas.fix import InsecureCodeFixResponse
from utils import load_top_50_rules, format_result_to_markdown

def run(openai_client, code_snippet, model, top50_rules):
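    """Run the full review pipeline on a single code snippet.

    Steps: (1) guardrail the input, (2) analyze the code against the
    top-50 vulnerability rules, (3) calibrate the initial findings, and
    (4) suggest secure fixes for anything still flagged. Returns a dict
    describing where the pipeline stopped and why.
    """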
    # 1. Validate user intent
    result, rationale = input_guardrail(openai_client, model, user_input=code_snippet)
    if result != "success":
        return {
            "success": False,
            "step": "guardrail",
            "user_input": code_snippet,
            "rationale": rationale,
        }
    # 2. Analyze the code
    code_analysis: CodeAnalysisResponse = analyze_code(
        openai_client, model=model, user_input=code_snippet, top50=top50_rules
    )
    if code_analysis.secure:
        return {
            "success": True,
            "step": "analyzer",
            "secure": True,
            "message": "The code is secure according to analysis.",
        }
    # 3. Calibrate the analysis
    calibration_response: CalibrationResponse = review_security_response(
        openai_client=openai_client, model=model, code_analysis=code_analysis
    )
    calibrated_analysis: CalibratedCodeAnalysisResponse = process_review(
        code_analysis=code_analysis, calibration_response=calibration_response
    )
    if calibrated_analysis.secure:
        return {
            "success": True,
            "step": "calibration",
            "secure": True,
            "message": "The code is secure according to calibration. Initial findings were rejected or found to be speculative.",
        }
    # 4. Generate secure code fixes
    fix_suggestions: InsecureCodeFixResponse = suggest_secure_fixes(
        openai_client=openai_client, model=model, code=code_snippet, analysis=calibrated_analysis
    )
    return {
        "success": True,
        "step": "fix_suggestions",
        "secure": False,
        "fixes": fix_suggestions,
    }

def test_with_code_file(filepath: str, label: str, openai_client: OpenAI, model: str, top50_rules: str):
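    """Read one code sample from disk, run the pipeline on it, and
    return the result dict (or a failure dict if the run raised)."""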
print(f"\n===== Running test: {label} =====")
with open(filepath, "r", encoding="utf-8") as f:
code = f.read()
try:
result = run(openai_client=openai_client, code_snippet=code, model=model, top50_rules=top50_rules)
return result
except Exception as e:
print(f"❌ Test '{label}' failed: {e}")
if __name__ == "__main__":
    load_dotenv(override=True)
    client = OpenAI()
    model = "gpt-4o-2024-08-06"

    top50_path = Path(__file__).parent / "data" / "top_50_vulnerabilities.md"
    top50 = load_top_50_rules(filepath=top50_path)

    test_files_dir = Path("code_samples")
    output_dir = Path("model_outputs")
    output_dir.mkdir(exist_ok=True)

    for filepath in test_files_dir.glob("*.py"):
        filename = filepath.stem
        result: dict = test_with_code_file(
            str(filepath), label=filename, openai_client=client, model=model, top50_rules=top50
        )
        print(result)
        markdown = format_result_to_markdown(result=result)
        output_path = output_dir / f"{filename}.md"
        output_path.write_text(markdown, encoding="utf-8")