Skip to content

Commit

Permalink
Update sync_issues.py
Browse files Browse the repository at this point in the history
  • Loading branch information
rcstanciu authored Oct 20, 2023
1 parent a59e296 commit 1f7a672
Showing 1 changed file with 110 additions and 74 deletions.
184 changes: 110 additions & 74 deletions .data/sync_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@
# API token read from the environment; os.environ.get returns None when
# GITHUB_TOKEN is not set.
token = os.environ.get("GITHUB_TOKEN")
# Module-level client built from that token.
# NOTE(review): the import is not visible here; presumably PyGithub's Github - confirm.
github = Github(token)

# Entry names that process_directory filters out of every directory listing
# before it looks for issue files or sub-directories.
exception_filenames = [
    ".data",
    ".git",
    ".github",
    "README.md",
    "Audit_Report.pdf",
    "comments.csv",
    ".gitkeep",
]


def github_retry_on_rate_limit(func):
@wraps(func)
Expand Down Expand Up @@ -67,9 +76,14 @@ def cast(cls, content_file: ContentFile):
content_file.__class__ = ContentFileExtended

for func in ["_completeIfNotSet"]:
setattr(content_file, func, github_retry_on_rate_limit(getattr(content_file, func)))
setattr(
content_file,
func,
github_retry_on_rate_limit(getattr(content_file, func)),
)
return content_file


class GithubExtended(Github):
@classmethod
def cast(cls, github: Github):
Expand All @@ -79,6 +93,7 @@ def cast(cls, github: Github):
setattr(github, func, github_retry_on_rate_limit(getattr(github, func)))
return github


# Rebind the module-level client to the object returned by GithubExtended.cast,
# which wraps client methods with github_retry_on_rate_limit.
github = GithubExtended.cast(github)

# Issues list. Each issue is in the format:
Expand All @@ -96,95 +111,116 @@ def cast(cls, github: Github):


def _directory_severity(path):
    """Return "H" or "M" when `path` is named like a severity family, else None.

    Two naming schemes are recognised, case-insensitively:
    ``<severity>-<number>`` (e.g. ``H-12``, ``medium-3``) and
    ``<number>-<severity>`` (e.g. ``012-H``). Only the first letter of the
    matched severity is returned, upper-cased.
    """
    for pattern in (r"^(H|M|High|Medium)-\d+$", r"^\d+-(H|M|High|Medium)$"):
        match = re.match(pattern, path, re.IGNORECASE)
        if match:
            return match.group(1).upper()[0]
    return None


def process_directory(repo, path):
    """Record every issue file found under `path` in the global `issues` dict.

    The files directly inside `path` are treated as one family of duplicate
    reports. Each file is registered in `issues` under its numeric id, one
    file is chosen as the family's parent, and every other file of the family
    is linked to that parent and marked closed. Sub-directories are then
    processed recursively.

    Args:
        repo: Repository object; only `get_contents(path)` is used.
        path: Directory path inside the repository ("" for the root).

    Raises:
        Exception: if an issue id is seen twice, if an issue file has no
            title heading on its fifth line, or if a family with a real
            severity has no `-best.md` file.
        ValueError: if a file name is not `<number>.md` / `<number>-best.md`.
    """
    global issues

    print(f"[+] Processing directory /{path}")

    path_items = [
        x for x in repo.get_contents(path) if x.name not in exception_filenames
    ]
    dirs = [x for x in path_items if x.type == "dir"]
    files = [x for x in path_items if x.type != "dir"]

    # Root issues are closed by default; so is anything under a path that
    # mentions low / false / invalid.
    closed = path == "" or any(
        keyword in path.lower() for keyword in ("low", "false", "invalid")
    )

    # Only open families can carry a real severity; everything else is "false".
    severity = "false"
    if not closed:
        severity = _directory_severity(path) or "false"

    dir_issues_ids = []
    parent = None
    for index, file in enumerate(files):
        print(f"[-] Reading file {file.name}")
        last_file = index == len(files) - 1

        file = ContentFileExtended.cast(file)
        if "best" in file.name:
            issue_id = int(file.name.replace("-best.md", ""))
            parent = issue_id
        else:
            issue_id = int(file.name.replace(".md", ""))

        # We automatically set the parent in the following cases:
        # 1. The family has only one issue and no report has been selected.
        #    We select the only issue available as the report.
        # 2. The family is an invalid family (deduplicated inside the invalid
        #    folder) and no report is selected.
        #    We select the last processed issue in that family as the report.
        # `parent is None` (rather than `not parent`) keeps an issue id of 0
        # from being mistaken for "no parent selected yet".
        # NOTE(review): the root path "" also satisfies case 2, so loose files
        # in the repository root are grouped under the last one - confirm
        # that this is intended.
        if parent is None and (
            len(files) == 1
            or (
                severity == "false"
                and path not in ["low", "false", "invalid"]
                and last_file
            )
        ):
            print(
                f"[!] Setting issue {issue_id} as the default parent of the current family /{path}"
            )
            parent = issue_id

        body = file.decoded_content.decode("utf-8")
        body_lines = body.split("\n")
        # The first line holds the auditor name, the fifth the markdown title.
        auditor = body_lines[0]
        title_match = (
            re.match(r"^(?:[#\s]+)(.*)$", body_lines[4])
            if len(body_lines) > 4
            else None
        )
        if title_match is None:
            # Fail with the offending file instead of a bare
            # IndexError / AttributeError.
            raise Exception(
                f"Issue file {file.path} does not have a title heading on line 5."
            )
        issue_title = title_match.group(1)
        title = f"{auditor} - {issue_title}"

        # Stop the script if an issue is found multiple times in the filesystem
        if issue_id in issues:
            raise Exception("Issue %s found multiple times." % issue_id)

        issues[issue_id] = {
            "id": issue_id,
            "parent": None,
            "severity": severity,
            "body": body,
            "closed": closed,
            "auditor": auditor,
            "title": title,
            "has_duplicates": False,
        }
        dir_issues_ids.append(issue_id)

    # Set the parent field for all duplicates in this directory
    if parent is None and severity != "false":
        raise Exception("Family %s does not have a primary file (-best.md)." % path)

    if parent is not None:
        for issue_id in dir_issues_ids:
            if issue_id != parent:
                issues[parent]["has_duplicates"] = True
                issues[issue_id]["parent"] = parent
                issues[issue_id]["closed"] = True

    # Process any directories inside
    for directory in dirs:
        process_directory(repo, directory.path)


@lru_cache(maxsize=1024)
def get_github_issue(repo, issue_id):
Expand Down

0 comments on commit 1f7a672

Please sign in to comment.