From ed8b634605bda5d244916aecae066fe0e7802661 Mon Sep 17 00:00:00 2001
From: phlax
Date: Sat, 17 Aug 2024 10:20:53 +0100
Subject: [PATCH] `envoy.base.utils`(0.5.2): Add fetch util (#2208)

Signed-off-by: Ryan Northey
---
 README.md                                   |   2 +-
 envoy.base.utils/VERSION                    |   2 +-
 envoy.base.utils/envoy/base/utils/BUILD     |   2 +
 envoy.base.utils/envoy/base/utils/__init__.py |   4 +
 .../envoy/base/utils/exceptions.py          |   4 +
 .../envoy/base/utils/fetch_cmd.py           |  16 +
 .../envoy/base/utils/fetch_runner.py        | 220 ++++
 envoy.base.utils/setup.cfg                  |   1 +
 envoy.base.utils/tests/test_fetch_runner.py | 738 ++++++++++++++++++
 9 files changed, 987 insertions(+), 2 deletions(-)
 create mode 100644 envoy.base.utils/envoy/base/utils/fetch_cmd.py
 create mode 100644 envoy.base.utils/envoy/base/utils/fetch_runner.py
 create mode 100644 envoy.base.utils/tests/test_fetch_runner.py

diff --git a/README.md b/README.md
index 4576b75b6..2e4369668 100644
--- a/README.md
+++ b/README.md
@@ -139,7 +139,7 @@ pypi: https://pypi.org/project/dependatool
 
 #### [envoy.base.utils](envoy.base.utils)
 
-version: 0.5.2.dev0
+version: 0.5.2
 
 pypi: https://pypi.org/project/envoy.base.utils
 
diff --git a/envoy.base.utils/VERSION b/envoy.base.utils/VERSION
index 12805ec82..cb0c939a9 100644
--- a/envoy.base.utils/VERSION
+++ b/envoy.base.utils/VERSION
@@ -1 +1 @@
-0.5.2-dev
+0.5.2
diff --git a/envoy.base.utils/envoy/base/utils/BUILD b/envoy.base.utils/envoy/base/utils/BUILD
index 762401ce5..93a556b82 100644
--- a/envoy.base.utils/envoy/base/utils/BUILD
+++ b/envoy.base.utils/envoy/base/utils/BUILD
@@ -28,6 +28,8 @@ toolshed_library(
         "data_env.py",
         "data_env_cmd.py",
         "exceptions.py",
+        "fetch_cmd.py",
+        "fetch_runner.py",
         "interface.py",
         "jinja_env.py",
         "jinja_env_cmd.py",
diff --git a/envoy.base.utils/envoy/base/utils/__init__.py b/envoy.base.utils/envoy/base/utils/__init__.py
index 821a25f88..091661500 100644
--- a/envoy.base.utils/envoy/base/utils/__init__.py
+++ b/envoy.base.utils/envoy/base/utils/__init__.py
@@ -27,6 +27,8 @@
     tuple_pair, TuplePairError)
 from . import interface, typing
+from .fetch_cmd import fetch_cmd
+from .fetch_runner import FetchRunner
 from .parallel_cmd import parallel_cmd
 from .parallel_runner import ParallelRunner
 from .project import Changelog, ChangelogEntry, Changelogs, Project
@@ -53,6 +55,8 @@
     "ellipsize",
     "extract",
     "ExtractError",
+    "fetch_cmd",
+    "FetchRunner",
     "from_json",
     "from_yaml",
     "increment_version",
diff --git a/envoy.base.utils/envoy/base/utils/exceptions.py b/envoy.base.utils/envoy/base/utils/exceptions.py
index 7ed98a7bd..eaed6d569 100644
--- a/envoy.base.utils/envoy/base/utils/exceptions.py
+++ b/envoy.base.utils/envoy/base/utils/exceptions.py
@@ -31,3 +31,7 @@ class CommitError(Exception):
 
 class PublishError(Exception):
     pass
+
+
+class ChecksumError(Exception):
+    pass
diff --git a/envoy.base.utils/envoy/base/utils/fetch_cmd.py b/envoy.base.utils/envoy/base/utils/fetch_cmd.py
new file mode 100644
index 000000000..4174415be
--- /dev/null
+++ b/envoy.base.utils/envoy/base/utils/fetch_cmd.py
@@ -0,0 +1,16 @@
+
+import sys
+
+from .fetch_runner import FetchRunner
+
+
+def main(*args: str) -> int:
+    return FetchRunner(*args)()
+
+
+def fetch_cmd():
+    sys.exit(main(*sys.argv[1:]))
+
+
+if __name__ == "__main__":
+    fetch_cmd()
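
Note: fetch_cmd.py is a thin shim, so the util works both as the console
script added below and via `python -m envoy.base.utils.fetch_cmd`. A minimal
sketch of driving the runner programmatically, mirroring `main()` above
(assumes the package is installed; "downloads.json" is a hypothetical input
file):

    import sys

    from envoy.base.utils import FetchRunner

    # FetchRunner(*args)() parses argv-style arguments, runs the fetches
    # and returns an exit code, exactly as main() does above.
    sys.exit(FetchRunner("downloads.json", "--output", "json")())
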
"--extract-downloads", + action="store_true", + default=False, + help="Extract downloaded files") + parser.add_argument( + "--output", + help="Output format") + parser.add_argument( + "--output-path", + help="Output path") + parser.add_argument( + "--token", + help="Env name for auth token") + parser.add_argument( + "--token-path", + help="Path to auth token") + + def download_path(self, url: str) -> Optional[pathlib.Path]: + if "path" not in self.downloads[url]: + return None + return self.downloads_path.joinpath( + self.downloads[url]["path"], + self.filename(url)) + + def excluded(self, url: str) -> bool: + path = self.downloads[url].get("path") + return bool(path and path in self.excludes) + + async def fetch(self, url: str) -> tuple[str, Optional[bytes]]: + self.log.debug( + f"{self.time_elapsed} Fetching:\n" + f" {url}\n") + download_path = self.download_path(url) + if download_path: + download_path.parent.mkdir(parents=True, exist_ok=True) + return await self.fetch_bytes(url, path=download_path) + + async def fetch_bytes( + self, + url: str, + path: Optional[pathlib.Path] = None) -> ( + tuple[str, Optional[bytes]]): + async with self.session.get(url, headers=self.headers) as response: + response.raise_for_status() + if not path: + return url, await response.read() + + self.log.debug( + f"{self.time_elapsed} " + f"Writing chunks({self.args.chunk_size}):\n" + f" {url}\n" + f" -> {path}") + with path.open("wb") as f: + chunks = response.content.iter_chunked(self.args.chunk_size) + async for chunk in chunks: + f.write(chunk) + + if "checksum" in self.downloads[url]: + await self.validate_checksum(url) + + if self.args.extract_downloads: + await asyncio.to_thread(utils.extract, path.parent, path) + path.unlink() + + return url, None + + def filename(self, url: str) -> str: + parsed_url = urlsplit(url) + path_parts = parsed_url.path.split("/") + return path_parts[-1] + + def hashed(self, content: bytes) -> str: + hash_object = hashlib.sha256() + hash_object.update(content) + return hash_object.hexdigest() + + @runner.cleansup + async def run(self) -> Optional[int]: + result = {} + downloads = concurrent( + (self.fetch(url) + for url + in self.downloads + if not self.excluded(url)), + limit=self.args.concurrency) + + async for (url, response) in downloads: + self.log.debug( + f"{self.time_elapsed} " + f"Received:\n" + f" {url}\n") + if self.args.output == "json": + result[url] = response.decode() + + if self.args.output == "json": + print(json.dumps(result)) + return 0 + + if not self.args.output_path: + return 0 + self.log.debug( + f"{self.time_elapsed} " + f"Packing:\n" + f" {self.downloads_path}\n" + f" {self.args.output_path}\n") + await asyncio.to_thread( + utils.pack, + self.downloads_path, + self.args.output_path) + + async def cleanup(self): + await super().cleanup() + await self.session.close() + + async def validate_checksum(self, url: str) -> None: + path = self.download_path(url) + if not path: + return + hashed = await asyncio.to_thread( + self.hashed, + path.read_bytes()) + checksum = self.downloads[url]["checksum"] + self.log.debug( + f"{self.time_elapsed} " + f"Validating:\n" + f" {url}\n" + f" {checksum}\n") + if hashed != checksum: + raise utils.exceptions.ChecksumError( + f"Checksums do not match({url}):\n" + f" expected: {checksum}\n" + f" received: {hashed}") diff --git a/envoy.base.utils/setup.cfg b/envoy.base.utils/setup.cfg index 5420c9e05..baf0dde90 100644 --- a/envoy.base.utils/setup.cfg +++ b/envoy.base.utils/setup.cfg @@ -69,6 +69,7 @@ publish = wheel 
diff --git a/envoy.base.utils/setup.cfg b/envoy.base.utils/setup.cfg
index 5420c9e05..baf0dde90 100644
--- a/envoy.base.utils/setup.cfg
+++ b/envoy.base.utils/setup.cfg
@@ -69,6 +69,7 @@ publish = wheel
 [options.entry_points]
 console_scripts =
     envoy.data_env = envoy.base.utils:data_env_cmd
+    envoy.fetch = envoy.base.utils:fetch_cmd
     envoy.jinja_env = envoy.base.utils:jinja_env_cmd
     envoy.parallel = envoy.base.utils:parallel_cmd
     envoy.project = envoy.base.utils:project_cmd
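
Note: with the `envoy.fetch` console script registered, typical invocations
look like the following (illustrative file names; flags as defined in
`add_arguments` above). `--token` names an environment variable to read a
GitHub token from, while `--token-path` points at a file containing one:

    envoy.fetch downloads.json --output json
    envoy.fetch downloads.json --concurrency 5 --output-path fetched.tar.gz
    envoy.fetch downloads.json --token GITHUB_TOKEN --excludes excludes.txt

When `--output-path` is given, the temporary downloads directory is packed
to that path via `utils.pack` once all fetches complete.
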
diff --git a/envoy.base.utils/tests/test_fetch_runner.py b/envoy.base.utils/tests/test_fetch_runner.py
new file mode 100644
index 000000000..0cdf3ec33
--- /dev/null
+++ b/envoy.base.utils/tests/test_fetch_runner.py
@@ -0,0 +1,738 @@
+
+import types
+from unittest.mock import AsyncMock, MagicMock, PropertyMock
+
+import pytest
+
+from aio.run.runner import Runner
+
+from envoy.base import utils
+
+
+def test_fetchrunner_constructor(iters, patches):
+    args = iters(tuple, count=3)
+    kwargs = iters(dict, count=3)
+    patched = patches(
+        "runner.Runner.__init__",
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_super, ):
+        m_super.return_value = None
+        runner = utils.fetch_runner.FetchRunner(*args, **kwargs)
+
+    assert isinstance(runner, Runner)
+    assert (
+        m_super.call_args
+        == [args, kwargs])
+
+
+def test_fetchrunner_downloads(patches):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "json",
+        "pathlib",
+        ("FetchRunner.args",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_json, m_plib, m_args):
+        assert (
+            runner.downloads
+            == m_json.load.return_value)
+
+    assert (
+        m_json.load.call_args
+        == [(m_plib.Path.return_value.open.return_value, ), {}])
+    assert (
+        m_plib.Path.call_args
+        == [(m_args.return_value.downloads, ), {}])
+    assert (
+        m_plib.Path.return_value.open.call_args
+        == [(), {}])
+    assert "downloads" in runner.__dict__
+
+
+def test_fetchrunner_downloads_path(patches):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "pathlib",
+        ("FetchRunner.tempdir",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_plib, m_temp):
+        assert (
+            runner.downloads_path
+            == m_plib.Path.return_value.joinpath.return_value)
+
+    assert (
+        m_plib.Path.call_args
+        == [(m_temp.return_value.name, ), {}])
+    assert (
+        m_plib.Path.return_value.joinpath.call_args
+        == [("downloads", ), {}])
+
+    assert "downloads_path" in runner.__dict__
+
+
+@pytest.mark.parametrize("excludes", ["", "EXCLUDE_PATH"])
+def test_fetchrunner_excludes(patches, excludes):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "pathlib",
+        ("FetchRunner.args",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_plib, m_args):
+        m_args.return_value.excludes = excludes
+        assert (
+            runner.excludes
+            == ((m_plib.Path.return_value
+                 .read_text.return_value
+                 .splitlines.return_value)
+                if excludes
+                else []))
+
+    if not excludes:
+        assert not m_plib.Path.called
+        return
+
+    assert (
+        m_plib.Path.call_args
+        == [("EXCLUDE_PATH", ), {}])
+    assert (
+        m_plib.Path.return_value.read_text.call_args
+        == [(), {}])
+    assert (
+        m_plib.Path.return_value.read_text.return_value.splitlines.call_args
+        == [(), {}])
+    assert "excludes" in runner.__dict__
+
+
+@pytest.mark.parametrize("token", ["", "TOKEN"])
+def test_fetchrunner_headers(patches, token):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "dict",
+        ("FetchRunner.token",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_dict, m_token):
+        m_token.return_value = token
+        assert (
+            runner.headers
+            == (m_dict.return_value
+                if token
+                else {}))
+
+    if not token:
+        assert not m_dict.called
+        return
+
+    assert (
+        m_dict.call_args
+        == [(),
+            dict(Authorization="token TOKEN",
+                 Accept="application/octet-stream")])
+    assert "headers" not in runner.__dict__
+
+
+def test_fetchrunner_time_elapsed(patches):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "round",
+        "time",
+        ("FetchRunner.time_start",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_round, m_time, m_start):
+        assert (
+            runner.time_elapsed
+            == m_round.return_value)
+
+    assert (
+        m_round.call_args
+        == [(m_time.time.return_value.__sub__.return_value, 3), {}])
+    assert (
+        m_time.time.call_args
+        == [(), {}])
+    assert (
+        m_time.time.return_value.__sub__.call_args
+        == [(m_start.return_value, ), {}])
+
+    assert "time_elapsed" not in runner.__dict__
+
+
+def test_fetchrunner_time_start(patches):
+    runner = utils.FetchRunner()
+    patched = patches(
+        "time",
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_time, ):
+        assert (
+            runner.time_start
+            == m_time.time.return_value)
+
+    assert (
+        m_time.time.call_args
+        == [(), {}])
+
+    assert "time_start" in runner.__dict__
+
+
downloaded files"}], + [("--output",), + {"help": "Output format"}], + [("--output-path",), + {"help": "Output path"}], + [("--token",), + {"help": "Env name for auth token"}], + [("--token-path",), + {"help": "Path to auth token"}]]) + + +@pytest.mark.parametrize("path", [True, False]) +def test_fetchrunner_download_path(patches, path): + runner = utils.FetchRunner() + url = MagicMock() + patched = patches( + ("FetchRunner.downloads", + dict(new_callable=PropertyMock)), + ("FetchRunner.downloads_path", + dict(new_callable=PropertyMock)), + "FetchRunner.filename", + prefix="envoy.base.utils.fetch_runner") + + with patched as (m_downloads, m_path, m_filename): + (m_downloads.return_value.__getitem__.return_value + .__contains__.return_value) = path + assert ( + runner.download_path(url) + == (None + if not path + else m_path.return_value.joinpath.return_value)) + + if not path: + assert not m_path.called + assert not m_filename.called + assert not ( + m_downloads.return_value.__getitem__.return_value + .__getitem__.called) + return + + assert ( + m_path.return_value.joinpath.call_args + == [(m_downloads.return_value.__getitem__.return_value + .__getitem__.return_value, + m_filename.return_value), {}]) + assert ( + m_filename.call_args + == [(url, ), {}]) + assert ( + m_downloads.return_value.__getitem__.call_args + == [(url, ), {}]) + assert ( + m_downloads.return_value.__getitem__.return_value.__getitem__.call_args + == [("path", ), {}]) + + +@pytest.mark.parametrize("path", [True, False]) +@pytest.mark.parametrize("contains", [True, False]) +def test_fetchrunner_excluded(patches, path, contains): + runner = utils.FetchRunner() + url = MagicMock() + _path = ( + MagicMock() + if path + else None) + patched = patches( + "bool", + ("FetchRunner.downloads", + dict(new_callable=PropertyMock)), + ("FetchRunner.excludes", + dict(new_callable=PropertyMock)), + prefix="envoy.base.utils.fetch_runner") + + with patched as (m_bool, m_downloads, m_excludes): + (m_downloads.return_value.__getitem__.return_value + .get.return_value) = _path + m_excludes.return_value.__contains__.return_value = contains + assert ( + runner.excluded(url) + == m_bool.return_value) + + assert ( + m_downloads.return_value.__getitem__.call_args + == [(url, ), {}]) + assert ( + m_downloads.return_value.__getitem__.return_value.get.call_args + == [("path", ), {}]) + assert ( + m_bool.call_args + == [((None if not path else contains), ), {}]) + if path: + assert ( + m_excludes.return_value.__contains__.call_args + == [(_path, ), {}]) + else: + assert not m_excludes.called + + +@pytest.mark.parametrize("path", [True, False]) +async def test_fetchrunner_fetch(patches, path): + runner = utils.FetchRunner() + url = MagicMock() + _path = ( + MagicMock() + if path + else None) + patched = patches( + "FetchRunner.download_path", + "FetchRunner.fetch_bytes", + ("FetchRunner.log", + dict(new_callable=PropertyMock)), + ("FetchRunner.time_elapsed", + dict(new_callable=PropertyMock)), + prefix="envoy.base.utils.fetch_runner") + + with patched as (m_path, m_fetch, m_log, m_elapsed): + m_path.return_value = _path + assert ( + await runner.fetch(url) + == m_fetch.return_value) + + assert ( + m_log.return_value.debug.call_args + == [((f"{m_elapsed.return_value} Fetching:\n" + f" {url}\n"), ), {}]) + assert ( + m_path.call_args + == [(url, ), {}]) + if path: + assert ( + _path.parent.mkdir.call_args + == [(), dict(parents=True, exist_ok=True)]) + assert ( + m_fetch.call_args + == [(url, ), dict(path=_path)]) + + +@pytest.mark.parametrize("path", 
+@pytest.mark.parametrize("path", [True, False])
+def test_fetchrunner_download_path(patches, path):
+    runner = utils.FetchRunner()
+    url = MagicMock()
+    patched = patches(
+        ("FetchRunner.downloads",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.downloads_path",
+         dict(new_callable=PropertyMock)),
+        "FetchRunner.filename",
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_downloads, m_path, m_filename):
+        (m_downloads.return_value.__getitem__.return_value
+         .__contains__.return_value) = path
+        assert (
+            runner.download_path(url)
+            == (None
+                if not path
+                else m_path.return_value.joinpath.return_value))
+
+    if not path:
+        assert not m_path.called
+        assert not m_filename.called
+        assert not (
+            m_downloads.return_value.__getitem__.return_value
+            .__getitem__.called)
+        return
+
+    assert (
+        m_path.return_value.joinpath.call_args
+        == [(m_downloads.return_value.__getitem__.return_value
+             .__getitem__.return_value,
+             m_filename.return_value), {}])
+    assert (
+        m_filename.call_args
+        == [(url, ), {}])
+    assert (
+        m_downloads.return_value.__getitem__.call_args
+        == [(url, ), {}])
+    assert (
+        m_downloads.return_value.__getitem__.return_value.__getitem__.call_args
+        == [("path", ), {}])
+
+
+@pytest.mark.parametrize("path", [True, False])
+@pytest.mark.parametrize("contains", [True, False])
+def test_fetchrunner_excluded(patches, path, contains):
+    runner = utils.FetchRunner()
+    url = MagicMock()
+    _path = (
+        MagicMock()
+        if path
+        else None)
+    patched = patches(
+        "bool",
+        ("FetchRunner.downloads",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.excludes",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_bool, m_downloads, m_excludes):
+        (m_downloads.return_value.__getitem__.return_value
+         .get.return_value) = _path
+        m_excludes.return_value.__contains__.return_value = contains
+        assert (
+            runner.excluded(url)
+            == m_bool.return_value)
+
+    assert (
+        m_downloads.return_value.__getitem__.call_args
+        == [(url, ), {}])
+    assert (
+        m_downloads.return_value.__getitem__.return_value.get.call_args
+        == [("path", ), {}])
+    assert (
+        m_bool.call_args
+        == [((None if not path else contains), ), {}])
+    if path:
+        assert (
+            m_excludes.return_value.__contains__.call_args
+            == [(_path, ), {}])
+    else:
+        assert not m_excludes.called
+
+
+@pytest.mark.parametrize("path", [True, False])
+async def test_fetchrunner_fetch(patches, path):
+    runner = utils.FetchRunner()
+    url = MagicMock()
+    _path = (
+        MagicMock()
+        if path
+        else None)
+    patched = patches(
+        "FetchRunner.download_path",
+        "FetchRunner.fetch_bytes",
+        ("FetchRunner.log",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.time_elapsed",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_path, m_fetch, m_log, m_elapsed):
+        m_path.return_value = _path
+        assert (
+            await runner.fetch(url)
+            == m_fetch.return_value)
+
+    assert (
+        m_log.return_value.debug.call_args
+        == [((f"{m_elapsed.return_value} Fetching:\n"
+              f" {url}\n"), ), {}])
+    assert (
+        m_path.call_args
+        == [(url, ), {}])
+    if path:
+        assert (
+            _path.parent.mkdir.call_args
+            == [(), dict(parents=True, exist_ok=True)])
+    assert (
+        m_fetch.call_args
+        == [(url, ), dict(path=_path)])
+
+
+@pytest.mark.parametrize("path", [True, False])
+@pytest.mark.parametrize("checksum", [True, False])
+@pytest.mark.parametrize("extract", [True, False])
+async def test_fetchrunner_fetch_bytes(patches, path, checksum, extract):
+    runner = utils.FetchRunner()
+    url = MagicMock()
+    _path = (
+        MagicMock()
+        if path
+        else None)
+    patched = patches(
+        "asyncio",
+        "utils",
+        "FetchRunner.download_path",
+        "FetchRunner.validate_checksum",
+        ("FetchRunner.args",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.downloads",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.log",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.headers",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.session",
+         dict(new_callable=PropertyMock)),
+        ("FetchRunner.time_elapsed",
+         dict(new_callable=PropertyMock)),
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as patchy:
+        (m_asyncio, m_utils, m_download_path, m_validate,
+         m_args, m_downloads, m_log, m_headers,
+         m_session, m_elapsed) = patchy
+
+        async def _chunked(size):
+            assert size == m_args.return_value.chunk_size
+            for x in range(0, 3):
+                yield f"CHUNK{x}"
+
+        _to_thread = AsyncMock()
+        m_asyncio.to_thread = _to_thread
+        _get = AsyncMock()
+        m_session.return_value.get.return_value = _get
+        response = _get.__aenter__.return_value
+        response.content.iter_chunked = _chunked
+        response.raise_for_status = MagicMock()
+        (m_downloads.return_value.__getitem__.return_value
+         .__contains__.return_value) = checksum
+        m_args.return_value.extract_downloads = extract
+        assert (
+            await runner.fetch_bytes(url, _path)
+            == (url,
+                (response.read.return_value
+                 if not path
+                 else None)))
+
+    assert (
+        m_session.return_value.get.call_args
+        == [(url, ), dict(headers=m_headers.return_value)])
+    assert (
+        response.raise_for_status.call_args
+        == [(), {}])
+    if not path:
+        assert (
+            response.read.call_args
+            == [(), {}])
+        assert not m_log.return_value.debug.called
+        assert not response.content.iter_chunk.called
+        assert not m_downloads.return_value.__getitem__.called
+        assert not m_asyncio.to_thread.called
+        assert not m_validate.called
+        return
+
+    assert not response.read.called
+    assert (
+        m_log.return_value.debug.call_args
+        == [((f"{m_elapsed.return_value} "
+              f"Writing chunks({m_args.return_value.chunk_size}):\n"
+              f" {url}\n"
+              f" -> {_path}"), ), {}])
+    assert (
+        _path.open.call_args
+        == [("wb", ), {}])
+    assert (
+        _path.open.return_value.__enter__.return_value.write.call_args_list
+        == [[(f"CHUNK{x}", ), {}]
+            for x in range(0, 3)])
+    assert (
+        m_downloads.return_value.__getitem__.call_args
+        == [(url, ), {}])
+    if checksum:
+        assert (
+            m_validate.call_args
+            == [(url, ), {}])
+    else:
+        assert not m_validate.called
+    if not extract:
+        assert not m_asyncio.to_thread.called
+        assert not _path.unlink.called
+        return
+    assert (
+        m_asyncio.to_thread.call_args
+        == [(m_utils.extract, _path.parent, _path), {}])
+    assert (
+        _path.unlink.call_args
+        == [(), {}])
+
+
+def test_fetchrunner_filename(patches):
+    runner = utils.FetchRunner()
+    url = MagicMock()
+    patched = patches(
+        "urlsplit",
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_url, ):
+        assert (
+            runner.filename(url)
+            == (m_url.return_value.path.split.return_value
+                .__getitem__.return_value))
+
+    assert (
+        m_url.call_args
+        == [(url, ), {}])
+    assert (
+        m_url.return_value.path.split.call_args
+        == [("/", ), {}])
+    assert (
+        m_url.return_value.path.split.return_value.__getitem__.call_args
+        == [(-1, ), {}])
+
+
+def test_fetchrunner_hashed(patches):
+    runner = utils.FetchRunner()
+    content = MagicMock()
+    patched = patches(
+        "hashlib",
+        prefix="envoy.base.utils.fetch_runner")
+
+    with patched as (m_hash, ):
+        assert (
+            runner.hashed(content)
+            == m_hash.sha256.return_value.hexdigest.return_value)
+
+    assert (
+        m_hash.sha256.call_args
+        == [(), {}])
+    assert (
+        m_hash.sha256.return_value.update.call_args
+        == [(content, ), {}])
+    assert (
+        m_hash.sha256.return_value.hexdigest.call_args
+        == [(), {}])
+
+
("FetchRunner.log", + dict(new_callable=PropertyMock)), + ("FetchRunner.time_elapsed", + dict(new_callable=PropertyMock)), + prefix="envoy.base.utils.fetch_runner") + + with patched as (m_asyncio, m_path, m_downloads, m_log, m_elapsed): + m_path.return_value = _path + _to_thread = AsyncMock() + m_asyncio.to_thread = _to_thread + _to_thread.return_value.__ne__.return_value = not matches + if path and not matches: + with pytest.raises(utils.exceptions.ChecksumError) as e: + await runner.validate_checksum(url) + else: + assert not await runner.validate_checksum(url) + + assert ( + m_path.call_args + == [(url, ), {}]) + if not path: + assert not _to_thread.called + assert not m_log.return_value.debug.called + assert not m_downloads.called + return + assert ( + _to_thread.call_args + == [(runner.hashed, _path.read_bytes.return_value), {}]) + assert ( + _path.read_bytes.call_args + == [(), {}]) + configured_checksum = ( + m_downloads.return_value.__getitem__.return_value + .__getitem__.return_value) + assert ( + m_log.return_value.debug.call_args + == [((f"{m_elapsed.return_value} " + f"Validating:\n" + f" {url}\n" + f" {configured_checksum}\n"), ), {}]) + assert ( + _to_thread.return_value.__ne__.call_args + == [(m_downloads.return_value.__getitem__.return_value + .__getitem__.return_value, ), {}]) + assert ( + m_downloads.return_value.__getitem__.call_args + == [(url,), {}]) + assert ( + m_downloads.return_value.__getitem__.return_value.__getitem__.call_args + == [("checksum",), {}]) + if not matches: + assert ( + e.value.args[0] + == (f"Checksums do not match({url}):\n" + f" expected: {configured_checksum}\n" + f" received: {_to_thread.return_value}"))