|
| 1 | +""" |
| 2 | +This module provides a CLI to check if a version tag has the expected format we expect in the BO4E repository. |
| 3 | +""" |
| 4 | + |
| 5 | +import functools |
| 6 | +import logging |
| 7 | +import re |
| 8 | +import subprocess |
| 9 | +import sys |
| 10 | +from typing import ClassVar, Iterable, Optional |
| 11 | + |
| 12 | +import click |
| 13 | +from github import Github |
| 14 | +from github.Auth import Token |
| 15 | +from more_itertools import one |
| 16 | +from pydantic import BaseModel, ConfigDict |
| 17 | + |
| 18 | +from .__main__ import compare_bo4e_versions |
| 19 | + |
| 20 | +logger = logging.getLogger(__name__) |
| 21 | + |
| 22 | + |
| 23 | +@functools.total_ordering |
| 24 | +class Version(BaseModel): |
| 25 | + """ |
| 26 | + A class to represent a BO4E version number. |
| 27 | + """ |
| 28 | + |
| 29 | + version_pattern: ClassVar[re.Pattern[str]] = re.compile( |
| 30 | + r"^v(?P<major>\d{6})\.(?P<functional>\d+)\.(?P<technical>\d+)(?:-rc(?P<candidate>\d+))?$" |
| 31 | + ) |
| 32 | + |
| 33 | + major: int |
| 34 | + functional: int |
| 35 | + technical: int |
| 36 | + candidate: Optional[int] = None |
| 37 | + model_config = ConfigDict(frozen=True) |
| 38 | + |
| 39 | + @classmethod |
| 40 | + def from_string(cls, version: str, allow_candidate: bool = False) -> "Version": |
| 41 | + """ |
| 42 | + Parse a version string and return a Version object. |
| 43 | + Raises a ValueError if the version string does not match the expected pattern. |
| 44 | + Raises a ValueError if allow_candidate is False and the version string contains a candidate version. |
| 45 | + """ |
| 46 | + match = cls.version_pattern.fullmatch(version) |
| 47 | + if match is None: |
| 48 | + raise ValueError(f"Expected version to match {cls.version_pattern}, got {version}") |
| 49 | + inst = cls( |
| 50 | + major=int(match.group("major")), |
| 51 | + functional=int(match.group("functional")), |
| 52 | + technical=int(match.group("technical")), |
| 53 | + candidate=int(match.group("candidate")) if match.group("candidate") is not None else None, |
| 54 | + ) |
| 55 | + if not allow_candidate and inst.is_candidate(): |
| 56 | + raise ValueError(f"Expected a version without candidate, got a candidate version: {version}") |
| 57 | + return inst |
| 58 | + |
| 59 | + @property |
| 60 | + def tag_name(self) -> str: |
| 61 | + """ |
| 62 | + Return the tag name for this version. |
| 63 | + """ |
| 64 | + return f"v{self.major}.{self.functional}.{self.technical}" + ( |
| 65 | + f"-rc{self.candidate}" if self.is_candidate() else "" |
| 66 | + ) |
| 67 | + |
| 68 | + def is_candidate(self) -> bool: |
| 69 | + """ |
| 70 | + Return True if this version is a candidate version. |
| 71 | + """ |
| 72 | + return self.candidate is not None |
| 73 | + |
| 74 | + def bumped_major(self, other: "Version") -> bool: |
| 75 | + """ |
| 76 | + Return True if this version is a major bump from the other version. |
| 77 | + """ |
| 78 | + return self.major > other.major |
| 79 | + |
| 80 | + def bumped_functional(self, other: "Version") -> bool: |
| 81 | + """ |
| 82 | + Return True if this version is a functional bump from the other version. |
| 83 | + Return False if major bump is detected. |
| 84 | + """ |
| 85 | + return not self.bumped_major(other) and self.functional > other.functional |
| 86 | + |
| 87 | + def bumped_technical(self, other: "Version") -> bool: |
| 88 | + """ |
| 89 | + Return True if this version is a technical bump from the other version. |
| 90 | + Return False if major or functional bump is detected. |
| 91 | + """ |
| 92 | + return not self.bumped_functional(other) and self.technical > other.technical |
| 93 | + |
| 94 | + def bumped_candidate(self, other: "Version") -> bool: |
| 95 | + """ |
| 96 | + Return True if this version is a candidate bump from the other version. |
| 97 | + Return False if major, functional or technical bump is detected. |
| 98 | + Raises ValueError if one of the versions is not a candidate version. |
| 99 | + """ |
| 100 | + if self.candidate is None or other.candidate is None: |
| 101 | + raise ValueError("Cannot compare candidate versions if one of them is not a candidate.") |
| 102 | + return not self.bumped_technical(other) and self.candidate > other.candidate |
| 103 | + |
| 104 | + def __lt__(self, other: "Version") -> bool: |
| 105 | + if not isinstance(other, Version): |
| 106 | + return NotImplemented |
| 107 | + return ( |
| 108 | + self.major < other.major |
| 109 | + or self.functional < other.functional |
| 110 | + or self.technical < other.technical |
| 111 | + or (self.candidate is not None and (other.candidate is None or self.candidate < other.candidate)) |
| 112 | + ) |
| 113 | + |
| 114 | + def __eq__(self, other: object) -> bool: |
| 115 | + if not isinstance(other, Version): |
| 116 | + return NotImplemented |
| 117 | + return ( |
| 118 | + self.major == other.major |
| 119 | + and self.functional == other.functional |
| 120 | + and self.technical == other.technical |
| 121 | + and self.is_candidate() == other.is_candidate() |
| 122 | + and (self.candidate is None or self.candidate == other.candidate) |
| 123 | + ) |
| 124 | + |
| 125 | + def __str__(self) -> str: |
| 126 | + return self.tag_name |
| 127 | + |
| 128 | + |
| 129 | +def get_latest_version(gh_token: str | None = None) -> Version: |
| 130 | + """ |
| 131 | + Get the release from BO4E-python repository which is marked as 'latest'. |
| 132 | + """ |
| 133 | + if gh_token is not None: |
| 134 | + gh = Github(auth=Token(gh_token)) |
| 135 | + else: |
| 136 | + gh = Github() |
| 137 | + return Version.from_string(gh.get_repo("bo4e/BO4E-python").get_latest_release().tag_name) |
| 138 | + |
| 139 | + |
| 140 | +def get_last_n_tags(n: int, *, on_branch: str = "main", exclude_candidates: bool = True) -> Iterable[str]: |
| 141 | + """ |
| 142 | + Get the last n tags in chronological descending order starting from `on_branch`. |
| 143 | + If `on_branch` is a branch, it will start from the current HEAD of the branch. |
| 144 | + If `on_branch` is a tag, it will start from the tag itself. But the tag itself will not be included in the output. |
| 145 | + If `exclude_candidates` is True, candidate versions will be excluded from the output. |
| 146 | + If the number of found versions is less than `n`, a warning will be logged. |
| 147 | + """ |
| 148 | + try: |
| 149 | + Version.from_string(on_branch, allow_candidate=True) |
| 150 | + except ValueError: |
| 151 | + reference = f"remotes/origin/{on_branch}" |
| 152 | + else: |
| 153 | + reference = f"tags/{on_branch}" |
| 154 | + output = subprocess.check_output(["git", "tag", "--merged", reference, "--sort=-creatordate"]).decode().splitlines() |
| 155 | + if reference.startswith("tags/"): |
| 156 | + output = output[1:] # Skip the reference tag |
| 157 | + |
| 158 | + counter = 0 |
| 159 | + for tag in output: |
| 160 | + if counter >= n: |
| 161 | + return |
| 162 | + version = Version.from_string(tag, allow_candidate=True) |
| 163 | + if exclude_candidates and version.is_candidate(): |
| 164 | + continue |
| 165 | + yield tag |
| 166 | + counter += 1 |
| 167 | + if counter < n: |
| 168 | + if reference.startswith("tags/"): |
| 169 | + logger.warning("Only found %d tags before tag %s, tried to retrieve %d", counter, on_branch, n) |
| 170 | + else: |
| 171 | + logger.warning("Only found %d tags on branch %s, tried to retrieve %d", counter, on_branch, n) |
| 172 | + |
| 173 | + |
| 174 | +def get_last_version_before(version: Version) -> Version: |
| 175 | + """ |
| 176 | + Get the last non-candidate version before the provided version following the commit history. |
| 177 | + """ |
| 178 | + return Version.from_string(one(get_last_n_tags(1, on_branch=version.tag_name))) |
| 179 | + |
| 180 | + |
| 181 | +def ensure_latest_on_main(latest_version: Version, is_cur_version_latest: bool) -> None: |
| 182 | + """ |
| 183 | + Ensure that the latest release is on the main branch. |
| 184 | + Will also be called if the currently tagged version is marked as `latest`. |
| 185 | + In this case both versions are equal. |
| 186 | +
|
| 187 | + Note: This doesn't revert the release on GitHub. If you accidentally released on the wrong branch, you have to |
| 188 | + manually mark an old or create a new release as `latest` on the main branch. Otherwise, the publish workflow |
| 189 | + will fail here. |
| 190 | + """ |
| 191 | + commit_id = subprocess.check_output(["git", "rev-parse", f"tags/{latest_version.tag_name}~0"]).decode().strip() |
| 192 | + output = subprocess.check_output(["git", "branch", "-a", "--contains", f"{commit_id}"]).decode() |
| 193 | + branches_containing_commit = [line.strip().lstrip("*").lstrip() for line in output.splitlines()] |
| 194 | + if "remotes/origin/main" not in branches_containing_commit: |
| 195 | + if is_cur_version_latest: |
| 196 | + raise ValueError( |
| 197 | + f"Tagged version {latest_version} is marked as latest but is not on main branch " |
| 198 | + f"(branches {branches_containing_commit} contain commit {commit_id}).\n" |
| 199 | + "Either tag on main branch or don't mark the release as latest.\n" |
| 200 | + "If you accidentally marked the release as latest please remember to revert it. " |
| 201 | + "Otherwise, the next publish workflow will fail as the latest version is assumed to be on main.\n" |
| 202 | + f"Output from git-command: {output}" |
| 203 | + ) |
| 204 | + raise ValueError( |
| 205 | + f"Fatal Error: Latest release {latest_version.tag_name} is not on main branch " |
| 206 | + f"(branches {branches_containing_commit} contain commit {commit_id}).\n" |
| 207 | + "Please ensure that the latest release is on the main branch.\n" |
| 208 | + f"Output from git-command: {output}" |
| 209 | + ) |
| 210 | + |
| 211 | + |
| 212 | +def compare_work_tree_with_latest_version( |
| 213 | + gh_version: str, gh_token: str | None = None, major_bump_allowed: bool = True |
| 214 | +) -> None: |
| 215 | + """ |
| 216 | + Compare the work tree with the latest release from the BO4E repository. |
| 217 | + If any inconsistency is detected, a Value- or an AssertionError will be raised. |
| 218 | + """ |
| 219 | + logger.info("Github Access Token %s", "provided" if gh_token is not None else "not provided") |
| 220 | + cur_version = Version.from_string(gh_version, allow_candidate=True) |
| 221 | + logger.info("Tagged release version: %s", cur_version) |
| 222 | + latest_version = get_latest_version(gh_token) |
| 223 | + logger.info("Got latest release version from GitHub: %s", latest_version) |
| 224 | + is_cur_version_latest = cur_version == latest_version |
| 225 | + if is_cur_version_latest: |
| 226 | + logger.info("Tagged version is marked as latest.") |
| 227 | + ensure_latest_on_main(latest_version, is_cur_version_latest) |
| 228 | + logger.info("Latest release is on main branch.") |
| 229 | + |
| 230 | + version_ahead = cur_version |
| 231 | + version_behind = get_last_version_before(cur_version) |
| 232 | + logger.info( |
| 233 | + "Comparing with the version before the tagged release (excluding release candidates): %s", |
| 234 | + version_behind, |
| 235 | + ) |
| 236 | + |
| 237 | + assert version_ahead > version_behind, f"Version did not increase: {version_ahead} <= {version_behind}" |
| 238 | + |
| 239 | + logger.info( |
| 240 | + "Current version is ahead of the compared version. Comparing versions: %s -> %s", |
| 241 | + version_behind, |
| 242 | + version_ahead, |
| 243 | + ) |
| 244 | + if version_ahead.bumped_major(version_behind): |
| 245 | + if not major_bump_allowed: |
| 246 | + raise ValueError("Major bump detected. Major bump is not allowed.") |
| 247 | + logger.info("Major version bump detected. No further checks needed.") |
| 248 | + return |
| 249 | + changes = list( |
| 250 | + compare_bo4e_versions(version_behind.tag_name, version_ahead.tag_name, gh_token=gh_token, from_local=True) |
| 251 | + ) |
| 252 | + logger.info("Check if functional or technical release bump is needed") |
| 253 | + functional_changes = len(changes) > 0 |
| 254 | + logger.info("%s release bump is needed", "Functional" if functional_changes else "Technical") |
| 255 | + |
| 256 | + if not functional_changes and version_ahead.bumped_functional(version_behind): |
| 257 | + raise ValueError( |
| 258 | + "Functional version bump detected but no functional changes found. " |
| 259 | + "Please bump the technical release count instead of the functional." |
| 260 | + ) |
| 261 | + if functional_changes and not version_ahead.bumped_functional(version_behind): |
| 262 | + raise ValueError( |
| 263 | + "No functional version bump detected but functional changes found. " |
| 264 | + "Please bump the functional release count.\n" |
| 265 | + f"Detected changes: {changes}" |
| 266 | + ) |
| 267 | + |
| 268 | + |
| 269 | +@click.command() |
| 270 | +@click.option("--gh-version", type=str, required=True, help="The new version to compare the latest release with.") |
| 271 | +@click.option( |
| 272 | + "--gh-token", type=str, default=None, help="GitHub Access token. This helps to avoid rate limiting errors." |
| 273 | +) |
| 274 | +@click.option( |
| 275 | + "--major-bump-allowed/--major-bump-disallowed", |
| 276 | + is_flag=True, |
| 277 | + default=True, |
| 278 | + help="Indicate if a major bump is allowed. " |
| 279 | + "If it is not allowed, the script will exit with an error if a major bump is detected.", |
| 280 | +) |
| 281 | +def compare_work_tree_with_latest_version_cli( |
| 282 | + gh_version: str, gh_token: str | None = None, major_bump_allowed: bool = True |
| 283 | +) -> None: |
| 284 | + """ |
| 285 | + Check a version tag and compare the work tree with the latest release from the BO4E repository. |
| 286 | + Exits with status code 1 iff the version is inconsistent with the commit history or if the detected changes in |
| 287 | + the JSON-schemas are inconsistent with the version bump. |
| 288 | + """ |
| 289 | + try: |
| 290 | + compare_work_tree_with_latest_version(gh_version, gh_token, major_bump_allowed) |
| 291 | + except Exception as error: |
| 292 | + logger.error("An error occurred.", exc_info=error) |
| 293 | + raise click.exceptions.Exit(1) |
| 294 | + logger.info("All checks passed.") |
| 295 | + |
| 296 | + |
| 297 | +if __name__ == "__main__": |
| 298 | + # pylint: disable=no-value-for-parameter |
| 299 | + compare_work_tree_with_latest_version_cli() |
| 300 | + |
| 301 | + |
| 302 | +def test_compare_work_tree_with_latest_version() -> None: |
| 303 | + """ |
| 304 | + Little test function for local testing. |
| 305 | + """ |
| 306 | + logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) |
| 307 | + compare_work_tree_with_latest_version("v202401.1.2-rc3", gh_token=None) |
0 commit comments