From 6f76dc1ca093adceaa23f8655a6fde6e400da53a Mon Sep 17 00:00:00 2001 From: Andrey Nikiforov Date: Thu, 6 Jun 2024 18:03:27 -0700 Subject: [PATCH] customize the choice and the order of password providers (#864) --- CHANGELOG.md | 2 + src/icloudpd/authentication.py | 40 +++++++++++++------ src/icloudpd/base.py | 73 +++++++++++++++++++++++++++++++--- src/pyicloud_ipd/base.py | 5 +-- src/pyicloud_ipd/cmdline.py | 8 ++-- src/pyicloud_ipd/utils.py | 50 +++++++++++++---------- tests/test_authentication.py | 13 +++--- 7 files changed, 135 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9c840b87..4e69f33db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- feature: customize choice and the order of checking for password with `--password-provider` parameter + ## 1.19.1 (2024-06-02) - fix: KeyError alternative [#859](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/859) diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index 71b3eee31..eebe0e394 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -2,7 +2,7 @@ import logging import sys -from typing import Callable, Optional +from typing import Callable, Dict, Optional, Sequence, Tuple import click import pyicloud_ipd from pyicloud_ipd.base import PyiCloudService @@ -16,34 +16,48 @@ class TwoStepAuthRequiredError(Exception): """ -def authenticator(logger: logging.Logger, domain: str, filename_cleaner: Callable[[str], str], lp_filename_generator: Callable[[str], str], raw_policy:RawTreatmentPolicy) -> Callable[[str, Optional[str], Optional[str], bool, Optional[str]], PyiCloudService]: +def authenticator( + logger: logging.Logger, + domain: str, + filename_cleaner: Callable[[str], str], + lp_filename_generator: Callable[[str], str], + raw_policy:RawTreatmentPolicy, + password_providers: Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]) -> Callable[[str, Optional[str], bool, Optional[str]], PyiCloudService]: """Wraping authentication with domain context""" def authenticate_( username:str, - password:Optional[str], cookie_directory:Optional[str]=None, raise_error_on_2sa:bool=False, client_id:Optional[str]=None, ) -> PyiCloudService: """Authenticate with iCloud username and password""" logger.debug("Authenticating...") - while True: - try: - # If password not provided on command line variable will be set to None - # and PyiCloud will attempt to retrieve from its keyring + icloud: Optional[PyiCloudService] = None + _valid_password: Optional[str] = None + for _, _pair in password_providers.items(): + _reader, _ = _pair + _password = _reader(username) + if _password: icloud = PyiCloudService( filename_cleaner, lp_filename_generator, domain, raw_policy, - username, password, + username, _password, cookie_directory=cookie_directory, client_id=client_id, ) + _valid_password = _password break - except pyicloud_ipd.exceptions.PyiCloudNoStoredPasswordAvailableException: - # Prompt for password if not stored in PyiCloud's keyring - password = click.prompt("iCloud Password", hide_input=True) + + if not icloud: + raise NotImplementedError("None of providers gave password") + + if _valid_password: + # save valid password to all providers + for _, _pair in password_providers.items(): + _, _writer = _pair + _writer(username, _valid_password) if icloud.requires_2fa: if raise_error_on_2sa: @@ -69,7 +83,7 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None: """Request two-step authentication. Prompts for SMS or device""" devices = icloud.trusted_devices devices_count = len(devices) - device_index = 0 + device_index: int = 0 if devices_count > 0: for i, device in enumerate(devices): # pylint: disable-msg=consider-using-f-string @@ -82,7 +96,7 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None: device_index = click.prompt( "Please choose an option:", - default=0, + default="0", type=click.IntRange( 0, devices_count - 1)) diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index d0c3d5dbb..60f83c191 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -1,6 +1,10 @@ #!/usr/bin/env python """Main script that uses Click to parse command-line arguments""" from __future__ import print_function +import getpass +import typing + +from click import Option, Parameter from icloudpd.counter import Counter from icloudpd import constants from icloudpd import exif_datetime @@ -19,7 +23,7 @@ from tqdm import tqdm import click import urllib -from typing import Callable, Iterable, NoReturn, Optional, Sequence, TypeVar, cast +from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Sequence, Tuple, TypeVar, cast import json import subprocess import itertools @@ -31,7 +35,7 @@ import os from multiprocessing import freeze_support -from pyicloud_ipd.utils import compose, disambiguate_filenames, identity +from pyicloud_ipd.utils import compose, constant, disambiguate_filenames, get_password_from_keyring, identity, store_password_in_keyring from pyicloud_ipd.version_size import AssetVersionSize, LivePhotoVersionSize freeze_support() # fixing tqdm on macos @@ -88,6 +92,38 @@ def _map(size: str) -> AssetVersionSize: raise ValueError(f"size was provided with unsupported value of '{size}'") return [_map(_s) for _s in sizes] +def ask_password_in_console(_user:str) -> Optional[str]: + return typing.cast(Optional[str], click.prompt("iCloud Password", hide_input=True)) + # return getpass.getpass( + # f'iCloud Password for {_user}:' + # ) + +# def get_click_param_by_name(_name: str, _params: List[Parameter]) -> Optional[Parameter]: +# _with_password = [_p for _p in _params if _name in _p.name] +# if len(_with_password) == 0: +# return None +# return _with_password[0] + +def dummy_password_writter(_u:str, _p:str) -> None: + pass + +def password_provider_generator(_ctx: click.Context, _param: click.Parameter, providers: Sequence[str]) -> Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]: + def _map(provider: str) -> Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]: + if provider == "console": + return (ask_password_in_console, dummy_password_writter) + elif provider == "keyring": + return (get_password_from_keyring, store_password_in_keyring) + elif provider == "parameter": + # TODO get from parameter + # _param: Optional[Parameter] = get_click_param_by_name("password", _ctx.command.params) + # if _param: + # _password: str = _param.consume_value(_ctx, {}) + # return constant(_password) + return (constant(None), dummy_password_writter) + else: + raise ValueError(f"password provider was given an unsupported value of '{provider}'") + return dict([(_s, _map(_s)) for _s in providers]) + def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) -> LivePhotoVersionSize: if size == "original": return LivePhotoVersionSize.ORIGINAL @@ -121,6 +157,7 @@ def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) - help="Your iCloud password " "(default: use PyiCloud keyring or prompt for password)", metavar="", + # is_eager=True, ) @click.option( "--auth-only", @@ -333,6 +370,15 @@ def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) - show_default=True, callback=raw_policy_generator, ) +@click.option("--password-provider", + "password_providers", + help="Specifies passwords provider to check in the specified order", + type=click.Choice(['console', 'keyring', 'parameter'], case_sensitive=False), + default=["parameter", "keyring", "console"], + show_default=True, + multiple=True, + callback=password_provider_generator, + ) # a hacky way to get proper version because automatic detection does not # work for some reason @click.version_option(version="1.19.1") @@ -377,6 +423,7 @@ def main( filename_cleaner: Callable[[str], str], lp_filename_generator: Callable[[str], str], raw_policy:RawTreatmentPolicy, + password_providers: Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]], ) -> NoReturn: """Download all iCloud photos to a local directory""" @@ -418,6 +465,18 @@ def main( ) sys.exit(2) + + # hacky way to use one param in another + if password and "parameter" in password_providers: + # replace + password_providers["parameter"] = (constant(password), lambda _r, _w: None) + + if len(password_providers) == 0: # pragma: no cover + print( + 'You need to specify at least one --password-provider' + ) + sys.exit(2) + sys.exit( core( download_builder( @@ -467,7 +526,9 @@ def main( dry_run, filename_cleaner, lp_filename_generator, - raw_policy)) + raw_policy, + password_providers, + )) @@ -855,7 +916,8 @@ def core( dry_run: bool, filename_cleaner: Callable[[str], str], lp_filename_generator: Callable[[str], str], - raw_policy: RawTreatmentPolicy + raw_policy: RawTreatmentPolicy, + password_providers: Dict[str,Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]], ) -> int: """Download all iCloud photos to a local directory""" @@ -865,9 +927,8 @@ def core( or notification_script is not None ) try: - icloud = authenticator(logger, domain, filename_cleaner, lp_filename_generator, raw_policy)( + icloud = authenticator(logger, domain, filename_cleaner, lp_filename_generator, raw_policy, password_providers)( username, - password, cookie_directory, raise_error_on_2sa, os.environ.get("CLIENT_ID"), diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index 662bac07b..110de4e4b 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -58,12 +58,9 @@ def __init__( lp_filename_generator: Callable[[str], str], domain:str, raw_policy: RawTreatmentPolicy, - apple_id: str, password:Optional[str]=None, cookie_directory:Optional[str]=None, verify:bool=True, + apple_id: str, password:str, cookie_directory:Optional[str]=None, verify:bool=True, client_id:Optional[str]=None, with_family:bool=True, ): - if password is None: - password = get_password_from_keyring(apple_id) - self.filename_cleaner = filename_cleaner self.lp_filename_generator = lp_filename_generator self.raw_policy = raw_policy diff --git a/src/pyicloud_ipd/cmdline.py b/src/pyicloud_ipd/cmdline.py index d8770bae6..d58a639fa 100644 --- a/src/pyicloud_ipd/cmdline.py +++ b/src/pyicloud_ipd/cmdline.py @@ -191,10 +191,10 @@ def main(args:Optional[Sequence[str]]=None) -> NoReturn: if not username: parser.error("No username supplied") - if not password: - password = utils.get_password( - username, interactive=command_line.interactive - ) + # if not password: + # password = utils.get_password( + # username, interactive=command_line.interactive + # ) if not password: parser.error("No password supplied") diff --git a/src/pyicloud_ipd/utils.py b/src/pyicloud_ipd/utils.py index 66b2f38af..196cb4c0e 100644 --- a/src/pyicloud_ipd/utils.py +++ b/src/pyicloud_ipd/utils.py @@ -13,18 +13,18 @@ KEYRING_SYSTEM = 'pyicloud://icloud-password' -def get_password(username:str, interactive:bool=sys.stdout.isatty()) -> str: - try: - return get_password_from_keyring(username) - except PyiCloudNoStoredPasswordAvailableException: - if not interactive: - raise +# def get_password(username:str, interactive:bool=sys.stdout.isatty()) -> str: +# try: +# return get_password_from_keyring(username) +# except PyiCloudNoStoredPasswordAvailableException: +# if not interactive: +# raise - return getpass.getpass( - 'Enter iCloud password for {username}: '.format( - username=username, - ) - ) +# return getpass.getpass( +# 'Enter iCloud password for {username}: '.format( +# username=username, +# ) +# ) def password_exists_in_keyring(username:str) -> bool: @@ -36,20 +36,20 @@ def password_exists_in_keyring(username:str) -> bool: return True -def get_password_from_keyring(username:str) -> str: +def get_password_from_keyring(username:str) -> Optional[str]: result = keyring.get_password( KEYRING_SYSTEM, username ) - if result is None: - raise PyiCloudNoStoredPasswordAvailableException( - "No pyicloud password for {username} could be found " - "in the system keychain. Use the `--store-in-keyring` " - "command-line option for storing a password for this " - "username.".format( - username=username, - ) - ) + # if result is None: + # raise PyiCloudNoStoredPasswordAvailableException( + # "No pyicloud password for {username} could be found " + # "in the system keychain. Use the `--store-in-keyring` " + # "command-line option for storing a password for this " + # "username.".format( + # username=username, + # ) + # ) return result @@ -89,6 +89,14 @@ def identity(value: _Tin) -> _Tin: """identity function""" return value +def constant(value: _Tout) -> Callable[[_Tin], _Tout]: + """constant function""" + def _intern(_:_Tin) -> _Tout: + return value + return _intern + + + # def filename_with_size(filename: str, size: str, original: Optional[Dict[str, Any]]) -> str: # """Returns the filename with size, e.g. IMG1234.jpg, IMG1234-small.jpg""" # if size == 'original' or size == 'alternative': diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 24853ab50..9249d1718 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -5,12 +5,12 @@ from click.testing import CliRunner from icloudpd.logger import setup_logger import pyicloud_ipd -from icloudpd.base import lp_filename_concatinator, main +from icloudpd.base import dummy_password_writter, lp_filename_concatinator, main from icloudpd.authentication import authenticator, TwoStepAuthRequiredError import inspect from pyicloud_ipd.raw_policy import RawTreatmentPolicy -from pyicloud_ipd.utils import identity +from pyicloud_ipd.utils import constant, identity from tests.helpers import path_from_project_root, recreate_path vcr = VCR(decode_compressed_response=True) @@ -35,9 +35,8 @@ def test_failed_auth(self) -> None: with self.assertRaises( pyicloud_ipd.exceptions.PyiCloudFailedLoginException ) as context: - authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)( + authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})( "bad_username", - "bad_password", cookie_dir, False, "EC5646DE-9423-11E8-BF21-14109FE0B321", @@ -59,9 +58,8 @@ def test_2sa_required(self) -> None: # delete ./tests/vcr_cassettes/auth_requires_2sa.yml, # put your actual credentials in here, run the test, # and then replace with dummy credentials. - authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)( + authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})( "jdoe@gmail.com", - "password1", cookie_dir, True, "DE309E26-942E-11E8-92F5-14109FE0B321", @@ -85,9 +83,8 @@ def test_2fa_required(self) -> None: # delete ./tests/vcr_cassettes/auth_requires_2fa.yml, # put your actual credentials in here, run the test, # and then replace with dummy credentials. - authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)( + authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})( "jdoe@gmail.com", - "password1", cookie_dir, True, "EC5646DE-9423-11E8-BF21-14109FE0B321",