Skip to content

Commit

Permalink
add webhook parser script
Browse files Browse the repository at this point in the history
  • Loading branch information
oraNod committed Sep 19, 2024
1 parent d97509b commit c66454d
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions forge-webhook-parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/python3
"""
Parse a webhook payload file to clone the git repository to
the current directory.
"""

import argparse
import json
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional


@dataclass
class PayloadFields:
"""
Dataclass of details extracted from the payload.
"""

added: List[str]
removed: List[str]
modified: List[str]
default_repo_url: Optional[str] = None
default_clone_path: Optional[Path] = None


PAYLOAD_FILE = "./github-payload.json"

parser = argparse.ArgumentParser()
parser.add_argument(
"-f", "--file", help="webhook payload file to parse", default=PAYLOAD_FILE
)
args = parser.parse_args()

with open(args.file, "r", encoding="utf-8") as file:
data = json.load(file)

repo_url_keys = []

if "repository" in data and "clone_url" in data["repository"]:
repo_url_keys.append(data["repository"]["clone_url"])

if "project" in data and "git_http_url" in data["project"]:
repo_url_keys.append(data["project"]["git_http_url"])

default_repo_url = next((url for url in repo_url_keys if url), "")


def extract_project_name() -> str:
"""Extract the name of the project from the clone url."""
if default_repo_url:
remove_git = default_repo_url.rstrip(".git")
project_name = remove_git.split("/")[-1]
return project_name
return ""


default_clone_path = Path.cwd() / extract_project_name()


def clean_clone_path(clone_path: Path = default_clone_path):
"""Remove already cloned repositories if they exist.
Args:
clone_path (Path): Directory of the cloned repo.
"""
if clone_path.exists():
shutil.rmtree(clone_path, ignore_errors=True)


def clone_repo(repo_url: str = default_repo_url, clone_path: Path = default_clone_path):
"""Clone the repository to the current working directory.
Args:
repo_url (str): Location of the repo.
clone_path (Path): Directory to clone the repo into.
"""
cmd: list[str] = ["git", "clone", repo_url, "--depth=1"]
cmd.append(str(clone_path))
subprocess.run(cmd, check=True)


def parse_commits(commit_data: List[dict]) -> List[PayloadFields]:
"""Parse the commit fields in the JSON payload.
Args:
commit_data (dict): List of commit data from the payload.
Returns:
List[PayloadFields]: List of changes for each commit.
"""
commit_changes = []
for commit in commit_data:
changes = PayloadFields(
added=commit.get("added", []),
removed=commit.get("removed", []),
modified=commit.get("modified", []),
)
commit_changes.append(changes)
return commit_changes


def main():
"""
Main execution function.
"""
clean_clone_path()
clone_repo()

commits = data.get("commits", [])
commit_changes = parse_commits(commits)

for i, changes in enumerate(commit_changes, 1):
print(f"Commit {i}:")
print(f"Added: {', '.join(changes.added)}")
print(f"Removed: {', '.join(changes.removed)}")
print(f"Modified: {', '.join(changes.modified)}")


if __name__ == "__main__":
main()

0 comments on commit c66454d

Please sign in to comment.