Skip to content
This repository was archived by the owner on Oct 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true

- name: Log in to the Container registry
if: ${{ github.event_name != 'pull_request' }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v5
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ jobs:

steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
Expand Down Expand Up @@ -53,6 +55,8 @@ jobs:

steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "src/semgrep_mcp/semgrep_interfaces"]
path = src/semgrep_mcp/semgrep_interfaces
url = https://github.com/semgrep/semgrep-interfaces.git
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ max-complexity = 10
packages = [
"src/semgrep_mcp",
]

[tool.pyright]
exclude = [
"src/semgrep_mcp/semgrep_interfaces"
]
251 changes: 251 additions & 0 deletions src/semgrep_mcp/semgrep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import asyncio
import json
import os
import subprocess
from typing import Any

from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, ErrorData

from semgrep_mcp.models import CodeFile
from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput

################################################################################
# Prelude #
################################################################################
# Communicating with the Semgrep binary.

################################################################################
# Constants #
################################################################################

# Held by ensure_semgrep_available() while it searches for the semgrep
# executable, so that discovery is serialised between concurrent tasks.
_SEMGREP_LOCK = asyncio.Lock()

# Cached path of the semgrep executable. It stays None until
# ensure_semgrep_available() finds a working binary or
# set_semgrep_executable() assigns one explicitly.
SEMGREP_EXECUTABLE: str | None = None

################################################################################
# Finding Semgrep #
################################################################################


# Semgrep utilities
def find_semgrep_path() -> str | None:
"""
Dynamically find semgrep in PATH or common installation directories
Returns: Path to semgrep executable or None if not found
"""
# Common paths where semgrep might be installed
common_paths = [
"semgrep", # Default PATH
"/usr/local/bin/semgrep",
"/usr/bin/semgrep",
"/opt/homebrew/bin/semgrep", # Homebrew on macOS
"/opt/semgrep/bin/semgrep",
"/home/linuxbrew/.linuxbrew/bin/semgrep", # Homebrew on Linux
"/snap/bin/semgrep", # Snap on Linux
]

# Add Windows paths if on Windows
if os.name == "nt":
app_data = os.environ.get("APPDATA", "")
if app_data:
common_paths.extend(
[
os.path.join(app_data, "Python", "Scripts", "semgrep.exe"),
os.path.join(app_data, "npm", "semgrep.cmd"),
]
)

# Try each path
for semgrep_path in common_paths:
# For 'semgrep' (without path), check if it's in PATH
if semgrep_path == "semgrep":
try:
subprocess.run(
[semgrep_path, "--version"], check=True, capture_output=True, text=True
)
return semgrep_path
except (subprocess.SubprocessError, FileNotFoundError):
continue

# For absolute paths, check if the file exists before testing
if os.path.isabs(semgrep_path):
if not os.path.exists(semgrep_path):
continue

# Try executing semgrep at this path
try:
subprocess.run(
[semgrep_path, "--version"], check=True, capture_output=True, text=True
)
return semgrep_path
except (subprocess.SubprocessError, FileNotFoundError):
continue

return None


async def ensure_semgrep_available() -> str:
    """
    Ensures semgrep is available and caches its path in SEMGREP_EXECUTABLE.

    Discovery is serialised with _SEMGREP_LOCK so that concurrent tasks do not
    each probe the filesystem; tasks that waited on the lock reuse the result
    of the task that got there first.

    Returns:
        Path to semgrep executable

    Raises:
        McpError: If semgrep is not installed or not found
    """
    global SEMGREP_EXECUTABLE

    # Fast path - check if we already have the path
    if SEMGREP_EXECUTABLE:
        return SEMGREP_EXECUTABLE

    # Slow path - acquire lock and find semgrep
    async with _SEMGREP_LOCK:
        # Another task may have completed discovery while we were waiting for
        # the lock, so look at the cache again before probing.
        if SEMGREP_EXECUTABLE:
            return SEMGREP_EXECUTABLE

        # find_semgrep_path() launches blocking subprocesses; run it in a worker
        # thread so the event loop keeps servicing other tasks meanwhile.
        semgrep_path = await asyncio.to_thread(find_semgrep_path)

        if not semgrep_path:
            raise McpError(
                ErrorData(
                    code=INTERNAL_ERROR,
                    message="Semgrep is not installed or not in your PATH. "
                    "Please install Semgrep manually before using this tool. "
                    "Installation options: "
                    "pip install semgrep, "
                    "macOS: brew install semgrep, "
                    "Or refer to https://semgrep.dev/docs/getting-started/",
                )
            )

        # Store the path for future use
        SEMGREP_EXECUTABLE = semgrep_path
        return semgrep_path


def set_semgrep_executable(semgrep_path: str) -> None:
    """
    Overrides the cached semgrep executable path.

    Args:
        semgrep_path: Value to store in the module-level SEMGREP_EXECUTABLE.
            It is stored as given; no check is made that it exists or runs.
    """
    global SEMGREP_EXECUTABLE

    SEMGREP_EXECUTABLE = semgrep_path


