Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for repo-pkg release check #92

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 79 additions & 6 deletions packj/audit/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from typing import Optional
import email.utils as eutils

from datetime import timedelta
from colorama import Fore, Style

from packj.util.net import __parse_url, download_file, check_site_exist, check_domain_popular
from packj.util.dates import datetime_delta
from packj.util.dates import datetime_delta, date_str_to_datetime
from packj.util.email_validity import check_email_address
from packj.util.files import write_json_to_file, read_from_csv, read_file_lines
from packj.util.enum_util import PackageManagerEnum, LanguageEnum
Expand Down Expand Up @@ -459,17 +460,88 @@ def analyze_repo_activity(risks, report):
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
elif repo_data:
commits, contributors, tags = tuple(repo_data[k] for k in ('commits', 'contributors', 'tags'))
commits, contributors, tags = tuple(len(repo_data[k]) if repo_data[k] else None for k in ('commits', 'contributors', 'tags'))
msg_ok(f'commits: {commits}, contributors: {contributors}, tags: {tags}')
report['repo'].update(repo_data)
except Exception as e:
msg_fail(str(e))
finally:
return risks, report, repo_data

def get_pkg_ver_release_dates_before_after(release_history, pkg_ver):
    """Locate a package version in its release history along with its neighbours.

    Args:
        release_history: mapping of version string to a dict that holds a
            'release_date' entry. Its iteration order is used as the release
            order (assumed to be what the caller provides -- TODO confirm).
        pkg_ver: version to look up.

    Returns:
        dict with 'before', 'target' and 'after' keys, each a
        (release_date, version) tuple. A neighbour that does not exist is
        (None, None); a neighbour whose release date is empty keeps that
        date but reports None as its version.

    Raises:
        Exception: if pkg_ver is not present in release_history.
    """
    if not release_history or pkg_ver not in release_history:
        raise Exception(f'Failed to find version {pkg_ver} in pkg release history')

    # Work positionally on versions instead of on a date -> version mapping:
    # several versions can share one release date (or have none at all), and
    # keying by date would merge them and hand back the wrong neighbours.
    versions = list(release_history)
    index = versions.index(pkg_ver)

    def _neighbour(pos):
        # Describe the release at position pos, or (None, None) if out of range.
        if not 0 <= pos < len(versions):
            return None, None
        ver = versions[pos]
        date = release_history[ver]['release_date']
        # A neighbour without a release date is reported without a version.
        return date, (ver if date else None)

    return {
        'before': _neighbour(index - 1),
        'target': (release_history[pkg_ver]['release_date'], pkg_ver),
        'after': _neighbour(index + 1),
    }

def get_repo_ver_release_dates(tag_list, cutoff_datetime):
    """Find the most recent repo tag date that is not later than a cutoff.

    Args:
        tag_list: iterable of (tag_name, date_string) pairs.
        cutoff_datetime: value compared against the result of
            date_str_to_datetime(); tags dated after it are ignored.

    Returns:
        (date_string, [tag_name, ...]) for the latest remaining date, with
        every tag that carries exactly that date string.

    Raises:
        Exception: if a date cannot be parsed or compared, or if no tag is
            dated on or before the cutoff.
    """
    try:
        parsed = {}        # date string -> parsed datetime (each parsed once)
        tags_by_date = {}  # date string -> tag names, in input order
        for tag, date_str in tag_list:
            if date_str not in parsed:
                parsed[date_str] = date_str_to_datetime(date_str)
            if parsed[date_str] > cutoff_datetime:
                continue
            tags_by_date.setdefault(date_str, []).append(tag)

        # Fail with a meaningful message rather than an IndexError from
        # indexing an empty result.
        if not tags_by_date:
            raise ValueError('no repo tags on or before the package release date')

        # max() keeps the first of equally-late dates, matching a stable
        # descending sort followed by [0].
        latest = max(tags_by_date, key=parsed.__getitem__)
        return latest, tags_by_date[latest]
    except Exception as e:
        raise Exception(f'Failed to find repo release dates: {str(e)}') from e

