Skip to content

Use PyGithub for Github operations in manage-cookie #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions cookie_python/manage/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import contextlib
import os
from functools import cached_property, lru_cache
from typing import Optional

import github
from github.PullRequest import PullRequest
from github.Repository import Repository


class GithubRepo:
def __init__(self) -> None:
self._gh = github.Github(os.environ["GITHUB_API_TOKEN"])

@cached_property
def username(self) -> str:
return self._gh.get_user().login

@lru_cache # noqa: B019
def find_repo(self, search: str) -> Repository:
if "/" not in search:
search = f"{self.username}/{search}"
if "github.com" in search:
with contextlib.suppress(IndexError):
search = os.path.splitext(search.split(":")[1])[0]
return self._gh.get_repo(search)

def find_pr(
self, repo: Repository, head: str, base: str = "main"
) -> Optional[PullRequest]:
pulls = [
pr
for pr in repo.get_pulls(head=f"{self.username}:{head}", base=base)
]
if not pulls:
return None
if len(pulls) > 1:
raise Exception(f"Multiple PRs found matching {head}")
return pulls[0]
36 changes: 11 additions & 25 deletions cookie_python/manage/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,10 @@


def release_patch_version(repo: RepoSandbox) -> None:
releases = (
repo.run(
["gh", "release", "list"],
capture_output=True,
check=True,
)
.stdout.decode("utf-8")
.splitlines()
)
latest_tag = None
for line in releases:
tag, status, *_ = line.split()
if status.lower() == "latest":
latest_tag = tag
break
if not latest_tag:
repo.logger.warning("Unable to find latest version")
return None
check_refs = ["origin/main", latest_tag]
if not repo.latest_release:
repo.logger.warning("Unable to find latest release version")
return
check_refs = ["origin/main", repo.latest_release]
refs = []
for ref in check_refs:
refs.append(
Expand All @@ -33,17 +18,18 @@ def release_patch_version(repo: RepoSandbox) -> None:
.strip()
)
if len(refs) == len(check_refs) and len(set(refs)) == 1:
repo.logger.info(f"No new changes since latest release {latest_tag}")
return None
sv = semver.VersionInfo.parse(latest_tag.lstrip("v"))
repo.logger.info(
f"No new changes since latest release {repo.latest_release}"
)
return
sv = semver.VersionInfo.parse(repo.latest_release.lstrip("v"))
next_patch_ver = sv.bump_patch()
new_tag = f"v{next_patch_ver}"
if repo.dry_run:
repo.logger.success(f"Would release new version {new_tag}")
return None
repo.run(["gh", "release", "create", new_tag, "--generate-notes"])
return
repo.create_release(new_tag)
repo.logger.success(f"Released new version {new_tag}")
return None


def release_action(args: Namespace) -> None:
Expand Down
92 changes: 40 additions & 52 deletions cookie_python/manage/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

import loguru

from .github import GithubRepo


class RepoSandbox:
def __init__(self, repo: str, dry_run: bool = False) -> None:
self._stack = contextlib.ExitStack()
self.repo = repo
self.repo = self.gh.find_repo(repo)
self.branch = "update-cookie"
self.dry_run = dry_run

Expand All @@ -32,9 +34,30 @@ def __exit__(
) -> None:
self._stack.close()

@cached_property
def gh(self) -> GithubRepo:
return GithubRepo()

@cached_property
def latest_release(self) -> str:
return self.repo.get_latest_release().title

def create_release(self, tag: str) -> None:
print(f"Should create release {tag}")
# PyGithub's repository create_tag_and_release() doesn't support
# generate_release_notes
self.repo._requester.requestJsonAndCheck( # type: ignore
"POST",
f"/repos/{self.repo.full_name}/releases",
input={
"tag_name": tag,
"generate_release_notes": True,
},
)

@cached_property
def logger(self) -> "loguru.Logger":
return loguru.logger.bind(repo=self.repo)
return loguru.logger.bind(repo=self.repo.full_name)

@cached_property
def tempdir(self) -> Path:
Expand All @@ -47,7 +70,9 @@ def tempdir(self) -> Path:
@cached_property
def clone_path(self) -> Path:
subprocess.run(
["git", "clone", self.repo, "repo"], cwd=self.tempdir, check=True
["git", "clone", self.repo.ssh_url, "repo"],
cwd=self.tempdir,
check=True,
)
clone_path = self.tempdir / "repo"
for cmd in (
Expand Down Expand Up @@ -107,41 +132,15 @@ def lint_test(self) -> None:
self.logger.error("Resolve errors and exit shell to continue")
self.shell()

def find_existing_pr(self) -> Optional[str]:
with contextlib.suppress(
subprocess.CalledProcessError, json.JSONDecodeError, TypeError
):
for pr in json.loads(
self.run(
[
"gh",
"pr",
"list",
"-H",
self.branch,
"-B",
"main",
"--json",
",".join(("url", "headRefName", "baseRefName")),
],
capture_output=True,
check=True,
).stdout.decode()
):
pr_url = str(pr.pop("url"))
if pr == {"headRefName": self.branch, "baseRefName": "main"}:
return pr_url
return None

def close_existing_pr(self) -> None:
# Locate existing PR
pr_url = self.find_existing_pr()
if pr_url:
pr = self.gh.find_pr(self.repo, self.branch)
if pr:
if self.dry_run:
self.logger.info(f"Would close existing PR {pr_url}")
self.logger.info(f"Would close existing PR {pr.url}")
else:
self.run(["gh", "pr", "close", pr_url])
self.logger.info(f"Closed existing PR {pr_url}")
pr.edit(state="closed")
self.logger.info(f"Closed existing PR {pr.url}")
if self.dry_run:
return
# Delete existing branch
Expand All @@ -160,21 +159,10 @@ def open_pr(self, message: str) -> None:
return
self.run(["git", "push", "origin", self.branch])
commit_title, _, *commit_body = message.splitlines()
pr_url = self.run(
[
"gh",
"pr",
"create",
"--title",
commit_title.strip(),
"--body-file",
"-",
"--base",
"main",
"--head",
self.branch,
],
input=os.linesep.join(commit_body).encode("utf-8"),
capture_output=True,
).stdout.decode()
self.logger.success(f"Opened PR {pr_url}")
pr = self.repo.create_pull(
base="main",
head=self.branch,
title=commit_title.strip(),
body=os.linesep.join(commit_body),
)
self.logger.success(f"Opened PR {pr.url}")
Loading