diff --git a/.github/workflows/cygwin-test.yml b/.github/workflows/cygwin-test.yml index 16b42f89c..808dc5608 100644 --- a/.github/workflows/cygwin-test.yml +++ b/.github/workflows/cygwin-test.yml @@ -1,36 +1,31 @@ name: test-cygwin -on: - push: - branches: - main - pull_request: - branches: - main +on: [push, pull_request, workflow_dispatch] jobs: build: runs-on: windows-latest + strategy: + fail-fast: false env: CHERE_INVOKING: 1 SHELLOPTS: igncr TMP: "/tmp" TEMP: "/tmp" - + steps: - name: Force LF line endings run: git config --global core.autocrlf input - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 9999 - - uses: cygwin/cygwin-install-action@v3 + - uses: cygwin/cygwin-install-action@v4 with: packages: python39 python39-pip python39-virtualenv git - name: Tell git to trust this repo shell: bash.exe -eo pipefail -o igncr "{0}" - run: | - /usr/bin/git config --global --add safe.directory $(pwd) - /usr/bin/git config --global protocol.file.allow always + run: | + /usr/bin/git config --global --add safe.directory "$(pwd)" - name: Install dependencies and prepare tests shell: bash.exe -eo pipefail -o igncr "{0}" run: | @@ -47,11 +42,6 @@ jobs: # If we rewrite the user's config by accident, we will mess it up # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - shell: bash.exe -eo pipefail -o igncr "{0}" - run: | - set -x - /usr/bin/python -m flake8 - name: Test with pytest shell: bash.exe -eo pipefail -o igncr "{0}" run: | diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 000000000..5e79664a8 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,14 @@ +name: Lint + +on: [push, pull_request, workflow_dispatch] + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v4 + with: + python-version: "3.x" + - uses: pre-commit/action@v3.0.0 diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 5373dace6..a6af507d1 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -3,11 +3,7 @@ name: Python package -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] +on: [push, pull_request, workflow_dispatch] permissions: contents: read @@ -17,11 +13,12 @@ jobs: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [3.7, 3.8, 3.9, "3.10", "3.11"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 9999 - name: Set up Python ${{ matrix.python-version }} @@ -35,7 +32,7 @@ jobs: python -m pip install --upgrade pip setuptools wheel python --version; git --version git submodule update --init --recursive - git fetch --tags + git fetch --tags --force pip install -r requirements.txt pip install -r test-requirements.txt @@ -47,11 +44,6 @@ jobs: # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - run: | - set -x - flake8 - - name: Check types with mypy # With new versions of pypi new issues might arise. This is a problem if there is nobody able to fix them, # so we have to ignore errors until that changes. @@ -60,11 +52,6 @@ jobs: set -x mypy -p git - - name: Tell git to trust this repo - run: | - /usr/bin/git config --global --add safe.directory $(pwd) - /usr/bin/git config --global protocol.file.allow always - - name: Test with pytest run: | set -x diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..581cb69b2 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: + [ + flake8-bugbear==22.12.6, + flake8-comprehensions==3.10.1, + flake8-typing-imports==1.14.0, + ] + exclude: ^doc|^git/ext/|^test/ + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-merge-conflict + - id: check-toml + - id: check-yaml diff --git a/AUTHORS b/AUTHORS index 8ccc09fc0..ba5636db8 100644 --- a/AUTHORS +++ b/AUTHORS @@ -51,4 +51,5 @@ Contributors are: -Luke Twist -Joseph Hale -Santos Gallegos +-Wenhan Zhu Portions derived from other open source works and are clearly marked. diff --git a/README.md b/README.md index 54a735e53..94fcc76d9 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,8 @@ implementation of 'git' in [Rust](https://www.rust-lang.org). GitPython is a python library used to interact with git repositories, high-level like git-porcelain, or low-level like git-plumbing. -It provides abstractions of git objects for easy access of repository data, and additionally -allows you to access the git repository more directly using either a pure python implementation, -or the faster, but more resource intensive _git command_ implementation. - -The object database implementation is optimized for handling large quantities of objects and large datasets, -which is achieved by using low-level structures and data streaming. +It provides abstractions of git objects for easy access of repository data often backed by calling the `git` +command-line program. ### DEVELOPMENT STATUS @@ -41,8 +37,7 @@ The project is open to contributions of all kinds, as well as new maintainers. ### REQUIREMENTS -GitPython needs the `git` executable to be installed on the system and available -in your `PATH` for most operations. +GitPython needs the `git` executable to be installed on the system and available in your `PATH` for most operations. If it is not in your `PATH`, you can help GitPython find it by setting the `GIT_PYTHON_GIT_EXECUTABLE=` environment variable. @@ -56,17 +51,19 @@ The installer takes care of installing them for you. If you have downloaded the source code: - python setup.py install +```bash +python setup.py install +``` or if you want to obtain a copy from the Pypi repository: - pip install GitPython +```bash +pip install GitPython +``` Both commands will install the required package dependencies. -A distribution package can be obtained for manual installation at: - - http://pypi.python.org/pypi/GitPython +A distribution package can be obtained for manual installation at: . If you like to clone from source, you can do it like so: @@ -96,9 +93,9 @@ See [Issue #525](https://github.com/gitpython-developers/GitPython/issues/525). ### RUNNING TESTS -_Important_: Right after cloning this repository, please be sure to have executed -the `./init-tests-after-clone.sh` script in the repository root. Otherwise -you will encounter test failures. +_Important_: Right after cloning this repository, please be sure to have +executed `git fetch --tags` followed by the `./init-tests-after-clone.sh` +script in the repository root. Otherwise you will encounter test failures. On _Windows_, make sure you have `git-daemon` in your PATH. For MINGW-git, the `git-daemon.exe` exists in `Git\mingw64\libexec\git-core\`; CYGWIN has no daemon, but should get along fine @@ -107,12 +104,14 @@ with MINGW's. Ensure testing libraries are installed. In the root directory, run: `pip install -r test-requirements.txt` -To lint, run: `flake8` +To lint, run: `pre-commit run --all-files` To typecheck, run: `mypy -p git` To test, run: `pytest` +For automatic code formatting run: `black git` + Configuration for flake8 is in the ./.flake8 file. Configurations for mypy, pytest and coverage.py are in ./pyproject.toml. @@ -145,9 +144,7 @@ Please have a look at the [contributions file][contributing]. - Run `git tag -s ` to tag the version in Git - Run `make release` - Close the milestone mentioned in the _changelog_ and create a new one. _Do not reuse milestones by renaming them_. -- set the upcoming version in the `VERSION` file, usually be - incrementing the patch level, and possibly by appending `-dev`. Probably you - want to `git push` once more. +- Got to [GitHub Releases](https://github.com/gitpython-developers/GitPython/releases) and publish a new one with the recently pushed tag. Generate the changelog. ### How to verify a release (DEPRECATED) @@ -162,7 +159,7 @@ tarballs. This script shows how to verify the tarball was indeed created by the authors of this project: -``` +```bash curl https://files.pythonhosted.org/packages/09/bc/ae32e07e89cc25b9e5c793d19a1e5454d30a8e37d95040991160f942519e/GitPython-3.1.8-py3-none-any.whl > gitpython.whl curl https://files.pythonhosted.org/packages/09/bc/ae32e07e89cc25b9e5c793d19a1e5454d30a8e37d95040991160f942519e/GitPython-3.1.8-py3-none-any.whl.asc > gitpython-signature.asc gpg --verify gitpython-signature.asc gitpython.whl @@ -170,7 +167,7 @@ gpg --verify gitpython-signature.asc gitpython.whl which outputs -``` +```bash gpg: Signature made Fr 4 Sep 10:04:50 2020 CST gpg: using RSA key 27C50E7F590947D7273A741E85194C08421980C9 gpg: Good signature from "Sebastian Thiel (YubiKey USB-C) " [ultimate] @@ -180,19 +177,19 @@ gpg: aka "Sebastian Thiel (In Rust I trust) ` object + +Initialize a new git Repo +######################### + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [1-test_init_repo_object] + :end-before: # ![1-test_init_repo_object] + +Existing local git Repo +####################### + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [2-test_init_repo_object] + :end-before: # ![2-test_init_repo_object] + +Clone from URL +############## + +For the rest of this tutorial we will use a clone from https://github.com/gitpython-developers/QuickStartTutorialFiles.git + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [1-test_cloned_repo_object] + :end-before: # ![1-test_cloned_repo_object] + + +Trees & Blobs +************** + +Latest Commit Tree +################## + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [12-test_cloned_repo_object] + :end-before: # ![12-test_cloned_repo_object] + +Any Commit Tree +############### + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [13-test_cloned_repo_object] + :end-before: # ![13-test_cloned_repo_object] + +Display level 1 Contents +######################## + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [14-test_cloned_repo_object] + :end-before: # ![14-test_cloned_repo_object] + +Recurse through the Tree +######################## + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [15-test_cloned_repo_object] + :end-before: # ![15-test_cloned_repo_object] + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [16-test_cloned_repo_object] + :end-before: # ![16-test_cloned_repo_object] + + + + +Usage +**************** + +Add file to staging area +######################## + + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [2-test_cloned_repo_object] + :end-before: # ![2-test_cloned_repo_object] + + Now lets add the updated file to git + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [3-test_cloned_repo_object] + :end-before: # ![3-test_cloned_repo_object] + + Notice the add method requires a list as a parameter + + Warning: If you experience any trouble with this, try to invoke :class:`git ` instead via repo.git.add(path) + +Commit +###### + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [4-test_cloned_repo_object] + :end-before: # ![4-test_cloned_repo_object] + +List of commits associated with a file +####################################### + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [5-test_cloned_repo_object] + :end-before: # ![5-test_cloned_repo_object] + + Notice this returns a generator object + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [6-test_cloned_repo_object] + :end-before: # ![6-test_cloned_repo_object] + + returns list of :class:`Commit ` objects + +Printing text files +#################### +Lets print the latest version of `/dir1/file2.txt` + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [17-test_cloned_repo_object] + :end-before: # ![17-test_cloned_repo_object] + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [18-test_cloned_repo_object] + :end-before: # ![18-test_cloned_repo_object] + + Previous version of `/dir1/file2.txt` + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [18.1-test_cloned_repo_object] + :end-before: # ![18.1-test_cloned_repo_object] + +Status +###### + * Untracked files + + Lets create a new file + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [7-test_cloned_repo_object] + :end-before: # ![7-test_cloned_repo_object] + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [8-test_cloned_repo_object] + :end-before: # ![8-test_cloned_repo_object] + + * Modified files + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [9-test_cloned_repo_object] + :end-before: # ![9-test_cloned_repo_object] + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [10-test_cloned_repo_object] + :end-before: # ![10-test_cloned_repo_object] + + returns a list of :class:`Diff ` objects + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [11-test_cloned_repo_object] + :end-before: # ![11-test_cloned_repo_object] + +Diffs +###### + +Compare staging area to head commit + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [11.1-test_cloned_repo_object] + :end-before: # ![11.1-test_cloned_repo_object] + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [11.2-test_cloned_repo_object] + :end-before: # ![11.2-test_cloned_repo_object] + +Compare commit to commit + + .. literalinclude:: ../../test/test_quick_doc.py + :language: python + :dedent: 8 + :start-after: # [11.3-test_cloned_repo_object] + :end-before: # ![11.3-test_cloned_repo_object] + + +More Resources +**************** + +Remember, this is just the beginning! There's a lot more you can achieve with GitPython in your development workflow. +To explore further possibilities and discover advanced features, check out the full :ref:`GitPython tutorial ` +and the :ref:`API Reference `. Happy coding! diff --git a/git/__init__.py b/git/__init__.py index f746e1fca..6196a42d7 100644 --- a/git/__init__.py +++ b/git/__init__.py @@ -56,8 +56,8 @@ def _init_externals() -> None: Actor, rmtree, ) -except GitError as exc: - raise ImportError("%s: %s" % (exc.__class__.__name__, exc)) from exc +except GitError as _exc: + raise ImportError("%s: %s" % (_exc.__class__.__name__, _exc)) from _exc # } END imports @@ -76,7 +76,7 @@ def refresh(path: Optional[PathLike] = None) -> None: if not Git.refresh(path=path): return if not FetchInfo.refresh(): - return + return # type: ignore [unreachable] GIT_OK = True @@ -87,6 +87,6 @@ def refresh(path: Optional[PathLike] = None) -> None: ################# try: refresh() -except Exception as exc: - raise ImportError("Failed to initialize: {0}".format(exc)) from exc +except Exception as _exc: + raise ImportError("Failed to initialize: {0}".format(_exc)) from _exc ################# diff --git a/git/cmd.py b/git/cmd.py index 9ef1e3a65..d6f8f946a 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -5,7 +5,7 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from __future__ import annotations import re -from contextlib import contextmanager +import contextlib import io import logging import os @@ -23,7 +23,7 @@ is_win, ) from git.exc import CommandError -from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present +from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present, patch_env from .exc import GitCommandError, GitCommandNotFound, UnsafeOptionError, UnsafeProtocolError from .util import ( @@ -122,6 +122,7 @@ def handle_process_output( To specify a timeout in seconds for the git command, after which the process should be killed. """ + # Use 2 "pump" threads and wait for both to finish. def pump_stream( cmdline: List[str], @@ -154,7 +155,7 @@ def pump_stream( p_stdout = process.proc.stdout if process.proc else None p_stderr = process.proc.stderr if process.proc else None else: - process = cast(Popen, process) + process = cast(Popen, process) # type: ignore [redundant-cast] cmdline = getattr(process, "args", "") p_stdout = process.stdout p_stderr = process.stderr @@ -210,7 +211,7 @@ def dashify(string: str) -> str: return string.replace("_", "-") -def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]: +def slots_to_dict(self: "Git", exclude: Sequence[str] = ()) -> Dict[str, Any]: return {s: getattr(self, s) for s in self.__slots__ if s not in exclude} @@ -488,10 +489,7 @@ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> """ # Options can be of the form `foo` or `--foo bar` `--foo=bar`, # so we need to check if they start with "--foo" or if they are equal to "foo". - bare_unsafe_options = [ - option.lstrip("-") - for option in unsafe_options - ] + bare_unsafe_options = [option.lstrip("-") for option in unsafe_options] for option in options: for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options): if option.startswith(unsafe_option) or option == bare_option: @@ -695,15 +693,14 @@ def __iter__(self) -> "Git.CatFileContentStream": return self def __next__(self) -> bytes: - return next(self) - - def next(self) -> bytes: line = self.readline() if not line: raise StopIteration return line + next = __next__ + def __del__(self) -> None: bytes_left = self._size - self._nbr if bytes_left: @@ -735,6 +732,7 @@ def __init__(self, working_dir: Union[None, PathLike] = None): def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was an object. + :return: Callable object that will execute call _call_process with your arguments.""" if name[0] == "_": return LazyMixin.__getattr__(self, name) @@ -915,7 +913,7 @@ def execute( render the repository incapable of accepting changes until the lock is manually removed. :param strip_newline_in_stdout: - Whether to strip the trailing `\n` of the command stdout. + Whether to strip the trailing ``\\n`` of the command stdout. :return: * str(output) if extended_output = False (Default) * tuple(int(status), str(stdout), str(stderr)) if extended_output = True @@ -965,8 +963,11 @@ def execute( redacted_command, '"kill_after_timeout" feature is not supported on Windows.', ) + # Only search PATH, not CWD. This must be in the *caller* environment. The "1" can be any value. + maybe_patch_caller_env = patch_env("NoDefaultCurrentDirectoryInExePath", "1") else: cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable + maybe_patch_caller_env = contextlib.nullcontext() # end handle stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") @@ -982,21 +983,21 @@ def execute( istream_ok, ) try: - proc = Popen( - command, - env=env, - cwd=cwd, - bufsize=-1, - stdin=istream or DEVNULL, - stderr=PIPE, - stdout=stdout_sink, - shell=shell is not None and shell or self.USE_SHELL, - close_fds=is_posix, # unsupported on windows - universal_newlines=universal_newlines, - creationflags=PROC_CREATIONFLAGS, - **subprocess_kwargs, - ) - + with maybe_patch_caller_env: + proc = Popen( + command, + env=env, + cwd=cwd, + bufsize=-1, + stdin=istream or DEVNULL, + stderr=PIPE, + stdout=stdout_sink, + shell=shell is not None and shell or self.USE_SHELL, + close_fds=is_posix, # unsupported on windows + universal_newlines=universal_newlines, + creationflags=PROC_CREATIONFLAGS, + **subprocess_kwargs, + ) except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err else: @@ -1146,7 +1147,7 @@ def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]: del self._environment[key] return old_env - @contextmanager + @contextlib.contextmanager def custom_environment(self, **kwargs: Any) -> Iterator[None]: """ A context manager around the above ``update_environment`` method to restore the @@ -1194,7 +1195,6 @@ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any @classmethod def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]: - outlist = [] if isinstance(arg_list, (list, tuple)): for arg in arg_list: @@ -1384,7 +1384,8 @@ def get_object_header(self, ref: str) -> Tuple[str, str, int]: def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: """As get_object_header, but returns object data as well - :return: (hexsha, type_string, size_as_int,data_string) + + :return: (hexsha, type_string, size_as_int, data_string) :note: not threadsafe""" hexsha, typename, size, stream = self.stream_object_data(ref) data = stream.read(size) diff --git a/git/config.py b/git/config.py index 71d7ea689..1973111eb 100644 --- a/git/config.py +++ b/git/config.py @@ -248,7 +248,6 @@ def items_all(self) -> List[Tuple[str, List[_T]]]: def get_config_path(config_level: Lit_config_levels) -> str: - # we do not support an absolute path of the gitconfig on windows , # use the global config instead if is_win and config_level == "system": @@ -265,8 +264,8 @@ def get_config_path(config_level: Lit_config_levels) -> str: raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") else: # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs - assert_never( - config_level, # type: ignore[unreachable] + assert_never( # type: ignore[unreachable] + config_level, ValueError(f"Invalid configuration level: {config_level!r}"), ) @@ -655,7 +654,7 @@ def write_section(name: str, section_dict: _OMD) -> None: values: Sequence[str] # runtime only gets str in tests, but should be whatever _OMD stores v: str - for (key, values) in section_dict.items_all(): + for key, values in section_dict.items_all(): if key == "__name__": continue @@ -796,6 +795,7 @@ def get_values( :raise TypeError: in case the value could not be understood Otherwise the exceptions known to the ConfigParser will be raised.""" try: + self.sections() lst = self._sections[section].getall(option) except Exception: if default is not None: diff --git a/git/diff.py b/git/diff.py index c4424592f..1424ff3ad 100644 --- a/git/diff.py +++ b/git/diff.py @@ -144,7 +144,10 @@ def diff( args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames - args.append("-M") # check for renames, in both formats + # remove default '-M' arg (check for renames) if user is overriding it + if not any(x in kwargs for x in ("find_renames", "no_renames", "M")): + args.append("-M") + if create_patch: args.append("-p") else: @@ -335,7 +338,6 @@ def __init__( change_type: Optional[Lit_change_type], score: Optional[int], ) -> None: - assert a_rawpath is None or isinstance(a_rawpath, bytes) assert b_rawpath is None or isinstance(b_rawpath, bytes) self.a_rawpath = a_rawpath diff --git a/git/exc.py b/git/exc.py index 9b69a5889..775528bf6 100644 --- a/git/exc.py +++ b/git/exc.py @@ -139,7 +139,6 @@ def __init__( valid_files: Sequence[PathLike], failed_reasons: List[str], ) -> None: - Exception.__init__(self, message) self.failed_files = failed_files self.failed_reasons = failed_reasons @@ -170,7 +169,6 @@ def __init__( stderr: Union[bytes, str, None] = None, stdout: Union[bytes, str, None] = None, ) -> None: - super(HookExecutionError, self).__init__(command, status, stderr, stdout) self._msg = "Hook('%s') failed%s" diff --git a/git/index/base.py b/git/index/base.py index 17d18db58..193baf3ad 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -4,6 +4,7 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from contextlib import ExitStack import datetime import glob from io import BytesIO @@ -352,27 +353,22 @@ def from_tree(cls, repo: "Repo", *treeish: Treeish, **kwargs: Any) -> "IndexFile # tmp file created in git home directory to be sure renaming # works - /tmp/ dirs could be on another device - tmp_index = tempfile.mktemp("", "", repo.git_dir) - arg_list.append("--index-output=%s" % tmp_index) - arg_list.extend(treeish) - - # move current index out of the way - otherwise the merge may fail - # as it considers existing entries. moving it essentially clears the index. - # Unfortunately there is no 'soft' way to do it. - # The TemporaryFileSwap assure the original file get put back - if repo.git_dir: - index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, "index")) - try: + with ExitStack() as stack: + tmp_index = stack.enter_context(tempfile.NamedTemporaryFile(dir=repo.git_dir)) + arg_list.append("--index-output=%s" % tmp_index.name) + arg_list.extend(treeish) + + # move current index out of the way - otherwise the merge may fail + # as it considers existing entries. moving it essentially clears the index. + # Unfortunately there is no 'soft' way to do it. + # The TemporaryFileSwap assure the original file get put back + + stack.enter_context(TemporaryFileSwap(join_path_native(repo.git_dir, "index"))) repo.git.read_tree(*arg_list, **kwargs) - index = cls(repo, tmp_index) + index = cls(repo, tmp_index.name) index.entries # force it to read the file as we will delete the temp-file - del index_handler # release as soon as possible - finally: - if osp.exists(tmp_index): - os.remove(tmp_index) - # END index merge handling - - return index + return index + # END index merge handling # UTILITIES @unbare_repo @@ -660,7 +656,7 @@ def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry def _entries_for_paths( self, paths: List[str], - path_rewriter: Callable, + path_rewriter: Union[Callable, None], fprogress: Callable, entries: List[BaseIndexEntry], ) -> List[BaseIndexEntry]: @@ -982,12 +978,12 @@ def move( Additional arguments you would like to pass to git-mv, such as dry_run or force. - :return:List(tuple(source_path_string, destination_path_string), ...) + :return: List(tuple(source_path_string, destination_path_string), ...) A list of pairs, containing the source file moved as well as its actual destination. Relative to the repository root. :raise ValueError: If only one item was given - GitCommandError: If git could not handle your request""" + :raise GitCommandError: If git could not handle your request""" args = [] if skip_errors: args.append("-k") @@ -1156,7 +1152,6 @@ def checkout( unknown_lines = [] def handle_stderr(proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike]) -> None: - stderr_IO = proc.stderr if not stderr_IO: return None # return early if stderr empty diff --git a/git/index/fun.py b/git/index/fun.py index 4659ac898..b50f1f465 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -76,12 +76,13 @@ def hook_path(name: str, git_dir: PathLike) -> str: return osp.join(git_dir, "hooks", name) -def _has_file_extension(path): +def _has_file_extension(path: str) -> str: return osp.splitext(path)[1] def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. + :param name: name of hook, like 'pre-commit' :param index: IndexFile instance :param args: arguments passed to hook file @@ -101,7 +102,7 @@ def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix() cmd = ["bash.exe", relative_hp] - cmd = subprocess.Popen( + process = subprocess.Popen( cmd + list(args), env=env, stdout=subprocess.PIPE, @@ -115,13 +116,13 @@ def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: else: stdout_list: List[str] = [] stderr_list: List[str] = [] - handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) + handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process) stdout = "".join(stdout_list) stderr = "".join(stderr_list) - if cmd.returncode != 0: + if process.returncode != 0: stdout = force_text(stdout, defenc) stderr = force_text(stderr, defenc) - raise HookExecutionError(hp, cmd.returncode, stderr, stdout) + raise HookExecutionError(hp, process.returncode, stderr, stdout) # end handle return code @@ -234,11 +235,13 @@ def read_cache( stream: IO[bytes], ) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]: """Read a cache file from the given stream + :return: tuple(version, entries_dict, extension_data, content_sha) - * version is the integer version number - * entries dict is a dictionary which maps IndexEntry instances to a path at a stage - * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes - * content_sha is a 20 byte sha on all cache file contents""" + + * version is the integer version number + * entries dict is a dictionary which maps IndexEntry instances to a path at a stage + * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes + * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {} @@ -391,7 +394,6 @@ def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> out.append(_tree_entry_to_baseindexentry(theirs, 0)) # END handle modification else: - if ours[0] != base[0] or ours[1] != base[1]: # they deleted it, we changed it, conflict out.append(_tree_entry_to_baseindexentry(base, 1)) diff --git a/git/index/util.py b/git/index/util.py index bfc7fadd6..6cf838f3b 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -3,6 +3,7 @@ import os import struct import tempfile +from types import TracebackType from git.compat import is_win @@ -11,7 +12,7 @@ # typing ---------------------------------------------------------------------- -from typing import Any, Callable, TYPE_CHECKING +from typing import Any, Callable, TYPE_CHECKING, Optional, Type from git.types import PathLike, _T @@ -47,12 +48,21 @@ def __init__(self, file_path: PathLike) -> None: except OSError: pass - def __del__(self) -> None: + def __enter__(self) -> "TemporaryFileSwap": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> bool: if osp.isfile(self.tmp_file_path): if is_win and osp.exists(self.file_path): os.remove(self.file_path) os.rename(self.tmp_file_path, self.file_path) - # END temp file exists + + return False # { Decorators diff --git a/git/objects/base.py b/git/objects/base.py index 9d0057254..eb9a8ac3d 100644 --- a/git/objects/base.py +++ b/git/objects/base.py @@ -143,6 +143,7 @@ def data_stream(self) -> "OStream": def stream_data(self, ostream: "OStream") -> "Object": """Writes our data directly to the given output stream + :param ostream: File object compatible stream object. :return: self""" istream = self.repo.odb.stream(self.binsha) diff --git a/git/objects/commit.py b/git/objects/commit.py index 82d2387b3..6db3ea0f3 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -26,6 +26,7 @@ import os from io import BytesIO import logging +from collections import defaultdict # typing ------------------------------------------------------------------ @@ -324,19 +325,81 @@ def stats(self) -> Stats: :return: git.Stats""" if not self.parents: - text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True) + text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True) text2 = "" for line in text.splitlines()[1:]: (insertions, deletions, filename) = line.split("\t") text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename) text = text2 else: - text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True) + text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True) return Stats._list_from_string(self.repo, text) @property - def trailers(self) -> Dict: - """Get the trailers of the message as dictionary + def trailers(self) -> Dict[str, str]: + """Get the trailers of the message as a dictionary + + :note: This property is deprecated, please use either ``Commit.trailers_list`` or ``Commit.trailers_dict``. + + :return: + Dictionary containing whitespace stripped trailer information. + Only contains the latest instance of each trailer key. + """ + return {k: v[0] for k, v in self.trailers_dict.items()} + + @property + def trailers_list(self) -> List[Tuple[str, str]]: + """Get the trailers of the message as a list + + Git messages can contain trailer information that are similar to RFC 822 + e-mail headers (see: https://git-scm.com/docs/git-interpret-trailers). + + This functions calls ``git interpret-trailers --parse`` onto the message + to extract the trailer information, returns the raw trailer data as a list. + + Valid message with trailer:: + + Subject line + + some body information + + another information + + key1: value1.1 + key1: value1.2 + key2 : value 2 with inner spaces + + + Returned list will look like this:: + + [ + ("key1", "value1.1"), + ("key1", "value1.2"), + ("key2", "value 2 with inner spaces"), + ] + + + :return: + List containing key-value tuples of whitespace stripped trailer information. + """ + cmd = ["git", "interpret-trailers", "--parse"] + proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore + trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8") + trailer = trailer.strip() + + if not trailer: + return [] + + trailer_list = [] + for t in trailer.split("\n"): + key, val = t.split(":", 1) + trailer_list.append((key.strip(), val.strip())) + + return trailer_list + + @property + def trailers_dict(self) -> Dict[str, List[str]]: + """Get the trailers of the message as a dictionary Git messages can contain trailer information that are similar to RFC 822 e-mail headers (see: https://git-scm.com/docs/git-interpret-trailers). @@ -345,9 +408,7 @@ def trailers(self) -> Dict: to extract the trailer information. The key value pairs are stripped of leading and trailing whitespaces before they get saved into a dictionary. - Valid message with trailer: - - .. code-block:: + Valid message with trailer:: Subject line @@ -355,32 +416,27 @@ def trailers(self) -> Dict: another information - key1: value1 + key1: value1.1 + key1: value1.2 key2 : value 2 with inner spaces - dictionary will look like this: - .. code-block:: + Returned dictionary will look like this:: { - "key1": "value1", - "key2": "value 2 with inner spaces" + "key1": ["value1.1", "value1.2"], + "key2": ["value 2 with inner spaces"], } - :return: Dictionary containing whitespace stripped trailer information + :return: + Dictionary containing whitespace stripped trailer information. + Mapping trailer keys to a list of their corresponding values. """ - d = {} - cmd = ["git", "interpret-trailers", "--parse"] - proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore - trailer: str = proc.communicate(str(self.message).encode())[0].decode() - if trailer.endswith("\n"): - trailer = trailer[0:-1] - if trailer != "": - for line in trailer.split("\n"): - key, value = line.split(":", 1) - d[key.strip()] = value.strip() - return d + d = defaultdict(list) + for key, val in self.trailers_list: + d[key].append(val) + return dict(d) @classmethod def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]: @@ -402,7 +458,7 @@ def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, if proc_or_stream.stdout is not None: stream = proc_or_stream.stdout elif hasattr(proc_or_stream, "readline"): - proc_or_stream = cast(IO, proc_or_stream) + proc_or_stream = cast(IO, proc_or_stream) # type: ignore [redundant-cast] stream = proc_or_stream readline = stream.readline diff --git a/git/objects/fun.py b/git/objects/fun.py index 001e10e47..043eec721 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -37,6 +37,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None: """Write the give list of entries into a stream using its write method + :param entries: **sorted** list of tuples with (binsha, mode, name) :param write: write method which takes a data string""" ord_zero = ord("0") @@ -68,6 +69,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer def tree_entries_from_data(data: bytes) -> List[EntryTup]: """Reads the binary representation of a tree and returns tuples of Tree items + :param data: data block with tree data (as bytes) :return: list(tuple(binsha, mode, tree_relative_path), ...)""" ord_zero = ord("0") @@ -188,7 +190,6 @@ def traverse_trees_recursive( # is a tree. If the match is a non-tree item, put it into the result. # Processed items will be set None for ti, tree_data in enumerate(trees_data): - for ii, item in enumerate(tree_data): if not item: continue diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 9aa9deb27..0d20305c6 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -287,7 +287,9 @@ def _clone_repo( :param url: url to clone from :param path: repository - relative path to the submodule checkout location :param name: canonical of the submodule - :param kwrags: additinoal arguments given to git.clone""" + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack + :param kwargs: additional arguments given to git.clone""" module_abspath = cls._module_abspath(repo, path, name) module_checkout_path = module_abspath if cls._need_gitfile_submodules(repo.git): @@ -411,6 +413,8 @@ def add( as its value. :param clone_multi_options: A list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :return: The newly created submodule instance :note: works atomically, such that no change will be done if the repository update fails for instance""" @@ -581,6 +585,8 @@ def update( as its value. :param clone_multi_options: list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. Only take effect with `init` option. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :note: does nothing in bare repositories :note: method is definitely not atomic if recurisve is True :return: self""" @@ -1396,6 +1402,10 @@ def iter_items( # END handle keyerror # END handle critical error + # Make sure we are looking at a submodule object + if type(sm) != git.objects.submodule.base.Submodule: + continue + # fill in remaining info - saves time as it doesn't have to be parsed again sm._name = n if pc != repo.commit(): diff --git a/git/objects/tree.py b/git/objects/tree.py index b72e88c48..a9b491e23 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -128,6 +128,7 @@ def set_done(self) -> "TreeModifier": """Call this method once you are done modifying the tree information. It may be called several times, but be aware that each call will cause a sort operation + :return self:""" merge_sort(self._cache, git_cmp) return self @@ -175,6 +176,7 @@ def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: """Add the given item to the tree, its correctness is assumed, which puts the caller into responsibility to assure the input is correct. For more information on the parameters, see ``add`` + :param binsha: 20 byte binary sha""" assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) tree_cache = (binsha, mode, name) @@ -259,8 +261,8 @@ def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[ def join(self, file: str) -> IndexObjUnion: """Find the named object in this tree's contents - :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` + :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` :raise KeyError: if given file or tree does not exist in tree""" msg = "Blob or Tree named %r not found" if "/" in file: diff --git a/git/objects/util.py b/git/objects/util.py index 636a58316..56938507e 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -137,21 +137,25 @@ def get_object_type_by_name( def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) + """Convert a git timezone offset into a timezone offset west of + UTC in seconds (compatible with time.altzone). + + :param utctz: git utc timezone string, i.e. +0200 + """ + int_utctz = int(utctz) + seconds = (abs(int_utctz) // 100) * 3600 + (abs(int_utctz) % 100) * 60 + return seconds if int_utctz < 0 else -seconds def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and "-") or "+" - return prefix + utcs + """Convert a timezone offset west of UTC in seconds into a git timezone offset string + + :param altz: timezone offset in seconds west of UTC + """ + hours = abs(altz) // 3600 + minutes = (abs(altz) % 3600) // 60 + sign = "-" if altz >= 60 else "+" + return "{}{:02}{:02}".format(sign, hours, minutes) def verify_utctz(offset: str) -> str: diff --git a/git/refs/log.py b/git/refs/log.py index a5f4de58b..1f86356a4 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -253,6 +253,7 @@ def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry": def to_file(self, filepath: PathLike) -> None: """Write the contents of the reflog instance to a file at the given filepath. + :param filepath: path to file, parent directories are assumed to exist""" lfd = LockedFD(filepath) assure_directory_exists(filepath, is_file=True) @@ -326,6 +327,7 @@ def append_entry( def write(self) -> "RefLog": """Write this instance's data to the file we are originating from + :return: self""" if self._path is None: raise ValueError("Instance was not initialized with a path, use to_file(...) instead") diff --git a/git/refs/reference.py b/git/refs/reference.py index ca43cc430..4f9e3a0a7 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -49,8 +49,8 @@ class Reference(SymbolicReference, LazyMixin, IterableObj): def __init__(self, repo: "Repo", path: PathLike, check_path: bool = True) -> None: """Initialize this instance - :param repo: Our parent repository + :param repo: Our parent repository :param path: Path relative to the .git/ directory pointing to the ref in question, i.e. refs/heads/master @@ -73,6 +73,7 @@ def set_object( logmsg: Union[str, None] = None, ) -> "Reference": """Special version which checks if the head-log needs an update as well + :return: self""" oldbinsha = None if logmsg is not None: diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 33c3bf15b..5c293aa7b 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -168,6 +168,8 @@ def _get_ref_info_helper( """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" + if ".." in str(ref_path): + raise ValueError(f"Invalid reference '{ref_path}'") tokens: Union[None, List[str], Tuple[str, str]] = None repodir = _git_dir(repo, ref_path) try: diff --git a/git/remote.py b/git/remote.py index 4240223e8..95a2b8ac6 100644 --- a/git/remote.py +++ b/git/remote.py @@ -641,6 +641,7 @@ def set_url( :param new_url: string being the URL to add as an extra remote URL :param old_url: when set, replaces this URL with new_url for the remote + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ if not allow_unsafe_protocols: @@ -660,6 +661,7 @@ def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) multiple URLs for a single remote. :param url: string being the URL to add as an extra remote URL + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols) @@ -756,9 +758,11 @@ def stale_refs(self) -> IterableList[Reference]: @classmethod def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote": """Create a new remote to the given repository + :param repo: Repository instance that is to receive the new remote :param name: Desired name of the remote :param url: URL which corresponds to the remote's name + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: Additional arguments to be passed to the git-remote add command :return: New Remote instance :raise GitCommandError: in case an origin with that name already exists""" @@ -778,6 +782,7 @@ def add(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote": @classmethod def remove(cls, repo: "Repo", name: str) -> str: """Remove the remote with the given name + :return: the passed remote name to remove """ repo.git.remote("rm", name) @@ -790,6 +795,7 @@ def remove(cls, repo: "Repo", name: str) -> str: def rename(self, new_name: str) -> "Remote": """Rename self to the given new_name + :return: self""" if self.name == new_name: return self @@ -820,7 +826,6 @@ def _get_fetch_info_from_stderr( progress: Union[Callable[..., Any], RemoteProgress, None], kill_after_timeout: Union[None, float] = None, ) -> IterableList["FetchInfo"]: - progress = to_progress_instance(progress) # skip first line as it is some remote info we are not interested in @@ -975,6 +980,8 @@ def fetch( :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-fetch :return: IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed @@ -1021,11 +1028,13 @@ def pull( """Pull changes from the given branch, being the same as a fetch followed by a merge of branch with your local branch. - :param refspec: see 'fetch' method - :param progress: see 'push' method - :param kill_after_timeout: see 'fetch' method + :param refspec: see :meth:`fetch` method + :param progress: see :meth:`push` method + :param kill_after_timeout: see :meth:`fetch` method + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-pull - :return: Please see 'fetch' method""" + :return: Please see :meth:`fetch` method""" if refspec is None: # No argument refspec, then ensure the repo's config has a fetch refspec. self._assert_refspec() @@ -1074,6 +1083,8 @@ def push( :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --receive-pack :param kwargs: Additional arguments to be passed to git-push :return: A ``PushInfoList`` object, where each list member diff --git a/git/repo/base.py b/git/repo/base.py index d4463f1e1..113fca459 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -9,6 +9,9 @@ import re import shlex import warnings + +from pathlib import Path + from gitdb.db.loose import LooseObjectDB from gitdb.exc import BadObject @@ -57,6 +60,7 @@ PathLike, Lit_config_levels, Commit_ish, + CallableProgress, Tree_ish, assert_never, ) @@ -112,7 +116,7 @@ class Repo(object): 'working_dir' is the working directory of the git command, which is the working tree directory if available or the .git directory in case of bare repositories - 'working_tree_dir' is the working tree directory, but will raise AssertionError + 'working_tree_dir' is the working tree directory, but will return None if we are a bare repository. 'git_dir' is the .git repository directory, which is always set.""" @@ -120,9 +124,9 @@ class Repo(object): DAEMON_EXPORT_FILE = "git-daemon-export-ok" git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()` - working_dir: Optional[PathLike] = None + working_dir: PathLike _working_tree_dir: Optional[PathLike] = None - git_dir: PathLike = "" + git_dir: PathLike _common_dir: PathLike = "" # precompiled regex @@ -212,13 +216,14 @@ def __init__( ## Walk up the path to find the `.git` dir. # curpath = epath + git_dir = None while curpath: # ABOUT osp.NORMPATH # It's important to normalize the paths, as submodules will otherwise initialize their # repo instances with paths that depend on path-portions that will not exist after being # removed. It's just cleaner. if is_git_dir(curpath): - self.git_dir = curpath + git_dir = curpath # from man git-config : core.worktree # Set the path to the root of the working tree. If GIT_COMMON_DIR environment # variable is set, core.worktree is ignored and not used for determining the @@ -227,9 +232,9 @@ def __init__( # directory, which is either specified by GIT_DIR, or automatically discovered. # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified, # the current working directory is regarded as the top level of your working tree. - self._working_tree_dir = os.path.dirname(self.git_dir) + self._working_tree_dir = os.path.dirname(git_dir) if os.environ.get("GIT_COMMON_DIR") is None: - gitconf = self.config_reader("repository") + gitconf = self._config_reader("repository", git_dir) if gitconf.has_option("core", "worktree"): self._working_tree_dir = gitconf.get("core", "worktree") if "GIT_WORK_TREE" in os.environ: @@ -239,14 +244,14 @@ def __init__( dotgit = osp.join(curpath, ".git") sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = osp.normpath(sm_gitpath) + git_dir = osp.normpath(sm_gitpath) sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is None: sm_gitpath = find_worktree_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = expand_path(sm_gitpath, expand_vars) + git_dir = expand_path(sm_gitpath, expand_vars) self._working_tree_dir = curpath break @@ -257,8 +262,9 @@ def __init__( break # END while curpath - if self.git_dir is None: + if git_dir is None: raise InvalidGitRepositoryError(epath) + self.git_dir = git_dir self._bare = False try: @@ -268,7 +274,7 @@ def __init__( pass try: - common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip() + common_dir = (Path(self.git_dir) / "commondir").read_text().splitlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: self._common_dir = "" @@ -279,7 +285,7 @@ def __init__( self._working_tree_dir = None # END working dir handling - self.working_dir: Optional[PathLike] = self._working_tree_dir or self.common_dir + self.working_dir: PathLike = self._working_tree_dir or self.common_dir self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times @@ -317,7 +323,7 @@ def close(self) -> None: gc.collect() def __eq__(self, rhs: object) -> bool: - if isinstance(rhs, Repo) and self.git_dir: + if isinstance(rhs, Repo): return self.git_dir == rhs.git_dir return False @@ -329,14 +335,12 @@ def __hash__(self) -> int: # Description property def _get_description(self) -> str: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "rb") as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "wb") as fp: fp.write((descr + "\n").encode(defenc)) @@ -354,13 +358,7 @@ def common_dir(self) -> PathLike: """ :return: The git dir that holds everything except possibly HEAD, FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/.""" - if self._common_dir: - return self._common_dir - elif self.git_dir: - return self.git_dir - else: - # or could return "" - raise InvalidGitRepositoryError() + return self._common_dir or self.git_dir @property def bare(self) -> bool: @@ -403,6 +401,7 @@ def head(self) -> "HEAD": @property def remotes(self) -> "IterableList[Remote]": """A list of Remote objects allowing to access and manipulate remotes + :return: ``git.IterableList(Remote, ...)``""" return Remote.list_items(self) @@ -443,6 +442,7 @@ def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule: def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """An iterator yielding Submodule instances, see Traversable interface for a description of args and kwargs + :return: Iterator""" return RootModule(self).traverse(*args, **kwargs) @@ -457,6 +457,7 @@ def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: @property def tags(self) -> "IterableList[TagReference]": """A list of ``Tag`` objects that are available in this repo + :return: ``git.IterableList(TagReference, ...)``""" return TagReference.list_items(self) @@ -498,7 +499,7 @@ def delete_head(self, *heads: "Union[str, Head]", **kwargs: Any) -> None: def create_tag( self, path: PathLike, - ref: Union[str, 'SymbolicReference'] = "HEAD", + ref: Union[str, "SymbolicReference"] = "HEAD", message: Optional[str] = None, force: bool = False, **kwargs: Any, @@ -526,7 +527,9 @@ def delete_remote(self, remote: "Remote") -> str: """Delete the given remote.""" return Remote.remove(self, remote) - def _get_config_path(self, config_level: Lit_config_levels) -> str: + def _get_config_path(self, config_level: Lit_config_levels, git_dir: Optional[PathLike] = None) -> str: + if git_dir is None: + git_dir = self.git_dir # we do not support an absolute path of the gitconfig on windows , # use the global config instead if is_win and config_level == "system": @@ -540,15 +543,14 @@ def _get_config_path(self, config_level: Lit_config_levels) -> str: elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - repo_dir = self._common_dir or self.git_dir + repo_dir = self._common_dir or git_dir if not repo_dir: raise NotADirectoryError else: return osp.normpath(osp.join(repo_dir, "config")) else: - - assert_never( - config_level, # type:ignore[unreachable] + assert_never( # type:ignore[unreachable] + config_level, ValueError(f"Invalid configuration level: {config_level!r}"), ) @@ -569,15 +571,21 @@ def config_reader( you know which file you wish to read to prevent reading multiple files. :note: On windows, system configuration cannot currently be read as the path is unknown, instead the global path will be used.""" - files = None + return self._config_reader(config_level=config_level) + + def _config_reader( + self, + config_level: Optional[Lit_config_levels] = None, + git_dir: Optional[PathLike] = None, + ) -> GitConfigParser: if config_level is None: files = [ - self._get_config_path(cast(Lit_config_levels, f)) + self._get_config_path(cast(Lit_config_levels, f), git_dir) for f in self.config_level if cast(Lit_config_levels, f) ] else: - files = [self._get_config_path(config_level)] + files = [self._get_config_path(config_level, git_dir)] return GitConfigParser(files, read_only=True, repo=self) def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser: @@ -593,7 +601,7 @@ def config_writer(self, config_level: Lit_config_levels = "repository") -> GitCo system = system wide configuration file global = user level configuration file repository = configuration file for this repository only""" - return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self) + return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self, merge_includes=False) def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit: """The Commit object for the specified revision @@ -867,8 +875,15 @@ def ignored(self, *paths: PathLike) -> List[str]: """ try: proc: str = self.git.check_ignore(*paths) - except GitCommandError: - return [] + except GitCommandError as err: + # If return code is 1, this means none of the items in *paths + # are ignored by Git, so return an empty list. Raise the + # exception on all other return codes. + if err.status == 1: + return [] + else: + raise + return proc.replace("\\\\", "\\").replace('"', "").split("\n") @property @@ -1188,6 +1203,8 @@ def _clone( if not allow_unsafe_protocols: Git.check_unsafe_protocols(str(url)) + if not allow_unsafe_options: + Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=cls.unsafe_git_clone_options) if not allow_unsafe_options and multi_options: Git.check_unsafe_options(options=multi_options, unsafe_options=cls.unsafe_git_clone_options) @@ -1242,7 +1259,7 @@ def _clone( def clone( self, path: PathLike, - progress: Optional[Callable] = None, + progress: Optional[CallableProgress] = None, multi_options: Optional[List[str]] = None, allow_unsafe_protocols: bool = False, allow_unsafe_options: bool = False, @@ -1256,7 +1273,8 @@ def clone( option per list item which is passed exactly as specified to clone. For example ['--config core.filemode=false', '--config core.ignorecase', '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: * odbt = ObjectDatabase Type, allowing to determine the object database implementation used by the returned Repo instance @@ -1280,7 +1298,7 @@ def clone_from( cls, url: PathLike, to_path: PathLike, - progress: Optional[Callable] = None, + progress: CallableProgress = None, env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, allow_unsafe_protocols: bool = False, @@ -1299,7 +1317,8 @@ def clone_from( If you want to unset some variable, consider providing empty string as its value. :param multi_options: See ``clone`` method - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: see the ``clone`` method :return: Repo instance pointing to the cloned directory""" git = cls.GitCommandWrapperType(os.getcwd()) @@ -1380,4 +1399,6 @@ def currently_rebasing_on(self) -> Commit | None: rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if not osp.isfile(rebase_head_file): return None - return self.commit(open(rebase_head_file, "rt").readline().strip()) + with open(rebase_head_file, "rt") as f: + content = f.readline().strip() + return self.commit(content) diff --git a/git/repo/fun.py b/git/repo/fun.py index 2ca2e3d6f..ae35aa81e 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import stat +from pathlib import Path from string import digits from git.exc import WorkTreeRepositoryUnsupported @@ -83,7 +84,7 @@ def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]: return None try: - lines = open(dotgit, "r").readlines() + lines = Path(dotgit).read_text().splitlines() for key, value in [line.strip().split(": ") for line in lines]: if key == "gitdir": return value diff --git a/git/types.py b/git/types.py index 9064ecbf9..9f8621721 100644 --- a/git/types.py +++ b/git/types.py @@ -8,42 +8,39 @@ from typing import ( Dict, NoReturn, - Sequence, + Sequence as Sequence, Tuple, Union, Any, + Optional, + Callable, TYPE_CHECKING, TypeVar, ) # noqa: F401 -if sys.version_info[:2] >= (3, 8): +if sys.version_info >= (3, 8): from typing import ( Literal, - SupportsIndex, TypedDict, Protocol, + SupportsIndex as SupportsIndex, runtime_checkable, ) # noqa: F401 else: from typing_extensions import ( Literal, - SupportsIndex, # noqa: F401 + SupportsIndex as SupportsIndex, TypedDict, Protocol, runtime_checkable, ) # noqa: F401 -# if sys.version_info[:2] >= (3, 10): +# if sys.version_info >= (3, 10): # from typing import TypeGuard # noqa: F401 # else: # from typing_extensions import TypeGuard # noqa: F401 - -if sys.version_info[:2] < (3, 9): - PathLike = Union[str, os.PathLike] -else: - # os.PathLike only becomes subscriptable from Python 3.9 onwards - PathLike = Union[str, os.PathLike[str]] +PathLike = Union[str, "os.PathLike[str]"] if TYPE_CHECKING: from git.repo import Repo @@ -62,6 +59,9 @@ Lit_config_levels = Literal["system", "global", "user", "repository"] +# Progress parameter type alias ----------------------------------------- + +CallableProgress = Optional[Callable[[int, Union[str, float], Union[str, float, None], str], None]] # def is_config_level(inp: str) -> TypeGuard[Lit_config_levels]: # # return inp in get_args(Lit_config_level) # only py >= 3.8 diff --git a/git/util.py b/git/util.py index 6a4a65579..dee467dd3 100644 --- a/git/util.py +++ b/git/util.py @@ -131,7 +131,7 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: - """Methods with this decorator raise InvalidGitRepositoryError if they + """Methods with this decorator raise :class:`.exc.InvalidGitRepositoryError` if they encounter a bare repository""" from .exc import InvalidGitRepositoryError @@ -150,6 +150,7 @@ def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: @contextlib.contextmanager def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: + """Context manager to temporarily change directory. Not reentrant.""" old_dir = os.getcwd() os.chdir(new_dir) try: @@ -158,6 +159,20 @@ def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: os.chdir(old_dir) +@contextlib.contextmanager +def patch_env(name: str, value: str) -> Generator[None, None, None]: + """Context manager to temporarily patch an environment variable.""" + old_value = os.getenv(name) + os.environ[name] = value + try: + yield + finally: + if old_value is None: + del os.environ[name] + else: + os.environ[name] = old_value + + def rmtree(path: PathLike) -> None: """Remove the given recursively. @@ -935,11 +950,8 @@ def _obtain_lock_or_raise(self) -> None: ) try: - flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL - if is_win: - flags |= os.O_SHORT_LIVED - fd = os.open(lock_file, flags, 0) - os.close(fd) + with open(lock_file, mode="w"): + pass except OSError as e: raise IOError(str(e)) from e @@ -1049,7 +1061,7 @@ class IterableList(List[T_IterableObj]): __slots__ = ("_id_attr", "_prefix") - def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]": + def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]": return super(IterableList, cls).__new__(cls) def __init__(self, id_attr: str, prefix: str = "") -> None: @@ -1083,7 +1095,6 @@ def __getattr__(self, attr: str) -> T_IterableObj: return list.__getattribute__(self, attr) def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore - assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" if isinstance(index, int): @@ -1098,7 +1109,6 @@ def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_Iterabl # END handle getattr def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: - assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" delindex = cast(int, index) @@ -1152,7 +1162,7 @@ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: :note: Favor the iter_items method as it will - :return:list(Item,...) list of item instances""" + :return: list(Item,...) list of item instances""" out_list: Any = IterableList(cls._id_attribute_) out_list.extend(cls.iter_items(repo, *args, **kwargs)) return out_list @@ -1184,7 +1194,7 @@ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_I :note: Favor the iter_items method as it will - :return:list(Item,...) list of item instances""" + :return: list(Item,...) list of item instances""" out_list: IterableList = IterableList(cls._id_attribute_) out_list.extend(cls.iter_items(repo, *args, **kwargs)) return out_list diff --git a/pyproject.toml b/pyproject.toml index 0d5ebf012..32c9d4a26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ filterwarnings = 'ignore::DeprecationWarning' # filterwarnings ignore::WarningType # ignores those warnings [tool.mypy] +python_version = "3.7" disallow_untyped_defs = true no_implicit_optional = true warn_redundant_casts = true @@ -29,6 +30,7 @@ implicit_reexport = true # strict = true # TODO: remove when 'gitdb' is fully annotated +exclude = ["^git/ext/gitdb"] [[tool.mypy.overrides]] module = "gitdb.*" ignore_missing_imports = true @@ -43,3 +45,4 @@ omit = ["*/git/ext/*"] [tool.black] line-length = 120 target-version = ['py37'] +exclude = "git/ext/gitdb" diff --git a/requirements-dev.txt b/requirements-dev.txt index bacde3498..946b4c94f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,4 +10,4 @@ pytest-icdiff # pytest-profiling -tox \ No newline at end of file +tox diff --git a/setup.py b/setup.py index daad454d8..ebece64eb 100755 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ def make_release_tree(self, base_dir: str, files: Sequence) -> None: def _stamp_version(filename: str) -> None: found, out = False, [] try: - with open(filename, "r") as f: + with open(filename) as f: for line in f: if "__version__ =" in line: line = line.replace("\"git\"", "'%s'" % VERSION) @@ -82,7 +82,7 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: name="GitPython", cmdclass={"build_py": build_py, "sdist": sdist}, version=VERSION, - description="""GitPython is a python library used to interact with Git repositories""", + description="GitPython is a Python library used to interact with Git repositories", author="Sebastian Thiel, Michael Trier", author_email="byronimo@gmail.com, mtrier@gmail.com", license="BSD", @@ -95,7 +95,7 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: install_requires=requirements, tests_require=requirements + test_requirements, zip_safe=False, - long_description="""GitPython is a python library used to interact with Git repositories""", + long_description=long_description, long_description_content_type="text/markdown", classifiers=[ # Picked from @@ -121,5 +121,6 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ], ) diff --git a/test-requirements.txt b/test-requirements.txt index 6549f0fa0..6c6d57060 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,10 +3,7 @@ mypy black -flake8 -flake8-bugbear -flake8-comprehensions -flake8-typing-imports +pre-commit virtualenv diff --git a/test/fixtures/env_case.py b/test/fixtures/env_case.py new file mode 100644 index 000000000..120e59289 --- /dev/null +++ b/test/fixtures/env_case.py @@ -0,0 +1,13 @@ +import subprocess +import sys + +import git + + +_, working_dir, env_var_name = sys.argv + +# Importing git should be enough, but this really makes sure Git.execute is called. +repo = git.Repo(working_dir) # Hold the reference. +git.Git(repo.working_dir).execute(["git", "version"]) + +print(subprocess.check_output(["set", env_var_name], shell=True, text=True)) diff --git a/test/test_base.py b/test/test_base.py index ccfdc8ed3..30029367d 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -9,6 +9,7 @@ import tempfile from unittest import SkipTest, skipIf +from git import Repo from git.objects import Blob, Tree, Commit, TagObject from git.compat import is_win from git.objects.util import get_object_type_by_name @@ -95,14 +96,18 @@ def test_object_resolution(self): self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object) @with_rw_repo("HEAD", bare=True) - def test_with_bare_rw_repo(self, bare_rw_repo): + def test_with_bare_rw_repo(self, bare_rw_repo: Repo): assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") assert osp.isfile(osp.join(bare_rw_repo.git_dir, "HEAD")) + assert osp.isdir(bare_rw_repo.working_dir) + assert bare_rw_repo.working_tree_dir is None @with_rw_repo("0.1.6") - def test_with_rw_repo(self, rw_repo): + def test_with_rw_repo(self, rw_repo: Repo): assert not rw_repo.config_reader("repository").getboolean("core", "bare") + assert osp.isdir(rw_repo.working_tree_dir) assert osp.isdir(osp.join(rw_repo.working_tree_dir, "lib")) + assert osp.isdir(rw_repo.working_dir) @skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes! sometimes...") @with_rw_and_rw_remote_repo("0.1.6") diff --git a/test/test_commit.py b/test/test_commit.py index c5a43c94a..4871902ec 100644 --- a/test/test_commit.py +++ b/test/test_commit.py @@ -159,6 +159,37 @@ def check_entries(d): self.assertEqual(commit.committer_tz_offset, 14400, commit.committer_tz_offset) self.assertEqual(commit.message, "initial project\n") + def test_renames(self): + commit = self.rorepo.commit("185d847ec7647fd2642a82d9205fb3d07ea71715") + files = commit.stats.files + + # when a file is renamed, the output of git diff is like "dir/{old => new}" + # unless we disable rename with --no-renames, which produces two lines + # one with the old path deletes and another with the new added + self.assertEqual(len(files), 2) + + def check_entries(path, changes): + expected = { + ".github/workflows/Future.yml" : { + 'insertions': 57, + 'deletions': 0, + 'lines': 57 + }, + ".github/workflows/test_pytest.yml" : { + 'insertions': 0, + 'deletions': 55, + 'lines': 55 + }, + } + assert path in expected + assert isinstance(changes, dict) + for key in ("insertions", "deletions", "lines"): + assert changes[key] == expected[path][key] + + for path, changes in files.items(): + check_entries(path, changes) + # END for each stated file + def test_unicode_actor(self): # assure we can parse unicode actors correctly name = "Üäöß ÄußÉ" @@ -463,52 +494,57 @@ def test_datetimes(self): def test_trailers(self): KEY_1 = "Hello" - VALUE_1 = "World" + VALUE_1_1 = "World" + VALUE_1_2 = "Another-World" KEY_2 = "Key" VALUE_2 = "Value with inner spaces" - # Check if KEY 1 & 2 with Value 1 & 2 is extracted from multiple msg variations - msgs = [] - msgs.append(f"Subject\n\n{KEY_1}: {VALUE_1}\n{KEY_2}: {VALUE_2}\n") - msgs.append(f"Subject\n \nSome body of a function\n \n{KEY_1}: {VALUE_1}\n{KEY_2}: {VALUE_2}\n") - msgs.append( - f"Subject\n \nSome body of a function\n\nnon-key: non-value\n\n{KEY_1}: {VALUE_1}\n{KEY_2}: {VALUE_2}\n" - ) - msgs.append( - f"Subject\n \nSome multiline\n body of a function\n\nnon-key: non-value\n\n{KEY_1}: {VALUE_1}\n{KEY_2} : {VALUE_2}\n" - ) - + # Check the following trailer example is extracted from multiple msg variations + TRAILER = f"{KEY_1}: {VALUE_1_1}\n{KEY_2}: {VALUE_2}\n{KEY_1}: {VALUE_1_2}" + msgs = [ + f"Subject\n\n{TRAILER}\n", + f"Subject\n \nSome body of a function\n \n{TRAILER}\n", + f"Subject\n \nSome body of a function\n\nnon-key: non-value\n\n{TRAILER}\n", + ( + # check when trailer has inconsistent whitespace + f"Subject\n \nSome multiline\n body of a function\n\nnon-key: non-value\n\n" + f"{KEY_1}:{VALUE_1_1}\n{KEY_2} : {VALUE_2}\n{KEY_1}: {VALUE_1_2}\n" + ), + ] for msg in msgs: - commit = self.rorepo.commit("master") - commit = copy.copy(commit) + commit = copy.copy(self.rorepo.commit("master")) commit.message = msg - assert KEY_1 in commit.trailers.keys() - assert KEY_2 in commit.trailers.keys() - assert commit.trailers[KEY_1] == VALUE_1 - assert commit.trailers[KEY_2] == VALUE_2 - - # Check that trailer stays empty for multiple msg combinations - msgs = [] - msgs.append(f"Subject\n") - msgs.append(f"Subject\n\nBody with some\nText\n") - msgs.append(f"Subject\n\nBody with\nText\n\nContinuation but\n doesn't contain colon\n") - msgs.append(f"Subject\n\nBody with\nText\n\nContinuation but\n only contains one :\n") - msgs.append(f"Subject\n\nBody with\nText\n\nKey: Value\nLine without colon\n") - msgs.append(f"Subject\n\nBody with\nText\n\nLine without colon\nKey: Value\n") + assert commit.trailers_list == [ + (KEY_1, VALUE_1_1), + (KEY_2, VALUE_2), + (KEY_1, VALUE_1_2), + ] + assert commit.trailers_dict == { + KEY_1: [VALUE_1_1, VALUE_1_2], + KEY_2: [VALUE_2], + } + + # check that trailer stays empty for multiple msg combinations + msgs = [ + f"Subject\n", + f"Subject\n\nBody with some\nText\n", + f"Subject\n\nBody with\nText\n\nContinuation but\n doesn't contain colon\n", + f"Subject\n\nBody with\nText\n\nContinuation but\n only contains one :\n", + f"Subject\n\nBody with\nText\n\nKey: Value\nLine without colon\n", + f"Subject\n\nBody with\nText\n\nLine without colon\nKey: Value\n", + ] for msg in msgs: - commit = self.rorepo.commit("master") - commit = copy.copy(commit) + commit = copy.copy(self.rorepo.commit("master")) commit.message = msg - assert len(commit.trailers.keys()) == 0 + assert commit.trailers_list == [] + assert commit.trailers_dict == {} # check that only the last key value paragraph is evaluated - commit = self.rorepo.commit("master") - commit = copy.copy(commit) - commit.message = f"Subject\n\nMultiline\nBody\n\n{KEY_1}: {VALUE_1}\n\n{KEY_2}: {VALUE_2}\n" - assert KEY_1 not in commit.trailers.keys() - assert KEY_2 in commit.trailers.keys() - assert commit.trailers[KEY_2] == VALUE_2 + commit = copy.copy(self.rorepo.commit("master")) + commit.message = f"Subject\n\nMultiline\nBody\n\n{KEY_1}: {VALUE_1_1}\n\n{KEY_2}: {VALUE_2}\n" + assert commit.trailers_list == [(KEY_2, VALUE_2)] + assert commit.trailers_dict == {KEY_2: [VALUE_2]} def test_commit_co_authors(self): commit = copy.copy(self.rorepo.commit("4251bd5")) diff --git a/test/test_config.py b/test/test_config.py index 8bb2aa306..b159ebe2d 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -398,6 +398,17 @@ def test_empty_config_value(self): with self.assertRaises(cp.NoOptionError): cr.get_value("color", "ui") + def test_get_values_works_without_requiring_any_other_calls_first(self): + file_obj = self._to_memcache(fixture_path("git_config_multiple")) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section0", "option0"), ["value0"]) + file_obj.seek(0) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section1", "option1"), ["value1a", "value1b"]) + file_obj.seek(0) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section1", "other_option1"), ["other_value1"]) + def test_multiple_values(self): file_obj = self._to_memcache(fixture_path("git_config_multiple")) with GitConfigParser(file_obj, read_only=False) as cw: diff --git a/test/test_diff.py b/test/test_diff.py index 7065f0635..504337744 100644 --- a/test/test_diff.py +++ b/test/test_diff.py @@ -411,3 +411,73 @@ def test_diff_interface(self): cp = c.parents[0] diff_index = c.diff(cp, ["does/not/exist"]) self.assertEqual(len(diff_index), 0) + + @with_rw_directory + def test_rename_override(self, rw_dir): + """Test disabling of diff rename detection""" + + # create and commit file_a.txt + repo = Repo.init(rw_dir) + file_a = osp.join(rw_dir, "file_a.txt") + with open(file_a, "w", encoding='utf-8') as outfile: + outfile.write("hello world\n") + repo.git.add(Git.polish_url(file_a)) + repo.git.commit(message="Added file_a.txt") + + # remove file_a.txt + repo.git.rm(Git.polish_url(file_a)) + + # create and commit file_b.txt with similarity index of 52 + file_b = osp.join(rw_dir, "file_b.txt") + with open(file_b, "w", encoding='utf-8') as outfile: + outfile.write("hello world\nhello world") + repo.git.add(Git.polish_url(file_b)) + repo.git.commit(message="Removed file_a.txt. Added file_b.txt") + + commit_a = repo.commit('HEAD') + commit_b = repo.commit('HEAD~1') + + # check default diff command with renamed files enabled + diffs = commit_b.diff(commit_a) + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + + # check diff with rename files disabled + diffs = commit_b.diff(commit_a, no_renames=True) + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with high similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='75%') + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with low similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='40%') + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + diff --git a/test/test_git.py b/test/test_git.py index e7d236deb..f1d35a355 100644 --- a/test/test_git.py +++ b/test/test_git.py @@ -5,15 +5,16 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php import os +import shutil import subprocess import sys -from tempfile import TemporaryFile -from unittest import mock +from tempfile import TemporaryDirectory, TemporaryFile +from unittest import mock, skipUnless from git import Git, refresh, GitCommandError, GitCommandNotFound, Repo, cmd from test.lib import TestBase, fixture_path from test.lib import with_rw_directory -from git.util import finalize_process +from git.util import cwd, finalize_process import os.path as osp @@ -75,6 +76,40 @@ def test_it_transforms_kwargs_into_git_command_arguments(self): def test_it_executes_git_to_shell_and_returns_result(self): self.assertRegex(self.git.execute(["git", "version"]), r"^git version [\d\.]{2}.*$") + def test_it_executes_git_not_from_cwd(self): + with TemporaryDirectory() as tmpdir: + if is_win: + # Copy an actual binary executable that is not git. + other_exe_path = os.path.join(os.getenv("WINDIR"), "system32", "hostname.exe") + impostor_path = os.path.join(tmpdir, "git.exe") + shutil.copy(other_exe_path, impostor_path) + else: + # Create a shell script that doesn't do anything. + impostor_path = os.path.join(tmpdir, "git") + with open(impostor_path, mode="w", encoding="utf-8") as file: + print("#!/bin/sh", file=file) + os.chmod(impostor_path, 0o755) + + with cwd(tmpdir): + self.assertRegex(self.git.execute(["git", "version"]), r"^git version\b") + + @skipUnless(is_win, "The regression only affected Windows, and this test logic is OS-specific.") + def test_it_avoids_upcasing_unrelated_environment_variable_names(self): + old_name = "28f425ca_d5d8_4257_b013_8d63166c8158" + if old_name == old_name.upper(): + raise RuntimeError("test bug or strange locale: old_name invariant under upcasing") + os.putenv(old_name, "1") # It has to be done this lower-level way to set it lower-case. + + cmdline = [ + sys.executable, + fixture_path("env_case.py"), + self.rorepo.working_dir, + old_name, + ] + pair_text = subprocess.check_output(cmdline, shell=False, text=True) + new_name = pair_text.split("=")[0] + self.assertEqual(new_name, old_name) + def test_it_accepts_stdin(self): filename = fixture_path("cat_file_blob") with open(filename, "r") as fh: @@ -169,7 +204,7 @@ def test_refresh(self): self.assertRaises(GitCommandNotFound, refresh, "yada") # test a good path refresh - which_cmd = "where" if is_win else "which" + which_cmd = "where" if is_win else "command -v" path = os.popen("{0} git".format(which_cmd)).read().strip().split("\n")[0] refresh(path) diff --git a/test/test_quick_doc.py b/test/test_quick_doc.py new file mode 100644 index 000000000..eaee4e581 --- /dev/null +++ b/test/test_quick_doc.py @@ -0,0 +1,224 @@ +import pytest + + +from test.lib import TestBase +from test.lib.helper import with_rw_directory + + +class QuickDoc(TestBase): + def tearDown(self): + import gc + + gc.collect() + + @with_rw_directory + def test_init_repo_object(self, path_to_dir): + + # [1-test_init_repo_object] + # $ git init + + from git import Repo + + repo = Repo.init(path_to_dir) + # ![1-test_init_repo_object] + + # [2-test_init_repo_object] + repo = Repo(path_to_dir) + # ![2-test_init_repo_object] + + @with_rw_directory + def test_cloned_repo_object(self, local_dir): + + from git import Repo + import git + # code to clone from url + # [1-test_cloned_repo_object] + # $ git clone + + repo_url = "https://github.com/gitpython-developers/QuickStartTutorialFiles.git" + + repo = Repo.clone_from(repo_url, local_dir) + # ![1-test_cloned_repo_object] + + # code to add files + # [2-test_cloned_repo_object] + # We must make a change to a file so that we can add the update to git + + update_file = 'dir1/file2.txt' # we'll use local_dir/dir1/file2.txt + with open(f"{local_dir}/{update_file}", 'a') as f: + f.write('\nUpdate version 2') + # ![2-test_cloned_repo_object] + + # [3-test_cloned_repo_object] + # $ git add + add_file = [update_file] # relative path from git root + repo.index.add(add_file) # notice the add function requires a list of paths + # ![3-test_cloned_repo_object] + + # code to commit - not sure how to test this + # [4-test_cloned_repo_object] + # $ git commit -m + repo.index.commit("Update to file2") + # ![4-test_cloned_repo_object] + + # [5-test_cloned_repo_object] + # $ git log + + # relative path from git root + repo.iter_commits(all=True, max_count=10, paths=update_file) # gets the last 10 commits from all branches + + # Outputs: + + # ![5-test_cloned_repo_object] + + # [6-test_cloned_repo_object] + commits_for_file_generator = repo.iter_commits(all=True, max_count=10, paths=update_file) + commits_for_file = [c for c in commits_for_file_generator] + commits_for_file + + # Outputs: [, + # ] + # ![6-test_cloned_repo_object] + + # Untracked files - create new file + # [7-test_cloned_repo_object] + f = open(f'{local_dir}/untracked.txt', 'w') # creates an empty file + f.close() + # ![7-test_cloned_repo_object] + + # [8-test_cloned_repo_object] + repo.untracked_files + # Output: ['untracked.txt'] + # ![8-test_cloned_repo_object] + + # Modified files + # [9-test_cloned_repo_object] + # Let's modify one of our tracked files + + with open(f'{local_dir}/Downloads/file3.txt', 'w') as f: + f.write('file3 version 2') # overwrite file 3 + # ![9-test_cloned_repo_object] + + # [10-test_cloned_repo_object] + repo.index.diff(None) # compares staging area to working directory + + # Output: [, + # ] + # ![10-test_cloned_repo_object] + + # [11-test_cloned_repo_object] + diffs = repo.index.diff(None) + for d in diffs: + print(d.a_path) + + # Output + # Downloads/file3.txt + # ![11-test_cloned_repo_object] + + # compares staging area to head commit + # [11.1-test_cloned_repo_object] + diffs = repo.index.diff(repo.head.commit) + for d in diffs: + print(d.a_path) + + # Output + + # ![11.1-test_cloned_repo_object] + # [11.2-test_cloned_repo_object] + # lets add untracked.txt + repo.index.add(['untracked.txt']) + diffs = repo.index.diff(repo.head.commit) + for d in diffs: + print(d.a_path) + + # Output + # untracked.txt + # ![11.2-test_cloned_repo_object] + + # Compare commit to commit + # [11.3-test_cloned_repo_object] + first_commit = [c for c in repo.iter_commits(all=True)][-1] + diffs = repo.head.commit.diff(first_commit) + for d in diffs: + print(d.a_path) + + # Output + # dir1/file2.txt + # ![11.3-test_cloned_repo_object] + + + + '''Trees and Blobs''' + + # Latest commit tree + # [12-test_cloned_repo_object] + tree = repo.head.commit.tree + # ![12-test_cloned_repo_object] + + # Previous commit tree + # [13-test_cloned_repo_object] + prev_commits = [c for c in repo.iter_commits(all=True, max_count=10)] # last 10 commits from all branches + tree = prev_commits[0].tree + # ![13-test_cloned_repo_object] + + # Iterating through tree + # [14-test_cloned_repo_object] + files_and_dirs = [(entry, entry.name, entry.type) for entry in tree] + files_and_dirs + + # Output + # [(< git.Tree "SHA1-HEX_HASH" >, 'Downloads', 'tree'), + # (< git.Tree "SHA1-HEX_HASH" >, 'dir1', 'tree'), + # (< git.Blob "SHA1-HEX_HASH" >, 'file4.txt', 'blob')] + # ![14-test_cloned_repo_object] + + # [15-test_cloned_repo_object] + def print_files_from_git(root, level=0): + for entry in root: + print(f'{"-" * 4 * level}| {entry.path}, {entry.type}') + if entry.type == "tree": + print_files_from_git(entry, level + 1) + + # ![15-test_cloned_repo_object] + + # [16-test_cloned_repo_object] + print_files_from_git(tree) + + # Output + # | Downloads, tree + # ----| Downloads / file3.txt, blob + # | dir1, tree + # ----| dir1 / file1.txt, blob + # ----| dir1 / file2.txt, blob + # | file4.txt, blob + # # ![16-test_cloned_repo_object] + + # Printing text files + # [17-test_cloned_repo_object] + print_file = 'dir1/file2.txt' + tree[print_file] # the head commit tree + + # Output + # ![17-test_cloned_repo_object] + + # print latest file + # [18-test_cloned_repo_object] + blob = tree[print_file] + print(blob.data_stream.read().decode()) + + # Output + # file 2 version 1 + # Update version 2 + # ![18-test_cloned_repo_object] + + # print previous tree + # [18.1-test_cloned_repo_object] + commits_for_file = [c for c in repo.iter_commits(all=True, paths=print_file)] + tree = commits_for_file[-1].tree # gets the first commit tree + blob = tree[print_file] + + print(blob.data_stream.read().decode()) + + # Output + # file 2 version 1 + # ![18.1-test_cloned_repo_object] \ No newline at end of file diff --git a/test/test_refs.py b/test/test_refs.py index 5bb83100e..e7526c3b2 100644 --- a/test/test_refs.py +++ b/test/test_refs.py @@ -5,6 +5,7 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from itertools import chain +from pathlib import Path from git import ( Reference, @@ -15,13 +16,16 @@ SymbolicReference, GitCommandError, RefLog, + GitConfigParser, ) from git.objects.tag import TagObject from test.lib import TestBase, with_rw_repo from git.util import Actor +from gitdb.exc import BadName import git.refs as refs import os.path as osp +import tempfile class TestRefs(TestBase): @@ -172,6 +176,26 @@ def test_heads(self, rwrepo): assert log[0].oldhexsha == pcommit.NULL_HEX_SHA assert log[0].newhexsha == pcommit.hexsha + @with_rw_repo("HEAD", bare=False) + def test_set_tracking_branch_with_import(self, rwrepo): + # prepare included config file + included_config = osp.join(rwrepo.git_dir, "config.include") + with GitConfigParser(included_config, read_only=False) as writer: + writer.set_value("test", "value", "test") + assert osp.exists(included_config) + + with rwrepo.config_writer() as writer: + writer.set_value("include", "path", included_config) + + for head in rwrepo.heads: + head.set_tracking_branch(None) + assert head.tracking_branch() is None + remote_ref = rwrepo.remotes[0].refs[0] + assert head.set_tracking_branch(remote_ref) is head + assert head.tracking_branch() == remote_ref + head.set_tracking_branch(None) + assert head.tracking_branch() is None + def test_refs(self): types_found = set() for ref in self.rorepo.refs: @@ -595,3 +619,15 @@ def test_dereference_recursive(self): def test_reflog(self): assert isinstance(self.rorepo.heads.master.log(), RefLog) + + def test_refs_outside_repo(self): + # Create a file containing a valid reference outside the repository. Attempting + # to access it should raise an exception, due to it containing a parent directory + # reference ('..'). This tests for CVE-2023-41040. + git_dir = Path(self.rorepo.git_dir) + repo_parent_dir = git_dir.parent.parent + with tempfile.NamedTemporaryFile(dir=repo_parent_dir) as ref_file: + ref_file.write(b"91b464cd624fe22fbf54ea22b85a7e5cca507cfe") + ref_file.flush() + ref_file_name = Path(ref_file.name).name + self.assertRaises(BadName, self.rorepo.commit, f"../../{ref_file_name}") diff --git a/test/test_remote.py b/test/test_remote.py index 3a47afab5..9636ca486 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -694,259 +694,279 @@ def test_push_error(self, repo): @with_rw_repo("HEAD") def test_set_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.set_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.set_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_set_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.set_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.set_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.add_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.add_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.add_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.add_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Remote.create(rw_repo, "origin", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Remote.create(rw_repo, "origin", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for i, url in enumerate(urls): - remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) - assert remote.url == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for i, url in enumerate(urls): + remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) + assert remote.url == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.fetch(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.fetch(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.fetch(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.fetch(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.fetch(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.fetch(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.fetch(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.fetch(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_pull_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.pull(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.pull(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.pull(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.pull(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.pull(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.pull(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.pull(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.pull(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_push_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.push(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.push(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.push(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.push(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - assert not tmp_file.exists() - with self.assertRaises(UnsafeOptionError): - remote.push(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + assert not tmp_file.exists() + with self.assertRaises(UnsafeOptionError): + remote.push(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.push(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.push(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() class TestTimeouts(TestBase): diff --git a/test/test_repo.py b/test/test_repo.py index 5874dbe6a..08ed13a00 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -13,7 +13,7 @@ import pickle import sys import tempfile -from unittest import mock, skipIf, SkipTest +from unittest import mock, skipIf, SkipTest, skip import pytest @@ -251,6 +251,7 @@ def test_clone_from_with_path_contains_unicode(self): self.fail("Raised UnicodeEncodeError") @with_rw_directory + @skip("the referenced repository was removed, and one needs to setup a new password controlled repo under the orgs control") def test_leaking_password_in_clone_logs(self, rw_dir): password = "fakepassword1234" try: @@ -268,143 +269,198 @@ def test_leaking_password_in_clone_logs(self, rw_dir): @with_rw_repo("HEAD") def test_clone_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() + + unsafe_options = [ + {"upload-pack": f"touch {tmp_file}"}, + {"u": f"touch {tmp_file}"}, + {"config": "protocol.ext.allow=always"}, + {"c": "protocol.ext.allow=always"}, + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, **unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + assert destination.exists() @with_rw_repo("HEAD") def test_clone_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - rw_repo.clone(destination, multi_options=[option]) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + rw_repo.clone(destination, multi_options=[option]) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() + + unsafe_options = [ + {"upload-pack": f"touch {tmp_file}"}, + {"u": f"touch {tmp_file}"}, + {"config": "protocol.ext.allow=always"}, + {"c": "protocol.ext.allow=always"}, + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, **unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Repo.clone_from( - rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Repo.clone_from( + rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) - assert destination.exists() - - def test_clone_from_unsafe_procol(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Repo.clone_from(url, tmp_dir) - assert not tmp_file.exists() - - def test_clone_from_unsafe_procol_allowed(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - "ext::sh -c touch% /tmp/pwn", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) + assert destination.exists() + + def test_clone_from_unsafe_protocol(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Repo.clone_from(url, tmp_dir / "repo") + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Repo.clone_from(url, tmp_dir / "repo", allow_unsafe_protocols=True) + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed_and_enabled(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + ] + allow_ext = [ + "--config=protocol.ext.allow=always", + ] + for url in urls: + # The URL will be allowed into the command, and the protocol is enabled, + # but the command will fail since it can't read from the remote repo. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + Repo.clone_from( + url, + tmp_dir / "repo", + multi_options=allow_ext, + allow_unsafe_protocols=True, + allow_unsafe_options=True, + ) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): @@ -1326,26 +1382,55 @@ def test_do_not_strip_newline_in_stdout(self, rw_dir): @with_rw_repo("HEAD") def test_clone_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - unexpected_file = tmp_dir / "pwn" - assert not unexpected_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + unexpected_file = tmp_dir / "pwn" + assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - rw_repo.clone(payload) + payload = f"--upload-pack=touch {unexpected_file}" + rw_repo.clone(payload) - assert not unexpected_file.exists() - # A repo was cloned with the payload as name - assert pathlib.Path(payload).exists() + assert not unexpected_file.exists() + # A repo was cloned with the payload as name + assert pathlib.Path(payload).exists() @with_rw_repo("HEAD") def test_clone_from_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - temp_repo = Repo.init(tmp_dir / "repo") - unexpected_file = tmp_dir / "pwn" + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + unexpected_file = tmp_dir / "pwn" + + assert not unexpected_file.exists() + payload = f"--upload-pack=touch {unexpected_file}" + with self.assertRaises(GitCommandError): + rw_repo.clone_from(payload, temp_repo.common_dir) + + assert not unexpected_file.exists() + + def test_ignored_items_reported(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + + gi = tmp_dir / "repo" / ".gitignore" + + with open(gi, 'w') as file: + file.write('ignored_file.txt\n') + file.write('ignored_dir/\n') + + assert temp_repo.ignored(['included_file.txt', 'included_dir/file.txt']) == [] + assert temp_repo.ignored(['ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt', 'included_dir/file.txt', 'ignored_dir/file.txt']) == ['ignored_file.txt', 'ignored_dir/file.txt'] + + def test_ignored_raises_error_w_symlink(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") - assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - with self.assertRaises(GitCommandError): - rw_repo.clone_from(payload, temp_repo.common_dir) + os.mkdir(tmp_dir / "target") + os.symlink(tmp_dir / "target", tmp_dir / "symlink") - assert not unexpected_file.exists() + with pytest.raises(GitCommandError): + temp_repo.ignored(tmp_dir / "symlink/file.txt") diff --git a/test/test_submodule.py b/test/test_submodule.py index 13878df28..8c98a671e 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1,12 +1,13 @@ # -*- coding: utf-8 -*- # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +import contextlib import os import shutil import tempfile from pathlib import Path import sys -from unittest import skipIf +from unittest import mock, skipIf import pytest @@ -31,6 +32,23 @@ import os.path as osp +@contextlib.contextmanager +def _patch_git_config(name, value): + """Temporarily add a git config name-value pair, using environment variables.""" + pair_index = int(os.getenv("GIT_CONFIG_COUNT", "0")) + + # This is recomputed each time the context is entered, for compatibility with + # existing GIT_CONFIG_* environment variables, even if changed in this process. + patcher = mock.patch.dict(os.environ, { + "GIT_CONFIG_COUNT": str(pair_index + 1), + f"GIT_CONFIG_KEY_{pair_index}": name, + f"GIT_CONFIG_VALUE_{pair_index}": value, + }) + + with patcher: + yield + + class TestRootProgress(RootUpdateProgress): """Just prints messages, for now without checking the correctness of the states""" @@ -709,6 +727,7 @@ def test_add_empty_repo(self, rwdir): # end for each checkout mode @with_rw_directory + @_patch_git_config("protocol.file.allow", "always") def test_list_only_valid_submodules(self, rwdir): repo_path = osp.join(rwdir, "parent") repo = git.Repo.init(repo_path) @@ -737,6 +756,7 @@ def test_list_only_valid_submodules(self, rwdir): """, ) @with_rw_directory + @_patch_git_config("protocol.file.allow", "always") def test_git_submodules_and_add_sm_with_new_commit(self, rwdir): parent = git.Repo.init(osp.join(rwdir, "parent")) parent.git.submodule("add", self._small_repo_url(), "module") @@ -886,6 +906,28 @@ def assert_exists(sm, value=True): assert osp.isdir(sm_module_path) == dry_run # end for each dry-run mode + @with_rw_directory + def test_ignore_non_submodule_file(self, rwdir): + parent = git.Repo.init(rwdir) + + smp = osp.join(rwdir, "module") + os.mkdir(smp) + + with open(osp.join(smp, "a"), "w", encoding="utf-8") as f: + f.write('test\n') + + with open(osp.join(rwdir, ".gitmodules"), "w", encoding="utf-8") as f: + f.write("[submodule \"a\"]\n") + f.write(" path = module\n") + f.write(" url = https://github.com/chaconinc/DbConnector\n") + + parent.git.add(Git.polish_url(osp.join(smp, "a"))) + parent.git.add(Git.polish_url(osp.join(rwdir, ".gitmodules"))) + + parent.git.commit(message='test') + + assert len(parent.submodules) == 0 + @with_rw_directory def test_remove_norefs(self, rwdir): parent = git.Repo.init(osp.join(rwdir, "parent")) @@ -1101,139 +1143,147 @@ def test_add_no_clone_multi_options_argument(self, rwdir): @with_rw_repo("HEAD") def test_submodule_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Submodule.add(rw_repo, "new", "new", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Submodule.add(rw_repo, "new", "new", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) @with_rw_repo("HEAD") def test_submodule_update_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - with self.assertRaises(UnsafeProtocolError): - submodule.update() - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + with self.assertRaises(UnsafeProtocolError): + submodule.update() + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - submodule.update(allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + submodule.update(allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - submodule.update(clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + submodule.update(clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) diff --git a/test/test_util.py b/test/test_util.py index 90dd89a91..c17efce35 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -333,6 +333,27 @@ def test_iterable_list(self, case): self.assertRaises(IndexError, ilist.__delitem__, 0) self.assertRaises(IndexError, ilist.__delitem__, "something") + def test_utctz_to_altz(self): + self.assertEqual(utctz_to_altz("+0000"), 0) + self.assertEqual(utctz_to_altz("+1400"), -(14 * 3600)) + self.assertEqual(utctz_to_altz("-1200"), 12 * 3600) + self.assertEqual(utctz_to_altz("+0001"), -60) + self.assertEqual(utctz_to_altz("+0530"), -(5 * 3600 + 1800)) + self.assertEqual(utctz_to_altz("-0930"), 9 * 3600 + 1800) + + def test_altz_to_utctz_str(self): + self.assertEqual(altz_to_utctz_str(0), "+0000") + self.assertEqual(altz_to_utctz_str(-(14 * 3600)), "+1400") + self.assertEqual(altz_to_utctz_str(12 * 3600), "-1200") + self.assertEqual(altz_to_utctz_str(-60), "+0001") + self.assertEqual(altz_to_utctz_str(-(5 * 3600 + 1800)), "+0530") + self.assertEqual(altz_to_utctz_str(9 * 3600 + 1800), "-0930") + + self.assertEqual(altz_to_utctz_str(1), "+0000") + self.assertEqual(altz_to_utctz_str(59), "+0000") + self.assertEqual(altz_to_utctz_str(-1), "+0000") + self.assertEqual(altz_to_utctz_str(-59), "+0000") + def test_from_timestamp(self): # Correct offset: UTC+2, should return datetime + tzoffset(+2) altz = utctz_to_altz("+0200")