Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ docker-compose.override.yml
# Intermediate files/output
repos.txt
clones/
jacoco_results/

# Binary files
*.bin
18 changes: 15 additions & 3 deletions remediate_repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ def _resolve_path(base_dir: str, path: str) -> str:
def main():
parser = argparse.ArgumentParser(description="Run SonarQube on a list of repositories.")
parser.add_argument(
"token",
help="SonarQube token. To get this: run Docker on your PC, then: `docker run -d --name sonarqube -p 9000:9000 sonarqube:community`. Log into 'http://localhost:9000', then My Account > Security, and generate a global analysis token to use."
"--token",
help="""
SonarQube token. If not provided, will read from SONARQUBE_TOKEN environment variable.
To get this: run Docker on your PC, then: `docker run -d --name sonarqube -p 9000:9000 sonarqube:community`.
Use "admin" as the username and the password you set during the setup.
Log into 'http://localhost:9000', then My Account > Security, and generate a "User Token" type token to use.
It will have the necessary permissions for analysis.
"""
)
parser.add_argument(
"--repos",
Expand All @@ -35,13 +41,19 @@ def main():
)
args = parser.parse_args()

# Get token from argument or environment variable
token = args.token or os.getenv('SONARQUBE_TOKEN')
if not token:
print("Error: SonarQube token is required. Provide it via --token argument or set SONARQUBE_TOKEN environment variable.")
return

base_dir = os.path.dirname(os.path.abspath(__file__))

repos_file = _resolve_path(base_dir, args.repos)
clone_dir = _resolve_path(base_dir, args.clone_dir)

clone_repos_from_file(repos_file, clone_dir, post_pull_hook=delete_sonarqube_output_if_updated)
scan_repos(args.token, clone_dir, args.force_scan)
scan_repos(token, clone_dir, args.force_scan)


if __name__ == "__main__":
Expand Down
5 changes: 5 additions & 0 deletions sonarqube_tool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Public package interface for ``sonarqube_tool``.

Re-exports :class:`SonarQubeAPI` so callers can simply do
``from sonarqube_tool import SonarQubeAPI``.
"""
from .sonarqube_api import SonarQubeAPI

# Explicit public API of the package.
__all__ = [
    'SonarQubeAPI'
]
16 changes: 14 additions & 2 deletions sonarqube_tool/scan_repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import platform
import shutil
import subprocess
from sonarqube_tool.sonarqube_api import SonarQubeAPI

SONARQUBE_URL = "http://localhost:9000"
SONARQUBE_FILE_NAME = "sonarqube_output.txt"
Expand All @@ -28,14 +29,20 @@ def _compile_java_sources(repo_dir):
Compile Java sources using Maven, supporting both Windows and Linux.
Skips Apache RAT license checking with -Drat.skip=true.
"""
print(f"Compiling Java sources for {repo_dir.name}...")

# Check if it's a Maven project
pom_file = repo_dir / "pom.xml"
if not pom_file.exists():
print(f"No pom.xml found in {repo_dir.name}, skipping compilation.")
return

# Check if already compiled (target directory exists)
target_dir = repo_dir / "target"
if target_dir.exists():
print(f"Target directory already exists for {repo_dir.name}, skipping compilation.")
return

print(f"Compiling Java sources for {repo_dir.name}...")

try:
# Determine the Maven command based on platform
if platform.system() == "Windows":
Expand Down Expand Up @@ -91,6 +98,11 @@ def _scan_repo(repo_dir, token, force_scan):
)
output_file.write_text(result.stdout)
print(f"Scan complete. Output saved to {output_file}")
api = SonarQubeAPI(token=token)
if api.is_scan_successful(project_key):
issues_path = repo_dir / "issues.json"
api.save_all_issues(project_key, issues_path)
print(f"All issues saved for {repo_dir.name}.")
except subprocess.CalledProcessError as e:
print(f"SonarQube scan failed for {repo_dir.name}: {e}")
output_file.write_text(e.stdout or "No output captured.")
Expand Down
188 changes: 188 additions & 0 deletions sonarqube_tool/sonarqube_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import os
import json
import requests
from typing import Optional, Dict

SONARQUBE_URL = "http://localhost:9000"
PAGE_SIZE = 500

