Skip to content

Commit

Permalink
customize the choice and the order of password providers (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyNikiforov committed Jun 7, 2024
1 parent 463589b commit 6f76dc1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 56 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- feature: customize choice and the order of checking for password with `--password-provider` parameter

## 1.19.1 (2024-06-02)

- fix: KeyError alternative [#859](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/859)
Expand Down
40 changes: 27 additions & 13 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import sys
from typing import Callable, Optional
from typing import Callable, Dict, Optional, Sequence, Tuple
import click
import pyicloud_ipd
from pyicloud_ipd.base import PyiCloudService
Expand All @@ -16,34 +16,48 @@ class TwoStepAuthRequiredError(Exception):
"""


def authenticator(logger: logging.Logger, domain: str, filename_cleaner: Callable[[str], str], lp_filename_generator: Callable[[str], str], raw_policy:RawTreatmentPolicy) -> Callable[[str, Optional[str], Optional[str], bool, Optional[str]], PyiCloudService]:
def authenticator(
logger: logging.Logger,
domain: str,
filename_cleaner: Callable[[str], str],
lp_filename_generator: Callable[[str], str],
raw_policy:RawTreatmentPolicy,
password_providers: Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]) -> Callable[[str, Optional[str], bool, Optional[str]], PyiCloudService]:
"""Wraping authentication with domain context"""
def authenticate_(
username:str,
password:Optional[str],
cookie_directory:Optional[str]=None,
raise_error_on_2sa:bool=False,
client_id:Optional[str]=None,
) -> PyiCloudService:
"""Authenticate with iCloud username and password"""
logger.debug("Authenticating...")
while True:
try:
# If password not provided on command line variable will be set to None
# and PyiCloud will attempt to retrieve from its keyring
icloud: Optional[PyiCloudService] = None
_valid_password: Optional[str] = None
for _, _pair in password_providers.items():
_reader, _ = _pair
_password = _reader(username)
if _password:
icloud = PyiCloudService(
filename_cleaner,
lp_filename_generator,
domain,
raw_policy,
username, password,
username, _password,
cookie_directory=cookie_directory,
client_id=client_id,
)
_valid_password = _password
break
except pyicloud_ipd.exceptions.PyiCloudNoStoredPasswordAvailableException:
# Prompt for password if not stored in PyiCloud's keyring
password = click.prompt("iCloud Password", hide_input=True)

if not icloud:
raise NotImplementedError("None of providers gave password")

if _valid_password:
# save valid password to all providers
for _, _pair in password_providers.items():
_, _writer = _pair
_writer(username, _valid_password)

if icloud.requires_2fa:
if raise_error_on_2sa:
Expand All @@ -69,7 +83,7 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:
"""Request two-step authentication. Prompts for SMS or device"""
devices = icloud.trusted_devices
devices_count = len(devices)
device_index = 0
device_index: int = 0
if devices_count > 0:
for i, device in enumerate(devices):
# pylint: disable-msg=consider-using-f-string
Expand All @@ -82,7 +96,7 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:

device_index = click.prompt(
"Please choose an option:",
default=0,
default="0",
type=click.IntRange(
0,
devices_count - 1))
Expand Down
73 changes: 67 additions & 6 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#!/usr/bin/env python
"""Main script that uses Click to parse command-line arguments"""
from __future__ import print_function
import getpass
import typing

from click import Option, Parameter
from icloudpd.counter import Counter
from icloudpd import constants
from icloudpd import exif_datetime
Expand All @@ -19,7 +23,7 @@
from tqdm import tqdm
import click
import urllib
from typing import Callable, Iterable, NoReturn, Optional, Sequence, TypeVar, cast
from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Sequence, Tuple, TypeVar, cast
import json
import subprocess
import itertools
Expand All @@ -31,7 +35,7 @@
import os
from multiprocessing import freeze_support

from pyicloud_ipd.utils import compose, disambiguate_filenames, identity
from pyicloud_ipd.utils import compose, constant, disambiguate_filenames, get_password_from_keyring, identity, store_password_in_keyring
from pyicloud_ipd.version_size import AssetVersionSize, LivePhotoVersionSize
freeze_support() # fixing tqdm on macos

Expand Down Expand Up @@ -88,6 +92,38 @@ def _map(size: str) -> AssetVersionSize:
raise ValueError(f"size was provided with unsupported value of '{size}'")
return [_map(_s) for _s in sizes]

def ask_password_in_console(_user:str) -> Optional[str]:
return typing.cast(Optional[str], click.prompt("iCloud Password", hide_input=True))
# return getpass.getpass(
# f'iCloud Password for {_user}:'
# )

# def get_click_param_by_name(_name: str, _params: List[Parameter]) -> Optional[Parameter]:
# _with_password = [_p for _p in _params if _name in _p.name]
# if len(_with_password) == 0:
# return None
# return _with_password[0]

def dummy_password_writter(_u:str, _p:str) -> None:
pass

def password_provider_generator(_ctx: click.Context, _param: click.Parameter, providers: Sequence[str]) -> Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]:
def _map(provider: str) -> Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]:
if provider == "console":
return (ask_password_in_console, dummy_password_writter)
elif provider == "keyring":
return (get_password_from_keyring, store_password_in_keyring)
elif provider == "parameter":
# TODO get from parameter
# _param: Optional[Parameter] = get_click_param_by_name("password", _ctx.command.params)
# if _param:
# _password: str = _param.consume_value(_ctx, {})
# return constant(_password)
return (constant(None), dummy_password_writter)
else:
raise ValueError(f"password provider was given an unsupported value of '{provider}'")
return dict([(_s, _map(_s)) for _s in providers])

def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) -> LivePhotoVersionSize:
if size == "original":
return LivePhotoVersionSize.ORIGINAL
Expand Down Expand Up @@ -121,6 +157,7 @@ def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) -
help="Your iCloud password "
"(default: use PyiCloud keyring or prompt for password)",
metavar="<password>",
# is_eager=True,
)
@click.option(
"--auth-only",
Expand Down Expand Up @@ -333,6 +370,15 @@ def lp_size_generator(_ctx: click.Context, _param: click.Parameter, size: str) -
show_default=True,
callback=raw_policy_generator,
)
@click.option("--password-provider",
"password_providers",
help="Specifies passwords provider to check in the specified order",
type=click.Choice(['console', 'keyring', 'parameter'], case_sensitive=False),
default=["parameter", "keyring", "console"],
show_default=True,
multiple=True,
callback=password_provider_generator,
)
# a hacky way to get proper version because automatic detection does not
# work for some reason
@click.version_option(version="1.19.1")
Expand Down Expand Up @@ -377,6 +423,7 @@ def main(
filename_cleaner: Callable[[str], str],
lp_filename_generator: Callable[[str], str],
raw_policy:RawTreatmentPolicy,
password_providers: Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]],
) -> NoReturn:
"""Download all iCloud photos to a local directory"""

Expand Down Expand Up @@ -418,6 +465,18 @@ def main(
)
sys.exit(2)


# hacky way to use one param in another
if password and "parameter" in password_providers:
# replace
password_providers["parameter"] = (constant(password), lambda _r, _w: None)

if len(password_providers) == 0: # pragma: no cover
print(
'You need to specify at least one --password-provider'
)
sys.exit(2)

sys.exit(
core(
download_builder(
Expand Down Expand Up @@ -467,7 +526,9 @@ def main(
dry_run,
filename_cleaner,
lp_filename_generator,
raw_policy))
raw_policy,
password_providers,
))



Expand Down Expand Up @@ -855,7 +916,8 @@ def core(
dry_run: bool,
filename_cleaner: Callable[[str], str],
lp_filename_generator: Callable[[str], str],
raw_policy: RawTreatmentPolicy
raw_policy: RawTreatmentPolicy,
password_providers: Dict[str,Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]],
) -> int:
"""Download all iCloud photos to a local directory"""

Expand All @@ -865,9 +927,8 @@ def core(
or notification_script is not None
)
try:
icloud = authenticator(logger, domain, filename_cleaner, lp_filename_generator, raw_policy)(
icloud = authenticator(logger, domain, filename_cleaner, lp_filename_generator, raw_policy, password_providers)(
username,
password,
cookie_directory,
raise_error_on_2sa,
os.environ.get("CLIENT_ID"),
Expand Down
5 changes: 1 addition & 4 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ def __init__(
lp_filename_generator: Callable[[str], str],
domain:str,
raw_policy: RawTreatmentPolicy,
apple_id: str, password:Optional[str]=None, cookie_directory:Optional[str]=None, verify:bool=True,
apple_id: str, password:str, cookie_directory:Optional[str]=None, verify:bool=True,
client_id:Optional[str]=None, with_family:bool=True,
):
if password is None:
password = get_password_from_keyring(apple_id)

self.filename_cleaner = filename_cleaner
self.lp_filename_generator = lp_filename_generator
self.raw_policy = raw_policy
Expand Down
8 changes: 4 additions & 4 deletions src/pyicloud_ipd/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ def main(args:Optional[Sequence[str]]=None) -> NoReturn:
if not username:
parser.error("No username supplied")

if not password:
password = utils.get_password(
username, interactive=command_line.interactive
)
# if not password:
# password = utils.get_password(
# username, interactive=command_line.interactive
# )

if not password:
parser.error("No password supplied")
Expand Down
50 changes: 29 additions & 21 deletions src/pyicloud_ipd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
KEYRING_SYSTEM = 'pyicloud://icloud-password'


def get_password(username:str, interactive:bool=sys.stdout.isatty()) -> str:
try:
return get_password_from_keyring(username)
except PyiCloudNoStoredPasswordAvailableException:
if not interactive:
raise
# def get_password(username:str, interactive:bool=sys.stdout.isatty()) -> str:
# try:
# return get_password_from_keyring(username)
# except PyiCloudNoStoredPasswordAvailableException:
# if not interactive:
# raise

return getpass.getpass(
'Enter iCloud password for {username}: '.format(
username=username,
)
)
# return getpass.getpass(
# 'Enter iCloud password for {username}: '.format(
# username=username,
# )
# )


def password_exists_in_keyring(username:str) -> bool:
Expand All @@ -36,20 +36,20 @@ def password_exists_in_keyring(username:str) -> bool:
return True


def get_password_from_keyring(username:str) -> str:
def get_password_from_keyring(username:str) -> Optional[str]:
result = keyring.get_password(
KEYRING_SYSTEM,
username
)
if result is None:
raise PyiCloudNoStoredPasswordAvailableException(
"No pyicloud password for {username} could be found "
"in the system keychain. Use the `--store-in-keyring` "
"command-line option for storing a password for this "
"username.".format(
username=username,
)
)
# if result is None:
# raise PyiCloudNoStoredPasswordAvailableException(
# "No pyicloud password for {username} could be found "
# "in the system keychain. Use the `--store-in-keyring` "
# "command-line option for storing a password for this "
# "username.".format(
# username=username,
# )
# )

return result

Expand Down Expand Up @@ -89,6 +89,14 @@ def identity(value: _Tin) -> _Tin:
"""identity function"""
return value

def constant(value: _Tout) -> Callable[[_Tin], _Tout]:
"""constant function"""
def _intern(_:_Tin) -> _Tout:
return value
return _intern



# def filename_with_size(filename: str, size: str, original: Optional[Dict[str, Any]]) -> str:
# """Returns the filename with size, e.g. IMG1234.jpg, IMG1234-small.jpg"""
# if size == 'original' or size == 'alternative':
Expand Down
13 changes: 5 additions & 8 deletions tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from click.testing import CliRunner
from icloudpd.logger import setup_logger
import pyicloud_ipd
from icloudpd.base import lp_filename_concatinator, main
from icloudpd.base import dummy_password_writter, lp_filename_concatinator, main
from icloudpd.authentication import authenticator, TwoStepAuthRequiredError
import inspect

from pyicloud_ipd.raw_policy import RawTreatmentPolicy
from pyicloud_ipd.utils import identity
from pyicloud_ipd.utils import constant, identity
from tests.helpers import path_from_project_root, recreate_path

vcr = VCR(decode_compressed_response=True)
Expand All @@ -35,9 +35,8 @@ def test_failed_auth(self) -> None:
with self.assertRaises(
pyicloud_ipd.exceptions.PyiCloudFailedLoginException
) as context:
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)(
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})(
"bad_username",
"bad_password",
cookie_dir,
False,
"EC5646DE-9423-11E8-BF21-14109FE0B321",
Expand All @@ -59,9 +58,8 @@ def test_2sa_required(self) -> None:
# delete ./tests/vcr_cassettes/auth_requires_2sa.yml,
# put your actual credentials in here, run the test,
# and then replace with dummy credentials.
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)(
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})(
"jdoe@gmail.com",
"password1",
cookie_dir,
True,
"DE309E26-942E-11E8-92F5-14109FE0B321",
Expand All @@ -85,9 +83,8 @@ def test_2fa_required(self) -> None:
# delete ./tests/vcr_cassettes/auth_requires_2fa.yml,
# put your actual credentials in here, run the test,
# and then replace with dummy credentials.
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS)(
authenticator(setup_logger(), "com", identity, lp_filename_concatinator, RawTreatmentPolicy.AS_IS, {"test": (constant("dummy"), dummy_password_writter)})(
"jdoe@gmail.com",
"password1",
cookie_dir,
True,
"EC5646DE-9423-11E8-BF21-14109FE0B321",
Expand Down

0 comments on commit 6f76dc1

Please sign in to comment.