def analyze_repo_releases(repo_data, risks, report, release_history):
    """Check whether the audited package version lines up with a repo tag.

    Looks up the release date of report['pkg_ver'] in release_history, finds
    the latest tag in repo_data['tags'] dated on or before it, and raises an
    'inconsistent with repo source' alert when the repo has no tags or when
    the computed delta between the two dates exceeds 1 (presumably days,
    since days=True is passed to datetime_delta -- TODO confirm).

    Args:
        repo_data: mapping with a 'tags' entry of (tag_name, date_string) pairs.
        risks: accumulated risks, passed through alert_user() on each alert.
        report: audit report; only report['pkg_ver'] is read here.
        release_history: package release history keyed by version.

    Returns:
        (risks, report). Failures of type Exception are reported through
        msg_fail() and do not propagate.
    """
    try:
        msg_info('Analyzing repo-pkg release match...', end='', flush=True, indent=1)
        release_tags = repo_data['tags']
        if not release_tags:
            alert_type = 'inconsistent with repo source'
            reason = 'no repo releases'
            risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
            msg_alert(reason)
        else:
            pkg_ver = report['pkg_ver']
            pkg_release_history = get_pkg_ver_release_dates_before_after(release_history, pkg_ver)
            cutoff_datestr, _ = pkg_release_history['target']
            if not cutoff_datestr:
                msg_warn('FAIL', 'Insufficent metadata')
            else:
                pkg_ver_release_date = date_str_to_datetime(cutoff_datestr)
                repo_ver_release_date, repo_ver_tag = get_repo_ver_release_dates(release_tags, pkg_ver_release_date)
                delta = datetime_delta(repo_ver_release_date, pkg_ver_release_date, days=True)
                if delta > 1:
                    alert_type = 'inconsistent with repo source'
                    reason = 'more repo releases'
                    risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
                    msg_alert(reason)
                else:
                    msg_ok(f'matching tag(s) {",".join(repo_ver_tag)} on {repo_ver_release_date}')
    except Exception as e:
        msg_fail(str(e))
    # Return after the try block, not from a `finally`: a return inside
    # `finally` would also discard KeyboardInterrupt/SystemExit, so the user
    # could not interrupt the audit during this step.
    return risks, report

def analyze_repo_code(risks, report):
def analyze_repo_code(repo_data, risks, report):
try:
repo_url = report['repo']['url']
msg_info('Analyzing repo-pkg src code match...', end='', flush=True, indent=1)
# TODO
msg_warn(' N/A','Coming soon!')
Expand Down Expand Up @@ -890,8 +962,9 @@ def audit(pm_args, pkg_name, ver_str, report_dir, extra_args, config):
risks, report = analyze_repo_data(config, risks, report)
if 'description' in report['repo']:
risks, report = analyze_repo_descr(risks, report)
risks, report = analyze_repo_code(risks, report)
risks, report = analyze_repo_activity(risks, report)
risks, report, repo_data = analyze_repo_activity(risks, report)
risks, report = analyze_repo_releases(repo_data, risks, report, release_history)
risks, report = analyze_repo_code(repo_data, risks, report)
risks, report = analyze_cves(pm_name, pkg_name, ver_str, risks, report)
risks, report = analyze_deps(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report)

Expand Down
5 changes: 4 additions & 1 deletion packj/util/dates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
def nearest(tuple_list, pivot):
    """Return the entry of tuple_list whose second element is closest to pivot.

    Distance is abs(entry[1] - pivot); on a tie the earliest entry wins.
    """
    def _distance(entry):
        return abs(entry[1] - pivot)

    return min(tuple_list, key=_distance)

def ts_to_date_str(tstamp:float, fmt:str='%m-%d-%Y'):
try:
import datetime
Expand Down Expand Up @@ -45,7 +48,7 @@ def date_str_to_datetime(date_str, fmt=None):
except Exception as e:
raise Exception("Failed to get datetime from date string %s: %s" % (date_str, str(e)))

def datetime_delta(date1, date2=None, days=None):
def datetime_delta(date1, date2=None, days=False):
try:
import datetime
import pytz
Expand Down
21 changes: 12 additions & 9 deletions packj/util/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,42 @@ def git_clone(repo_url):
logging.debug("Failed to get parse repo at %s: %s" % (clone_dir, str(e)))
return "invalid repo", None

reason = None

# tags
tags = None
try:
tags = [(t.name, datetime_to_date_str(t.commit.committed_datetime)) for t in repo.tags]
except Exception as e:
logging.debug("Failed to get tags %s: %s" % (clone_dir, str(e)))
tags = None

# branches
branches = None
try:
branches = [b.name for b in repo.remote().refs]
except Exception as e:
logging.debug("Failed to get branches %s: %s" % (clone_dir, str(e)))
branches = None

commits = []
try:
commits = []
for commit in repo.iter_commits():
commits.append(commit)
except Exception as e:
logging.debug("Failed to get commits %s: %s" % (clone_dir, str(e)))
commits = None
reason = 'No commits'

authors = None
try:
if commits:
authors = set([commit.author.email for commit in commits])
except Exception as e:
logging.debug("Failed to get authors %s: %s" % (clone_dir, str(e)))
authors = None

shutil.rmtree(os.path.dirname(clone_dir))
return None, {
'commits' : len(commits) if commits else None,
'branches' : len(branches),
'tags' : len(tags),
'contributors' : len(authors),
return reason, {
'commits' : commits,
'branches' : branches,
'tags' : tags,
'contributors' : authors,
}