class SonarQubeAPI:
    """Thin client for a (locally running) SonarQube server's Web API.

    Authenticates with a user token via Bearer auth and exposes helpers for
    fetching issues, rule details, and background-task (scan) status.
    """

    # Per-request timeout in seconds. Without a timeout, requests.get() can
    # block forever if the SonarQube server is unresponsive.
    REQUEST_TIMEOUT = 30

    def __init__(self, base_url: str = SONARQUBE_URL, token: Optional[str] = None):
        """Create a client.

        Args:
            base_url: SonarQube server URL; trailing slashes are stripped.
            token: User token. Falls back to the SONARQUBE_TOKEN env var.

        Raises:
            ValueError: If no token is given and SONARQUBE_TOKEN is unset.
        """
        self.base_url = base_url.rstrip('/')
        self.token = token or os.getenv('SONARQUBE_TOKEN')

        if not self.token:
            raise ValueError("SonarQube token is required. Provide it as parameter or set SONARQUBE_TOKEN environment variable.")

        self.headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json'
        }

    def _get_issues(self, component_key: str) -> Dict:
        """Fetch every issue for a component, following pagination.

        Returns a dict with 'total' (count of fetched issues) and 'issues'
        (the combined list across all pages). Re-raises request errors after
        logging them.

        NOTE(review): the SonarQube Web API caps page*page_size at 10,000
        results; components with more issues than that cannot be fully
        paginated this way — confirm against the server version in use.
        """
        url = f"{self.base_url}/api/issues/search"
        all_issues = []
        page = 1

        while True:
            params = {
                'componentKeys': component_key,
                'ps': PAGE_SIZE,
                'p': page
            }
            try:
                response = requests.get(url, headers=self.headers, params=params,
                                        timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                data = response.json()

                page_issues = data.get('issues', [])
                all_issues.extend(page_issues)

                if len(page_issues) == 0:
                    break
                # When the server reports a total, stop as soon as we have
                # everything — avoids one extra empty-page request when the
                # result count is an exact multiple of PAGE_SIZE. Falls back
                # to the empty-page stop above if no total is present.
                total = data.get('paging', {}).get('total', data.get('total'))
                if total is not None and len(all_issues) >= total:
                    break
                page += 1

            except requests.exceptions.RequestException as e:
                print(f"Error calling SonarQube API: {e}")
                if hasattr(e, 'response') and e.response is not None:
                    print(f"Response status: {e.response.status_code}")
                    print(f"Response text: {e.response.text}")
                raise
        return {
            'total': len(all_issues),
            'issues': all_issues
        }

    def _get_rule_details(self, rule_key: str) -> Dict:
        """Fetch the raw /api/rules/show payload for a rule key.

        Re-raises request errors after logging status and body when available.
        """
        url = f"{self.base_url}/api/rules/show"
        params = {
            'key': rule_key
        }
        try:
            response = requests.get(url, headers=self.headers, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error calling SonarQube API: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response status: {e.response.status_code}")
                print(f"Response text: {e.response.text}")
            raise
        return data

    def is_scan_successful(self, project_key: str) -> bool:
        """Return True iff the project's latest background task is SUCCESS.

        Deliberately best-effort: any error (network, auth, missing project)
        is logged and reported as False rather than raised.
        """
        url = f"{self.base_url}/api/ce/component"
        params = {
            'component': project_key
        }
        try:
            response = requests.get(url, headers=self.headers, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            info = data.get('current', {})
            result = info.get('status', '') == 'SUCCESS'
            return result
        except Exception as e:
            print(f"Error: {e}")
            return False

    def get_issues_for_file(self, project_key: str, file_path: str) -> Dict:
        """Fetch all issues for one file (component key 'project:path')."""
        component_key = f"{project_key}:{file_path}"
        return self._get_issues(component_key)

    def get_all_issues(self, project_key: str) -> Dict:
        """Fetch all issues for the whole project."""
        return self._get_issues(project_key)

    def get_rules_and_fix_method(self, rule_key: str) -> Dict:
        """Return a flat dict of rule metadata plus its description sections.

        Description sections (e.g. root cause / how to fix) are merged in
        under their section keys, alongside rule_key/rule_name/severity/type.
        """
        data = self._get_rule_details(rule_key)
        result = {}
        rule = data.get('rule', {})
        if rule:
            result['rule_key'] = rule.get('key', '')
            result['rule_name'] = rule.get('name', '')
            result['severity'] = rule.get('severity', '')
            result['type'] = rule.get('type', '')

            description_sections = rule.get('descriptionSections', [])
            for section in description_sections:
                section_key = section.get('key', '')
                section_content = section.get('content', '')
                result[section_key] = section_content
        return result

    def print_all_issues(self, project_key: str) -> None:
        """Print the total issue count and a per-severity breakdown.

        Severity is taken from the first entry of each issue's 'impacts'
        list; errors are logged, not raised.
        """
        try:
            issues_data = self.get_all_issues(project_key)
            issues = issues_data.get('issues', [])
            total = issues_data.get('total', 0)

            severity_count = {}
            for issue in issues:
                impacts = issue.get('impacts', [])
                if impacts and len(impacts) > 0:
                    severity = impacts[0].get('severity', 'UNKNOWN')
                    severity_count[severity] = severity_count.get(severity, 0) + 1
            print(f"Total issues: {total}")
            for severity, count in sorted(severity_count.items()):
                print(f"{severity}: {count}")
        except Exception as e:
            print(f"Error: {e}")

    def print_file_issues(self, project_key: str, file_path: str) -> None:
        """Print up to the first three issues for a file, then a remainder count.

        Errors are logged, not raised.
        """
        try:
            issues_data = self.get_issues_for_file(project_key, file_path)
            issues = issues_data.get('issues', [])

            if issues:
                # Show only first 3 issues
                for i, issue in enumerate(issues[:3], 1):
                    print(f"{i}. {issue.get('message', 'No message')}")
                    print(f"   Rule: {issue.get('rule', 'Unknown')}")

                    impacts = issue.get('impacts', [])
                    severity = 'Unknown'
                    if impacts and len(impacts) > 0:
                        severity = impacts[0].get('severity', 'Unknown')

                    print(f"   Severity: {severity}")
                    print(f"   Type: {issue.get('type', 'Unknown')}")

                    if 'textRange' in issue:
                        line = issue['textRange'].get('startLine', 'Unknown')
                        print(f"   Line: {line}")
                    print()
                # Show remaining count if there are more than 3 issues
                if len(issues) > 3:
                    remaining = len(issues) - 3
                    print(f"... and {remaining} more issues not shown")
            else:
                print("No issues found")
        except Exception as e:
            print(f"Error: {e}")

    def save_all_issues(self, project_key: str, file_path: str) -> None:
        """Dump all project issues to file_path as pretty-printed JSON.

        Errors are logged, not raised.
        """
        try:
            issues_data = self.get_all_issues(project_key)
            with open(file_path, 'w') as f:
                json.dump(issues_data, f, indent=4)
            print(f"All issues saved to {file_path}")
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    # Ad-hoc smoke test against a locally running SonarQube server.
    # Requires a token via SONARQUBE_TOKEN (see SonarQubeAPI.__init__).
    api = SonarQubeAPI()
    project_key = "commons-collections"
    file_path = "src/main/java/org/apache/commons/collections4/map/AbstractHashedMap.java"
    # Single example rule to inspect (formerly three keys, two of them unused).
    rule_key = "java:S5993"
    print(f"Checking scan success for project {project_key}: {api.is_scan_successful(project_key)}")

    print("Project issues summary:")
    api.print_all_issues(project_key)

    print(f"\nFile issues details:")
    api.print_file_issues(project_key, file_path)

    print(f"\nRule details:")
    print(api.get_rules_and_fix_method(rule_key))
19 changes: 16 additions & 3 deletions sonarqube_tool/tests/test_scan_repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ def mock_which():
with patch("sonarqube_tool.scan_repos.shutil.which", return_value="/usr/bin/sonar-scanner"):
yield

@pytest.fixture
def mock_sonarqube_api():
    """Patch SonarQubeAPI inside scan_repos so tests need no real token.

    The patched constructor hands back a stub whose is_scan_successful()
    always reports success and whose save_all_issues() is a no-op.
    """
    with patch("sonarqube_tool.scan_repos.SonarQubeAPI") as patched_cls:
        stub = patched_cls.return_value
        stub.is_scan_successful.return_value = True
        stub.save_all_issues.return_value = None
        yield patched_cls


def test_scan_repos_skips_if_sonar_scanner_missing(tmp_path):
# Arrange
Expand Down Expand Up @@ -75,10 +85,11 @@ def test_scan_repos_skips_repo_if_output_exists(tmp_path, mock_which, mock_subpr
mock_subprocess_success.assert_not_called()


def test_scan_repos_runs_repo_and_writes_output(tmp_path, mock_which, mock_subprocess_success):
def test_scan_repos_runs_repo_and_writes_output(tmp_path, mock_which, mock_subprocess_success, mock_sonarqube_api):
# Arrange
_ = mock_which
_ = mock_subprocess_success
_ = mock_sonarqube_api
repo = tmp_path / "repo1"
repo.mkdir()

Expand Down Expand Up @@ -109,10 +120,11 @@ def test_scan_repos_handles_scan_failure(tmp_path, mock_which, mock_subprocess_f
assert not (repo / PROPERTIES_FILE_NAME).exists()


def test_scan_repos_runs_multiple_repos(tmp_path, mock_which, mock_subprocess_success):
def test_scan_repos_runs_multiple_repos(tmp_path, mock_which, mock_subprocess_success, mock_sonarqube_api):
# Arrange
_ = mock_which
_ = mock_subprocess_success
_ = mock_sonarqube_api
repo1 = tmp_path / "repo1"
repo2 = tmp_path / "repo2"
repo1.mkdir()
Expand All @@ -127,9 +139,10 @@ def test_scan_repos_runs_multiple_repos(tmp_path, mock_which, mock_subprocess_su
assert "scan succeeded" in (repo / SONARQUBE_FILE_NAME).read_text()


def test_scan_repos_runs_mixed_state_repos(tmp_path, mock_which):
def test_scan_repos_runs_mixed_state_repos(tmp_path, mock_which, mock_sonarqube_api):
# Arrange
_ = mock_which
_ = mock_sonarqube_api
scanned = tmp_path / "scanned"
unscanned = tmp_path / "success"
failed = tmp_path / "failure"
Expand Down