Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions patchwork/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ def __init__(self, inputs: DataPoint):
Initializes the step.
:param inputs: a dictionary of inputs
"""

# check if the inputs have the required keys
if self.__input_class is not None:
missing_keys = self.__input_class.__required_keys__.difference(inputs.keys())
if len(missing_keys) > 0:
raise ValueError(f"Missing required data: {list(missing_keys)}")
self.test_inputs(inputs)

# store the inputs
self.inputs = inputs
Expand Down Expand Up @@ -78,6 +74,13 @@ def __init_subclass__(cls, **kwargs):
else:
cls.__output_class = None

@classmethod
def test_inputs(cls, inputs: DataPoint):
if cls.__input_class is not None:
missing_keys = cls.__input_class.__required_keys__.difference(inputs.keys())
if len(missing_keys) > 0:
raise ValueError(f"Missing required data: {list(missing_keys)}")

def __managed_run(self, *args, **kwargs) -> Any:
self.debug(self.inputs)
logger.info(f"Run started {self.__step_name}")
Expand Down
57 changes: 57 additions & 0 deletions patchwork/steps/CallCommand/CallCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

import shlex
import shutil
import subprocess
from pathlib import Path

from patchwork.logger import logger
from patchwork.step import Step, StepStatus
from patchwork.steps.CallCommand.typed import CallCommandInputs, CallCommandOutputs


class CallCommand(Step, input_class=CallCommandInputs, output_class=CallCommandOutputs):
def __init__(self, inputs: dict):
super().__init__(inputs)
self.command = shutil.which(inputs["command"])
if self.command is None:
raise ValueError(f"Command `{inputs['command']}` not found in PATH")
self.command_args = shlex.split(inputs.get("command_args", ""))
self.working_dir = inputs.get("working_dir", Path.cwd())
self.env = self.__parse_env_text(inputs.get("env", ""))

@staticmethod
def __parse_env_text(env_text: str) -> dict[str, str]:
env_spliter = shlex.shlex(env_text, posix=True)
env_spliter.whitespace_split = True
env_spliter.whitespace += ";"

env: dict[str, str] = dict()
for env_assign in env_spliter:
env_assign_spliter = shlex.shlex(env_assign, posix=True)
env_assign_spliter.whitespace_split = True
env_assign_spliter.whitespace += "="
env_parts = list(env_assign_spliter)
if len(env_parts) < 1:
continue

env_assign_target = env_parts[0]
if len(env_parts) < 2:
logger.error(f"{env_assign_target} is not assigned anything, skipping...")
continue
if len(env_parts) > 2:
logger.error(f"{env_assign_target} has more than 1 assignment, skipping...")
continue
env[env_assign_target] = env_parts[1]

return env

def run(self) -> dict:
cmd = [self.command, *self.command_args]
p = subprocess.run(cmd, capture_output=True, text=True, cwd=self.working_dir, env=self.env)
try:
p.check_returncode()
return dict(stdout_output=p.stdout)
except subprocess.CalledProcessError as e:
self.set_status(StepStatus.FAILED, f"`{self.command} {self.command_args}` failed with stdout:\n{p.stdout}\nstderr:\n{e.stderr}")
return dict()
Empty file.
16 changes: 16 additions & 0 deletions patchwork/steps/CallCommand/typed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing_extensions import Annotated, TypedDict

from patchwork.common.utils.step_typing import StepTypeConfig


class __RequiredCallCommandInputs(TypedDict):
command: str

class CallCommandInputs(__RequiredCallCommandInputs, total=False):
command_args: str
working_dir: Annotated[str, StepTypeConfig(is_path=True)]
env: str


class CallCommandOutputs(TypedDict):
stdout_output: str
49 changes: 49 additions & 0 deletions patchwork/steps/ScanPSFuzz/ScanPSFuzz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

import subprocess

from patchwork.logger import logger
from patchwork.step import Step, DataPoint
from patchwork.steps import CallCommand
from patchwork.steps.ScanPSFuzz.typed import ScanPSFuzzInputs, ScanPSFuzzOutputs


class ScanPSFuzz(Step, input_class=ScanPSFuzzInputs, output_class=ScanPSFuzzOutputs):
def __init__(self, inputs: dict):
if not self.__is_ps_fuzz_installed():
raise ValueError("""\
`prompt-security-fuzzer` is not installed. Please install with the following instructions:
1. Install pipx: https://github.com/pypa/pipx
2. pipx install prompt-security-fuzzer
3. pipx inject prompt-security-fuzzer setuptools
""")
super().__init__(inputs)
wrapped_input = dict(
command="prompt-security-fuzzer",
command_args=f'-b {inputs["prompt_file_path"]}',
env=f'OPENAI_API_KEY={inputs["openai_api_key"]}'
)

working_dir = inputs.get("working_dir")
if working_dir is not None:
wrapped_input["working_dir"] = working_dir

self.inner_step = CallCommand(wrapped_input)

@staticmethod
def __is_ps_fuzz_installed():
try:
subprocess.run(["prompt-security-fuzzer", "-h"], capture_output=True, check=True)
return True
except subprocess.CalledProcessError as e:
err = e
except FileNotFoundError as e:
err = e
# If the command fails, prompt-security-fuzzer is not installed
logger.info(f"prompt-security-fuzzer is not installed: {err}")
return False

def run(self) -> DataPoint:
rv = self.inner_step.run()
self.set_status(self.inner_step.status, self.inner_step.status_message)
return rv
Empty file.
15 changes: 15 additions & 0 deletions patchwork/steps/ScanPSFuzz/typed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing_extensions import Annotated, TypedDict

from patchwork.common.utils.step_typing import StepTypeConfig


class __RequiredScanPSFuzzInputs(TypedDict):
prompt_file_path: Annotated[str, StepTypeConfig(is_path=True)]
openai_api_key: Annotated[str, StepTypeConfig(is_config=True)]

class ScanPSFuzzInputs(__RequiredScanPSFuzzInputs, total=False):
working_dir: Annotated[str, StepTypeConfig(is_path=True)]


class ScanPSFuzzOutputs(TypedDict):
stdout_output: str
4 changes: 4 additions & 0 deletions patchwork/steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from patchwork.steps.AnalyzeImpact.AnalyzeImpact import AnalyzeImpact
from patchwork.steps.CallAPI.CallAPI import CallAPI
from patchwork.steps.CallCode2Prompt.CallCode2Prompt import CallCode2Prompt
from patchwork.steps.CallCommand.CallCommand import CallCommand
from patchwork.steps.CallLLM.CallLLM import CallLLM
from patchwork.steps.Combine.Combine import Combine
from patchwork.steps.CommitChanges.CommitChanges import CommitChanges
Expand Down Expand Up @@ -41,6 +42,7 @@
from patchwork.steps.ReadPRDiffs.ReadPRDiffs import ReadPRDiffs
from patchwork.steps.ReadPRs.ReadPRs import ReadPRs
from patchwork.steps.ScanDepscan.ScanDepscan import ScanDepscan
from patchwork.steps.ScanPSFuzz.ScanPSFuzz import ScanPSFuzz
from patchwork.steps.ScanSemgrep.ScanSemgrep import ScanSemgrep
from patchwork.steps.SimplifiedLLM.SimplifiedLLM import SimplifiedLLM
from patchwork.steps.SimplifiedLLMOnce.SimplifiedLLMOnce import SimplifiedLLMOnce
Expand All @@ -57,6 +59,7 @@
"AnalyzeImpact",
"CallAPI",
"CallCode2Prompt",
"CallCommand",
"CallLLM",
"Combine",
"CommitChanges",
Expand Down Expand Up @@ -88,6 +91,7 @@
"ReadPRDiffsPB",
"ReadPRs",
"ScanDepscan",
"ScanPSFuzz"
"ScanSemgrep",
"SimplifiedLLM",
"SimplifiedLLMOnce",
Expand Down
Loading
Loading