################################################################################
# Communicating with Semgrep over RPC #
################################################################################


class SemgrepContext:
    """
    Wraps a running semgrep subprocess and exchanges newline-delimited
    messages with it over its stdin/stdout pipes.

    The class does no locking of its own: each call writes one line and then
    reads one line back.
    """

    process: asyncio.subprocess.Process
    stdin: asyncio.StreamWriter
    stdout: asyncio.StreamReader

    def __init__(self, process: asyncio.subprocess.Process) -> None:
        """
        Args:
            process: Subprocess whose stdin and stdout were opened as pipes.

        Raises:
            McpError: If either stdin or stdout of the process is unavailable
        """
        self.process = process

        writer = process.stdin
        reader = process.stdout
        if writer is None or reader is None:
            raise McpError(
                ErrorData(
                    code=INTERNAL_ERROR,
                    message="Semgrep process stdin/stdout not available",
                )
            )

        self.stdin = writer
        self.stdout = reader

    async def communicate(self, line: str) -> str:
        """
        Sends one line to the process and returns the next line it prints.

        Args:
            line: Text to send; a trailing newline is appended before encoding.

        Returns:
            The decoded reply line, exactly as read from stdout
        """
        outgoing = (line + "\n").encode()
        self.stdin.write(outgoing)
        await self.stdin.drain()

        reply = await self.stdout.readline()
        return reply.decode()

    async def send_request(self, request: str, **kwargs: Any) -> str:
        """
        Sends a JSON request of the form {"method": request, ...kwargs}.

        Args:
            request: Value of the "method" field.
            **kwargs: Additional top-level fields of the JSON payload.

        Returns:
            The raw reply line from the process
        """
        payload: dict[str, Any] = {"method": request}
        payload.update(kwargs)

        serialized = json.dumps(payload)
        return await self.communicate(serialized)


################################################################################
# Running Semgrep #
################################################################################


async def run_semgrep_process(args: list[str]) -> asyncio.subprocess.Process:
    """
    Starts semgrep as a subprocess and returns it immediately, without waiting
    for it to exit.

    Args:
        args: Command-line arguments placed after the semgrep executable

    Returns:
        The spawned process, with stdin and stdout connected to pipes
    """
    # Resolve the executable first; this raises if semgrep cannot be found.
    executable = await ensure_semgrep_available()

    # Inherit the current environment, plus the switch that makes semgrep emit
    # its debug logs for the MCP server.
    child_env = {**os.environ, "SEMGREP_LOG_SRCS": "mcp"}

    return await asyncio.create_subprocess_exec(
        executable,
        *args,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        # Deliberately not piped: stderr flows through to the server logs,
        # for debugging purposes.
        stderr=None,
        env=child_env,
    )


async def run_semgrep(args: list[str]) -> str:
    """
    Runs semgrep with the given arguments and waits for it to finish

    Args:
        args: List of command arguments

    Returns:
        Output of semgrep command (decoded stdout)

    Raises:
        McpError: If semgrep exits with a non-zero return code
    """

    process = await run_semgrep_process(args)

    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        # run_semgrep_process() does not pipe stderr (it is left attached to the
        # server's own stderr), so communicate() reports it as None. Calling
        # .decode() on it unconditionally would raise AttributeError and hide
        # the real failure.
        if stderr:
            detail = stderr.decode(errors="replace")
        else:
            detail = "see the server's stderr output for details"

        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message=f"Error running semgrep: ({process.returncode}) {detail}",
            )
        )

    return stdout.decode()


async def run_semgrep_via_rpc(context: SemgrepContext, data: list[CodeFile]) -> CliOutput:
    """
    Scans the given code files by sending a "scanFiles" request over RPC

    Args:
        context: Connection to the running semgrep process
        data: List of code files to scan

    Returns:
        The CliOutput parsed from the semgrep reply

    Raises:
        McpError: If the reply is empty, is not valid (double-encoded) JSON,
            or does not decode to a JSON object
    """

    files_json = [
        {"file": code_file.filename, "content": code_file.content} for code_file in data
    ]

    # ATD serialized value
    resp = await context.send_request("scanFiles", files=files_json)

    if not resp.strip():
        # An empty line is what a reader returns at end-of-stream, so the
        # process may have exited before answering.
        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message="Semgrep RPC returned an empty response",
            )
        )

    # The JSON we get is double encoded, looks like
    # '"{"results": ..., ...}"'
    # so we have to load it twice
    try:
        resp_json = json.loads(json.loads(resp))
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError: the first decode did not yield a string to decode again.
        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message=f"Semgrep RPC returned a malformed response: {e}",
            )
        ) from e

    # Raise explicitly instead of asserting: asserts are stripped under -O.
    if not isinstance(resp_json, dict):
        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message="Semgrep RPC response is not a JSON object "
                f"(got {type(resp_json).__name__})",
            )
        )

    return CliOutput.from_json(resp_json)
1 change: 1 addition & 0 deletions src/semgrep_mcp/semgrep_interfaces
Submodule semgrep_interfaces added at 3ce353
Loading
Loading