diff --git a/.coveragerc b/.coveragerc index 8f872a93d8dae6..0b73599dffa7e1 100644 --- a/.coveragerc +++ b/.coveragerc @@ -319,7 +319,8 @@ omit = homeassistant/components/iaqualink/light.py homeassistant/components/iaqualink/sensor.py homeassistant/components/iaqualink/switch.py - homeassistant/components/icloud/* + homeassistant/components/icloud/__init__.py + homeassistant/components/icloud/device_tracker.py homeassistant/components/izone/climate.py homeassistant/components/izone/discovery.py homeassistant/components/izone/__init__.py diff --git a/CODEOWNERS b/CODEOWNERS index afea92c8847b40..392c363c648c28 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -152,6 +152,7 @@ homeassistant/components/huawei_lte/* @scop homeassistant/components/huawei_router/* @abmantis homeassistant/components/hue/* @balloob homeassistant/components/iaqualink/* @flz +homeassistant/components/icloud/* @Quentame homeassistant/components/ign_sismologia/* @exxamalte homeassistant/components/incomfort/* @zxdavb homeassistant/components/influxdb/* @fabaff diff --git a/homeassistant/components/icloud/.translations/en.json b/homeassistant/components/icloud/.translations/en.json new file mode 100644 index 00000000000000..581017593566ca --- /dev/null +++ b/homeassistant/components/icloud/.translations/en.json @@ -0,0 +1,38 @@ +{ + "config": { + "abort": { + "username_exists": "Account already configured" + }, + "error": { + "login": "Login error: please check your email & password", + "send_verification_code": "Failed to send verification code", + "username_exists": "Account already configured", + "validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again" + }, + "step": { + "trusted_device": { + "data": { + "trusted_device": "Trusted device" + }, + "description": "Select your trusted device", + "title": "iCloud trusted device" + }, + "user": { + "data": { + "password": "Password", + "username": "Email" + }, + "description": "Enter your credentials", + "title": "iCloud credentials" + }, + "verification_code": { + "data": { + "verification_code": "Verification code" + }, + "description": "Please enter the verification code you just received from iCloud", + "title": "iCloud verification code" + } + }, + "title": "Apple iCloud" + } +} \ No newline at end of file diff --git a/homeassistant/components/icloud/__init__.py b/homeassistant/components/icloud/__init__.py index 1169104c99d9a3..2012f69193803b 100644 --- a/homeassistant/components/icloud/__init__.py +++ b/homeassistant/components/icloud/__init__.py @@ -1 +1,606 @@ -"""The icloud component.""" +"""The iCloud component.""" +from datetime import timedelta +import logging +import operator +from typing import Dict + +from pyicloud import PyiCloudService +from pyicloud.exceptions import PyiCloudFailedLoginException, PyiCloudNoDevicesException +from pyicloud.services.findmyiphone import AppleDevice +import voluptuous as vol + +from homeassistant.components.zone import async_active_zone +from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry +from homeassistant.const import ATTR_ATTRIBUTION, CONF_PASSWORD, CONF_USERNAME +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.dispatcher import dispatcher_send +from homeassistant.helpers.event import track_point_in_utc_time +from homeassistant.helpers.storage import Store +from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType +from homeassistant.util import slugify +from homeassistant.util.async_ import run_callback_threadsafe +from homeassistant.util.dt import utcnow +from homeassistant.util.location import distance + +from .const import ( + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DEFAULT_MAX_INTERVAL, + DEVICE_BATTERY_LEVEL, + DEVICE_BATTERY_STATUS, + DEVICE_CLASS, + DEVICE_DISPLAY_NAME, + DEVICE_ID, + DEVICE_LOCATION, + DEVICE_LOCATION_LATITUDE, + DEVICE_LOCATION_LONGITUDE, + DEVICE_LOST_MODE_CAPABLE, + DEVICE_LOW_POWER_MODE, + DEVICE_NAME, + DEVICE_PERSON_ID, + DEVICE_RAW_DEVICE_MODEL, + DEVICE_STATUS, + DEVICE_STATUS_CODES, + DEVICE_STATUS_SET, + DOMAIN, + ICLOUD_COMPONENTS, + STORAGE_KEY, + STORAGE_VERSION, + TRACKER_UPDATE, +) + +ATTRIBUTION = "Data provided by Apple iCloud" + +# entity attributes +ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval" +ATTR_BATTERY = "battery" +ATTR_BATTERY_STATUS = "battery_status" +ATTR_DEVICE_NAME = "device_name" +ATTR_DEVICE_STATUS = "device_status" +ATTR_LOW_POWER_MODE = "low_power_mode" +ATTR_OWNER_NAME = "owner_fullname" + +# services +SERVICE_ICLOUD_PLAY_SOUND = "play_sound" +SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message" +SERVICE_ICLOUD_LOST_DEVICE = "lost_device" +SERVICE_ICLOUD_UPDATE = "update" +ATTR_ACCOUNT = "account" +ATTR_LOST_DEVICE_MESSAGE = "message" +ATTR_LOST_DEVICE_NUMBER = "number" +ATTR_LOST_DEVICE_SOUND = "sound" + +SERVICE_SCHEMA = vol.Schema({vol.Optional(ATTR_ACCOUNT): cv.string}) + +SERVICE_SCHEMA_PLAY_SOUND = vol.Schema( + {vol.Required(ATTR_ACCOUNT): cv.string, vol.Required(ATTR_DEVICE_NAME): cv.string} +) + +SERVICE_SCHEMA_DISPLAY_MESSAGE = vol.Schema( + { + vol.Required(ATTR_ACCOUNT): cv.string, + vol.Required(ATTR_DEVICE_NAME): cv.string, + vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string, + vol.Optional(ATTR_LOST_DEVICE_SOUND): cv.boolean, + } +) + +SERVICE_SCHEMA_LOST_DEVICE = vol.Schema( + { + vol.Required(ATTR_ACCOUNT): cv.string, + vol.Required(ATTR_DEVICE_NAME): cv.string, + vol.Required(ATTR_LOST_DEVICE_NUMBER): cv.string, + vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string, + } +) + +ACCOUNT_SCHEMA = vol.Schema( + { + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, + vol.Optional(CONF_ACCOUNT_NAME): cv.string, + vol.Optional(CONF_MAX_INTERVAL, default=DEFAULT_MAX_INTERVAL): cv.positive_int, + vol.Optional( + CONF_GPS_ACCURACY_THRESHOLD, default=DEFAULT_GPS_ACCURACY_THRESHOLD + ): cv.positive_int, + } +) + +CONFIG_SCHEMA = vol.Schema( + {DOMAIN: vol.Schema(vol.All(cv.ensure_list, [ACCOUNT_SCHEMA]))}, + extra=vol.ALLOW_EXTRA, +) + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool: + """Set up iCloud from legacy config file.""" + + conf = config.get(DOMAIN) + if conf is None: + return True + + for account_conf in conf: + hass.async_create_task( + hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_IMPORT}, data=account_conf + ) + ) + + return True + + +async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool: + """Set up an iCloud account from a config entry.""" + + hass.data.setdefault(DOMAIN, {}) + + username = entry.data[CONF_USERNAME] + password = entry.data[CONF_PASSWORD] + account_name = entry.data.get(CONF_ACCOUNT_NAME) + max_interval = entry.data[CONF_MAX_INTERVAL] + gps_accuracy_threshold = entry.data[CONF_GPS_ACCURACY_THRESHOLD] + + icloud_dir = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY) + + account = IcloudAccount( + hass, + username, + password, + icloud_dir, + account_name, + max_interval, + gps_accuracy_threshold, + ) + await hass.async_add_executor_job(account.setup) + hass.data[DOMAIN][username] = account + + for component in ICLOUD_COMPONENTS: + hass.async_create_task( + hass.config_entries.async_forward_entry_setup(entry, component) + ) + + def play_sound(service: ServiceDataType) -> None: + """Play sound on the device.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + + for device in _get_account(account).get_devices_with_name(device_name): + device.play_sound() + + def display_message(service: ServiceDataType) -> None: + """Display a message on the device.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + message = service.data.get(ATTR_LOST_DEVICE_MESSAGE) + sound = service.data.get(ATTR_LOST_DEVICE_SOUND, False) + + for device in _get_account(account).get_devices_with_name(device_name): + device.display_message(message, sound) + + def lost_device(service: ServiceDataType) -> None: + """Make the device in lost state.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + number = service.data.get(ATTR_LOST_DEVICE_NUMBER) + message = service.data.get(ATTR_LOST_DEVICE_MESSAGE) + + for device in _get_account(account).get_devices_with_name(device_name): + device.lost_device(number, message) + + def update_account(service: ServiceDataType) -> None: + """Call the update function of an iCloud account.""" + account = service.data.get(ATTR_ACCOUNT) + + if account is None: + for account in hass.data[DOMAIN].values(): + account.keep_alive() + else: + _get_account(account).keep_alive() + + def _get_account(account_identifier: str) -> any: + if account_identifier is None: + return None + + icloud_account = hass.data[DOMAIN].get(account_identifier, None) + if icloud_account is None: + for account in hass.data[DOMAIN].values(): + if account.name == account_identifier: + icloud_account = account + + if icloud_account is None: + raise Exception( + "No iCloud account with username or name " + account_identifier + ) + return icloud_account + + hass.services.async_register( + DOMAIN, SERVICE_ICLOUD_PLAY_SOUND, play_sound, schema=SERVICE_SCHEMA_PLAY_SOUND + ) + + hass.services.async_register( + DOMAIN, + SERVICE_ICLOUD_DISPLAY_MESSAGE, + display_message, + schema=SERVICE_SCHEMA_DISPLAY_MESSAGE, + ) + + hass.services.async_register( + DOMAIN, + SERVICE_ICLOUD_LOST_DEVICE, + lost_device, + schema=SERVICE_SCHEMA_LOST_DEVICE, + ) + + hass.services.async_register( + DOMAIN, SERVICE_ICLOUD_UPDATE, update_account, schema=SERVICE_SCHEMA + ) + + return True + + +class IcloudAccount: + """Representation of an iCloud account.""" + + def __init__( + self, + hass: HomeAssistantType, + username: str, + password: str, + icloud_dir: Store, + account_name: str, + max_interval: int, + gps_accuracy_threshold: int, + ): + """Initialize an iCloud account.""" + self.hass = hass + self._username = username + self._password = password + self._name = account_name or slugify(username.partition("@")[0]) + self._fetch_interval = max_interval + self._max_interval = max_interval + self._gps_accuracy_threshold = gps_accuracy_threshold + + self._icloud_dir = icloud_dir + + self.api = None + self._owner_fullname = None + self._family_members_fullname = {} + self._devices = {} + + self.unsub_device_tracker = None + + def setup(self): + """Set up an iCloud account.""" + try: + self.api = PyiCloudService( + self._username, self._password, self._icloud_dir.path + ) + except PyiCloudFailedLoginException as error: + self.api = None + _LOGGER.error("Error logging into iCloud Service: %s", error) + return + + user_info = None + try: + # Gets device owners infos + user_info = self.api.devices.response["userInfo"] + except PyiCloudNoDevicesException: + _LOGGER.error("No iCloud Devices found") + + self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}" + + self._family_members_fullname = {} + for prs_id, member in user_info["membersInfo"].items(): + self._family_members_fullname[ + prs_id + ] = f"{member['firstName']} {member['lastName']}" + + self._devices = {} + self.update_devices() + + def update_devices(self) -> None: + """Update iCloud devices.""" + if self.api is None: + return + + api_devices = {} + try: + api_devices = self.api.devices + except PyiCloudNoDevicesException: + _LOGGER.error("No iCloud Devices found") + + # Gets devices infos + for device in api_devices: + status = device.status(DEVICE_STATUS_SET) + device_id = status[DEVICE_ID] + device_name = status[DEVICE_NAME] + + if self._devices.get(device_id, None) is not None: + # Seen device -> updating + _LOGGER.debug("Updating iCloud device: %s", device_name) + self._devices[device_id].update(status) + else: + # New device, should be unique + _LOGGER.debug( + "Adding iCloud device: %s [model: %s]", + device_name, + status[DEVICE_RAW_DEVICE_MODEL], + ) + self._devices[device_id] = IcloudDevice(self, device, status) + self._devices[device_id].update(status) + + dispatcher_send(self.hass, TRACKER_UPDATE) + self._fetch_interval = self._determine_interval() + track_point_in_utc_time( + self.hass, + self.keep_alive, + utcnow() + timedelta(minutes=self._fetch_interval), + ) + + def _determine_interval(self) -> int: + """Calculate new interval between two API fetch (in minutes).""" + intervals = {} + for device in self._devices.values(): + if device.location is None: + continue + + current_zone = run_callback_threadsafe( + self.hass.loop, + async_active_zone, + self.hass, + device.location[DEVICE_LOCATION_LATITUDE], + device.location[DEVICE_LOCATION_LONGITUDE], + ).result() + + if current_zone is not None: + intervals[device.name] = self._max_interval + continue + + zones = ( + self.hass.states.get(entity_id) + for entity_id in sorted(self.hass.states.entity_ids("zone")) + ) + + distances = [] + for zone_state in zones: + zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE] + zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE] + zone_distance = distance( + device.location[DEVICE_LOCATION_LATITUDE], + device.location[DEVICE_LOCATION_LONGITUDE], + zone_state_lat, + zone_state_long, + ) + distances.append(round(zone_distance / 1000, 1)) + + if not distances: + continue + mindistance = min(distances) + + # Calculate out how long it would take for the device to drive + # to the nearest zone at 120 km/h: + interval = round(mindistance / 2, 0) + + # Never poll more than once per minute + interval = max(interval, 1) + + if interval > 180: + # Three hour drive? + # This is far enough that they might be flying + interval = self._max_interval + + if ( + device.battery_level is not None + and device.battery_level <= 33 + and mindistance > 3 + ): + # Low battery - let's check half as often + interval = interval * 2 + + intervals[device.name] = interval + + return max( + int(min(intervals.items(), key=operator.itemgetter(1))[1]), + self._max_interval, + ) + + def keep_alive(self, now=None) -> None: + """Keep the API alive.""" + if self.api is None: + self.setup() + + if self.api is None: + return + + self.api.authenticate() + self.update_devices() + + def get_devices_with_name(self, name: str) -> [any]: + """Get devices by name.""" + result = [] + name_slug = slugify(name.replace(" ", "", 99)) + for device in self.devices.values(): + if slugify(device.name.replace(" ", "", 99)) == name_slug: + result.append(device) + if not result: + raise Exception("No device with name " + name) + return result + + @property + def name(self) -> str: + """Return the account name.""" + return self._name + + @property + def username(self) -> str: + """Return the account username.""" + return self._username + + @property + def owner_fullname(self) -> str: + """Return the account owner fullname.""" + return self._owner_fullname + + @property + def family_members_fullname(self) -> Dict[str, str]: + """Return the account family members fullname.""" + return self._family_members_fullname + + @property + def fetch_interval(self) -> int: + """Return the account fetch interval.""" + return self._fetch_interval + + @property + def devices(self) -> Dict[str, any]: + """Return the account devices.""" + return self._devices + + +class IcloudDevice: + """Representation of a iCloud device.""" + + def __init__(self, account: IcloudAccount, device: AppleDevice, status): + """Initialize the iCloud device.""" + self._account = account + account_name = account.name + + self._device = device + self._status = status + + self._name = self._status[DEVICE_NAME] + self._device_id = self._status[DEVICE_ID] + self._device_class = self._status[DEVICE_CLASS] + self._device_model = self._status[DEVICE_DISPLAY_NAME] + + if self._status[DEVICE_PERSON_ID]: + owner_fullname = account.family_members_fullname[ + self._status[DEVICE_PERSON_ID] + ] + else: + owner_fullname = account.owner_fullname + + self._battery_level = None + self._battery_status = None + self._location = None + + self._attrs = { + ATTR_ATTRIBUTION: ATTRIBUTION, + CONF_ACCOUNT_NAME: account_name, + ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval, + ATTR_DEVICE_NAME: self._device_model, + ATTR_DEVICE_STATUS: None, + ATTR_OWNER_NAME: owner_fullname, + } + + def update(self, status) -> None: + """Update the iCloud device.""" + self._status = status + + self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval + + device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error") + self._attrs[ATTR_DEVICE_STATUS] = device_status + + if self._status[DEVICE_BATTERY_STATUS] != "Unknown": + self._battery_level = int(self._status.get(DEVICE_BATTERY_LEVEL, 0) * 100) + self._battery_status = self._status[DEVICE_BATTERY_STATUS] + low_power_mode = self._status[DEVICE_LOW_POWER_MODE] + + self._attrs[ATTR_BATTERY] = self._battery_level + self._attrs[ATTR_BATTERY_STATUS] = self._battery_status + self._attrs[ATTR_LOW_POWER_MODE] = low_power_mode + + if ( + self._status[DEVICE_LOCATION] + and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE] + ): + location = self._status[DEVICE_LOCATION] + self._location = location + + def play_sound(self) -> None: + """Play sound on the device.""" + if self._account.api is None: + return + + self._account.api.authenticate() + _LOGGER.debug("Playing sound for %s", self.name) + self.device.play_sound() + + def display_message(self, message: str, sound: bool = False) -> None: + """Display a message on the device.""" + if self._account.api is None: + return + + self._account.api.authenticate() + _LOGGER.debug("Displaying message for %s", self.name) + self.device.display_message("Subject not working", message, sound) + + def lost_device(self, number: str, message: str) -> None: + """Make the device in lost state.""" + if self._account.api is None: + return + + self._account.api.authenticate() + if self._status[DEVICE_LOST_MODE_CAPABLE]: + _LOGGER.debug("Make device lost for %s", self.name) + self.device.lost_device(number, message, None) + else: + _LOGGER.error("Cannot make device lost for %s", self.name) + + @property + def unique_id(self) -> str: + """Return a unique ID.""" + return self._device_id + + @property + def dev_id(self) -> str: + """Return the device ID.""" + return self._device_id + + @property + def name(self) -> str: + """Return the Apple device name.""" + return self._name + + @property + def device(self) -> AppleDevice: + """Return the Apple device.""" + return self._device + + @property + def device_class(self) -> str: + """Return the Apple device class.""" + return self._device_class + + @property + def device_model(self) -> str: + """Return the Apple device model.""" + return self._device_model + + @property + def battery_level(self) -> int: + """Return the Apple device battery level.""" + return self._battery_level + + @property + def battery_status(self) -> str: + """Return the Apple device battery status.""" + return self._battery_status + + @property + def location(self) -> Dict[str, any]: + """Return the Apple device location.""" + return self._location + + @property + def state_attributes(self) -> Dict[str, any]: + """Return the attributes.""" + return self._attrs diff --git a/homeassistant/components/icloud/config_flow.py b/homeassistant/components/icloud/config_flow.py new file mode 100644 index 00000000000000..cf05c07e26f89d --- /dev/null +++ b/homeassistant/components/icloud/config_flow.py @@ -0,0 +1,230 @@ +"""Config flow to configure the iCloud integration.""" +import logging +import os + +from pyicloud import PyiCloudService +from pyicloud.exceptions import PyiCloudException, PyiCloudFailedLoginException +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.const import CONF_PASSWORD, CONF_USERNAME +from homeassistant.util import slugify + +from .const import ( + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DEFAULT_MAX_INTERVAL, + STORAGE_KEY, + STORAGE_VERSION, +) +from .const import DOMAIN # pylint: disable=unused-import + +CONF_TRUSTED_DEVICE = "trusted_device" +CONF_VERIFICATION_CODE = "verification_code" + +_LOGGER = logging.getLogger(__name__) + + +class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a iCloud config flow.""" + + VERSION = 1 + CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL + + def __init__(self): + """Initialize iCloud config flow.""" + self.api = None + self._username = None + self._password = None + self._account_name = None + self._max_interval = None + self._gps_accuracy_threshold = None + + self._trusted_device = None + self._verification_code = None + + def _configuration_exists(self, username: str, account_name: str) -> bool: + """Return True if username or account_name exists in configuration.""" + for entry in self._async_current_entries(): + if ( + entry.data[CONF_USERNAME] == username + or entry.data.get(CONF_ACCOUNT_NAME) == account_name + or slugify(entry.data[CONF_USERNAME].partition("@")[0]) == account_name + ): + return True + return False + + async def _show_setup_form(self, user_input=None, errors=None): + """Show the setup form to the user.""" + + if user_input is None: + user_input = {} + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema( + { + vol.Required( + CONF_USERNAME, default=user_input.get(CONF_USERNAME, "") + ): str, + vol.Required( + CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "") + ): str, + } + ), + errors=errors or {}, + ) + + async def async_step_user(self, user_input=None): + """Handle a flow initiated by the user.""" + errors = {} + + icloud_dir = self.hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY) + + if not os.path.exists(icloud_dir.path): + await self.hass.async_add_executor_job(os.makedirs, icloud_dir.path) + + if user_input is None: + return await self._show_setup_form(user_input, errors) + + self._username = user_input[CONF_USERNAME] + self._password = user_input[CONF_PASSWORD] + self._account_name = user_input.get(CONF_ACCOUNT_NAME) + self._max_interval = user_input.get(CONF_MAX_INTERVAL, DEFAULT_MAX_INTERVAL) + self._gps_accuracy_threshold = user_input.get( + CONF_GPS_ACCURACY_THRESHOLD, DEFAULT_GPS_ACCURACY_THRESHOLD + ) + + if self._configuration_exists(self._username, self._account_name): + errors[CONF_USERNAME] = "username_exists" + return await self._show_setup_form(user_input, errors) + + try: + self.api = await self.hass.async_add_executor_job( + PyiCloudService, self._username, self._password, icloud_dir.path + ) + except PyiCloudFailedLoginException as error: + _LOGGER.error("Error logging into iCloud service: %s", error) + self.api = None + errors[CONF_USERNAME] = "login" + return await self._show_setup_form(user_input, errors) + + if self.api.requires_2fa: + return await self.async_step_trusted_device() + + return self.async_create_entry( + title=self._username, + data={ + CONF_USERNAME: self._username, + CONF_PASSWORD: self._password, + CONF_ACCOUNT_NAME: self._account_name, + CONF_MAX_INTERVAL: self._max_interval, + CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold, + }, + ) + + async def async_step_import(self, user_input): + """Import a config entry.""" + if self._configuration_exists( + user_input[CONF_USERNAME], user_input.get(CONF_ACCOUNT_NAME) + ): + return self.async_abort(reason="username_exists") + + return await self.async_step_user(user_input) + + async def async_step_trusted_device(self, user_input=None, errors=None): + """We need a trusted device.""" + if errors is None: + errors = {} + + trusted_devices = await self.hass.async_add_executor_job( + getattr, self.api, "trusted_devices" + ) + trusted_devices_for_form = {} + for i, device in enumerate(trusted_devices): + trusted_devices_for_form[i] = device.get( + "deviceName", f"SMS to {device.get('phoneNumber')}" + ) + + if user_input is None: + return await self._show_trusted_device_form( + trusted_devices_for_form, user_input, errors + ) + + self._trusted_device = trusted_devices[int(user_input[CONF_TRUSTED_DEVICE])] + + if not await self.hass.async_add_executor_job( + self.api.send_verification_code, self._trusted_device + ): + _LOGGER.error("Failed to send verification code") + self._trusted_device = None + errors[CONF_TRUSTED_DEVICE] = "send_verification_code" + + return await self._show_trusted_device_form( + trusted_devices_for_form, user_input, errors + ) + + return await self.async_step_verification_code() + + async def _show_trusted_device_form( + self, trusted_devices, user_input=None, errors=None + ): + """Show the trusted_device form to the user.""" + + return self.async_show_form( + step_id=CONF_TRUSTED_DEVICE, + data_schema=vol.Schema( + { + vol.Required(CONF_TRUSTED_DEVICE): vol.All( + vol.Coerce(int), vol.In(trusted_devices) + ) + } + ), + errors=errors or {}, + ) + + async def async_step_verification_code(self, user_input=None): + """Ask the verification code to the user.""" + errors = {} + + if user_input is None: + return await self._show_verification_code_form(user_input) + + self._verification_code = user_input[CONF_VERIFICATION_CODE] + + try: + if not await self.hass.async_add_executor_job( + self.api.validate_verification_code, + self._trusted_device, + self._verification_code, + ): + raise PyiCloudException("The code you entered is not valid.") + except PyiCloudException as error: + # Reset to the initial 2FA state to allow the user to retry + _LOGGER.error("Failed to verify verification code: %s", error) + self._trusted_device = None + self._verification_code = None + errors["base"] = "validate_verification_code" + + return await self.async_step_trusted_device(None, errors) + + return await self.async_step_user( + { + CONF_USERNAME: self._username, + CONF_PASSWORD: self._password, + CONF_ACCOUNT_NAME: self._account_name, + CONF_MAX_INTERVAL: self._max_interval, + CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold, + } + ) + + async def _show_verification_code_form(self, user_input=None): + """Show the verification_code form to the user.""" + + return self.async_show_form( + step_id=CONF_VERIFICATION_CODE, + data_schema=vol.Schema({vol.Required(CONF_VERIFICATION_CODE): str}), + errors=None, + ) diff --git a/homeassistant/components/icloud/const.py b/homeassistant/components/icloud/const.py index fe8010df703b43..4e99a378077c9a 100644 --- a/homeassistant/components/icloud/const.py +++ b/homeassistant/components/icloud/const.py @@ -1,6 +1,85 @@ -"""Constants for the iCloud component.""" +"""iCloud component constants.""" + DOMAIN = "icloud" -SERVICE_LOST_IPHONE = "lost_iphone" -SERVICE_UPDATE = "update" -SERVICE_RESET_ACCOUNT = "reset_account" -SERVICE_SET_INTERVAL = "set_interval" +TRACKER_UPDATE = f"{DOMAIN}_tracker_update" + +CONF_ACCOUNT_NAME = "account_name" +CONF_MAX_INTERVAL = "max_interval" +CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold" + +DEFAULT_MAX_INTERVAL = 30 # min +DEFAULT_GPS_ACCURACY_THRESHOLD = 500 # meters + +# to store the cookie +STORAGE_KEY = DOMAIN +STORAGE_VERSION = 1 + +# Next PR will add sensor +ICLOUD_COMPONENTS = ["device_tracker"] + +# pyicloud.AppleDevice status +DEVICE_BATTERY_LEVEL = "batteryLevel" +DEVICE_BATTERY_STATUS = "batteryStatus" +DEVICE_CLASS = "deviceClass" +DEVICE_DISPLAY_NAME = "deviceDisplayName" +DEVICE_ID = "id" +DEVICE_LOCATION = "location" +DEVICE_LOCATION_HORIZONTAL_ACCURACY = "horizontalAccuracy" +DEVICE_LOCATION_LATITUDE = "latitude" +DEVICE_LOCATION_LONGITUDE = "longitude" +DEVICE_LOST_MODE_CAPABLE = "lostModeCapable" +DEVICE_LOW_POWER_MODE = "lowPowerMode" +DEVICE_NAME = "name" +DEVICE_PERSON_ID = "prsId" +DEVICE_RAW_DEVICE_MODEL = "rawDeviceModel" +DEVICE_STATUS = "deviceStatus" + +DEVICE_STATUS_SET = [ + "features", + "maxMsgChar", + "darkWake", + "fmlyShare", + DEVICE_STATUS, + "remoteLock", + "activationLocked", + DEVICE_CLASS, + DEVICE_ID, + "deviceModel", + DEVICE_RAW_DEVICE_MODEL, + "passcodeLength", + "canWipeAfterLock", + "trackingInfo", + DEVICE_LOCATION, + "msg", + DEVICE_BATTERY_LEVEL, + "remoteWipe", + "thisDevice", + "snd", + DEVICE_PERSON_ID, + "wipeInProgress", + DEVICE_LOW_POWER_MODE, + "lostModeEnabled", + "isLocating", + DEVICE_LOST_MODE_CAPABLE, + "mesg", + DEVICE_NAME, + DEVICE_BATTERY_STATUS, + "lockedTimestamp", + "lostTimestamp", + "locationCapable", + DEVICE_DISPLAY_NAME, + "lostDevice", + "deviceColor", + "wipedTimestamp", + "modelDisplayName", + "locationEnabled", + "isMac", + "locFoundEnabled", +] + +DEVICE_STATUS_CODES = { + "200": "online", + "201": "offline", + "203": "pending", + "204": "unregistered", +} diff --git a/homeassistant/components/icloud/device_tracker.py b/homeassistant/components/icloud/device_tracker.py index 3d9fb4715da0b6..4be34728c6dfde 100644 --- a/homeassistant/components/icloud/device_tracker.py +++ b/homeassistant/components/icloud/device_tracker.py @@ -1,544 +1,132 @@ -"""Platform that supports scanning iCloud.""" +"""Support for tracking for iCloud devices.""" import logging -import os -import random -from pyicloud import PyiCloudService -from pyicloud.exceptions import ( - PyiCloudException, - PyiCloudFailedLoginException, - PyiCloudNoDevicesException, -) -import voluptuous as vol - -from homeassistant.components.device_tracker import PLATFORM_SCHEMA -from homeassistant.components.device_tracker.const import ( - ATTR_ATTRIBUTES, - ENTITY_ID_FORMAT, -) -from homeassistant.components.device_tracker.legacy import DeviceScanner -from homeassistant.components.zone import async_active_zone -from homeassistant.const import CONF_PASSWORD, CONF_USERNAME -import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.event import track_utc_time_change -from homeassistant.util import slugify -from homeassistant.util.async_ import run_callback_threadsafe -import homeassistant.util.dt as dt_util -from homeassistant.util.location import distance +from homeassistant.components.device_tracker import SOURCE_TYPE_GPS +from homeassistant.components.device_tracker.config_entry import TrackerEntity +from homeassistant.const import CONF_USERNAME +from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.typing import HomeAssistantType +from . import IcloudDevice from .const import ( + DEVICE_LOCATION_HORIZONTAL_ACCURACY, + DEVICE_LOCATION_LATITUDE, + DEVICE_LOCATION_LONGITUDE, DOMAIN, - SERVICE_LOST_IPHONE, - SERVICE_RESET_ACCOUNT, - SERVICE_SET_INTERVAL, - SERVICE_UPDATE, + TRACKER_UPDATE, ) _LOGGER = logging.getLogger(__name__) -CONF_ACCOUNTNAME = "account_name" -CONF_MAX_INTERVAL = "max_interval" -CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold" - -# entity attributes -ATTR_ACCOUNTNAME = "account_name" -ATTR_INTERVAL = "interval" -ATTR_DEVICENAME = "device_name" -ATTR_BATTERY = "battery" -ATTR_DISTANCE = "distance" -ATTR_DEVICESTATUS = "device_status" -ATTR_LOWPOWERMODE = "low_power_mode" -ATTR_BATTERYSTATUS = "battery_status" - -ICLOUDTRACKERS = {} -_CONFIGURING = {} +async def async_setup_scanner( + hass: HomeAssistantType, config, see, discovery_info=None +): + """Old way of setting up the iCloud tracker.""" + pass -DEVICESTATUSSET = [ - "features", - "maxMsgChar", - "darkWake", - "fmlyShare", - "deviceStatus", - "remoteLock", - "activationLocked", - "deviceClass", - "id", - "deviceModel", - "rawDeviceModel", - "passcodeLength", - "canWipeAfterLock", - "trackingInfo", - "location", - "msg", - "batteryLevel", - "remoteWipe", - "thisDevice", - "snd", - "prsId", - "wipeInProgress", - "lowPowerMode", - "lostModeEnabled", - "isLocating", - "lostModeCapable", - "mesg", - "name", - "batteryStatus", - "lockedTimestamp", - "lostTimestamp", - "locationCapable", - "deviceDisplayName", - "lostDevice", - "deviceColor", - "wipedTimestamp", - "modelDisplayName", - "locationEnabled", - "isMac", - "locFoundEnabled", -] - -DEVICESTATUSCODES = { - "200": "online", - "201": "offline", - "203": "pending", - "204": "unregistered", -} - -SERVICE_SCHEMA = vol.Schema( - { - vol.Optional(ATTR_ACCOUNTNAME): vol.All(cv.ensure_list, [cv.slugify]), - vol.Optional(ATTR_DEVICENAME): cv.slugify, - vol.Optional(ATTR_INTERVAL): cv.positive_int, - } -) -PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( - { - vol.Required(CONF_USERNAME): cv.string, - vol.Required(CONF_PASSWORD): cv.string, - vol.Optional(ATTR_ACCOUNTNAME): cv.slugify, - vol.Optional(CONF_MAX_INTERVAL, default=30): cv.positive_int, - vol.Optional(CONF_GPS_ACCURACY_THRESHOLD, default=1000): cv.positive_int, - } -) - - -def setup_scanner(hass, config: dict, see, discovery_info=None): - """Set up the iCloud Scanner.""" - username = config.get(CONF_USERNAME) - password = config.get(CONF_PASSWORD) - account = config.get(CONF_ACCOUNTNAME, slugify(username.partition("@")[0])) - max_interval = config.get(CONF_MAX_INTERVAL) - gps_accuracy_threshold = config.get(CONF_GPS_ACCURACY_THRESHOLD) - - icloudaccount = Icloud( - hass, username, password, account, max_interval, gps_accuracy_threshold, see - ) - - if icloudaccount.api is not None: - ICLOUDTRACKERS[account] = icloudaccount - - else: - _LOGGER.error("No ICLOUDTRACKERS added") - return False +async def async_setup_entry(hass: HomeAssistantType, entry, async_add_entities): + """Configure a dispatcher connection based on a config entry.""" + username = entry.data[CONF_USERNAME] - def lost_iphone(call): - """Call the lost iPhone function if the device is found.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].lost_iphone(devicename) + for device in hass.data[DOMAIN][username].devices.values(): + if device.location is None: + _LOGGER.debug("No position found for device %s", device.name) + continue - hass.services.register( - DOMAIN, SERVICE_LOST_IPHONE, lost_iphone, schema=SERVICE_SCHEMA - ) + _LOGGER.debug("Adding device_tracker for %s", device.name) - def update_icloud(call): - """Call the update function of an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].update_icloud(devicename) + async_add_entities([IcloudTrackerEntity(device)]) - hass.services.register(DOMAIN, SERVICE_UPDATE, update_icloud, schema=SERVICE_SCHEMA) - def reset_account_icloud(call): - """Reset an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].reset_account_icloud() +class IcloudTrackerEntity(TrackerEntity): + """Represent a tracked device.""" - hass.services.register( - DOMAIN, SERVICE_RESET_ACCOUNT, reset_account_icloud, schema=SERVICE_SCHEMA - ) + def __init__(self, device: IcloudDevice): + """Set up the iCloud tracker entity.""" + self._device = device + self._unsub_dispatcher = None - def setinterval(call): - """Call the update function of an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - interval = call.data.get(ATTR_INTERVAL) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].setinterval(interval, devicename) + @property + def unique_id(self): + """Return a unique ID.""" + return f"{self._device.unique_id}_tracker" - hass.services.register( - DOMAIN, SERVICE_SET_INTERVAL, setinterval, schema=SERVICE_SCHEMA - ) + @property + def name(self): + """Return the name of the device.""" + return self._device.name - # Tells the bootstrapper that the component was successfully initialized - return True + @property + def location_accuracy(self): + """Return the location accuracy of the device.""" + return self._device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY] + @property + def latitude(self): + """Return latitude value of the device.""" + return self._device.location[DEVICE_LOCATION_LATITUDE] -class Icloud(DeviceScanner): - """Representation of an iCloud account.""" - - def __init__( - self, hass, username, password, name, max_interval, gps_accuracy_threshold, see - ): - """Initialize an iCloud account.""" - self.hass = hass - self.username = username - self.password = password - self.api = None - self.accountname = name - self.devices = {} - self.seen_devices = {} - self._overridestates = {} - self._intervals = {} - self._max_interval = max_interval - self._gps_accuracy_threshold = gps_accuracy_threshold - self.see = see - - self._trusted_device = None - self._verification_code = None - - self._attrs = {} - self._attrs[ATTR_ACCOUNTNAME] = name - - self.reset_account_icloud() - - randomseconds = random.randint(10, 59) - track_utc_time_change(self.hass, self.keep_alive, second=randomseconds) - - def reset_account_icloud(self): - """Reset an iCloud account.""" - icloud_dir = self.hass.config.path("icloud") - if not os.path.exists(icloud_dir): - os.makedirs(icloud_dir) - - try: - self.api = PyiCloudService( - self.username, self.password, cookie_directory=icloud_dir, verify=True - ) - except PyiCloudFailedLoginException as error: - self.api = None - _LOGGER.error("Error logging into iCloud Service: %s", error) - return - - try: - self.devices = {} - self._overridestates = {} - self._intervals = {} - for device in self.api.devices: - status = device.status(DEVICESTATUSSET) - _LOGGER.debug("Device Status is %s", status) - devicename = slugify(status["name"].replace(" ", "", 99)) - _LOGGER.info("Adding icloud device: %s", devicename) - if devicename in self.devices: - _LOGGER.error("Multiple devices with name: %s", devicename) - continue - self.devices[devicename] = device - self._intervals[devicename] = 1 - self._overridestates[devicename] = None - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found!") - - def icloud_trusted_device_callback(self, callback_data): - """Handle chosen trusted devices.""" - self._trusted_device = int(callback_data.get("trusted_device")) - self._trusted_device = self.api.trusted_devices[self._trusted_device] - - if not self.api.send_verification_code(self._trusted_device): - _LOGGER.error("Failed to send verification code") - self._trusted_device = None - return - - if self.accountname in _CONFIGURING: - request_id = _CONFIGURING.pop(self.accountname) - configurator = self.hass.components.configurator - configurator.request_done(request_id) - - # Trigger the next step immediately - self.icloud_need_verification_code() - - def icloud_need_trusted_device(self): - """We need a trusted device.""" - configurator = self.hass.components.configurator - if self.accountname in _CONFIGURING: - return - - devicesstring = "" - devices = self.api.trusted_devices - for i, device in enumerate(devices): - devicename = device.get( - "deviceName", "SMS to %s" % device.get("phoneNumber") - ) - devicesstring += f"{i}: {devicename};" - - _CONFIGURING[self.accountname] = configurator.request_config( - f"iCloud {self.accountname}", - self.icloud_trusted_device_callback, - description=( - "Please choose your trusted device by entering" - " the index from this list: " + devicesstring - ), - entity_picture="/static/images/config_icloud.png", - submit_caption="Confirm", - fields=[{"id": "trusted_device", "name": "Trusted Device"}], - ) + @property + def longitude(self): + """Return longitude value of the device.""" + return self._device.location[DEVICE_LOCATION_LONGITUDE] - def icloud_verification_callback(self, callback_data): - """Handle the chosen trusted device.""" - self._verification_code = callback_data.get("code") - - try: - if not self.api.validate_verification_code( - self._trusted_device, self._verification_code - ): - raise PyiCloudException("Unknown failure") - except PyiCloudException as error: - # Reset to the initial 2FA state to allow the user to retry - _LOGGER.error("Failed to verify verification code: %s", error) - self._trusted_device = None - self._verification_code = None - - # Trigger the next step immediately - self.icloud_need_trusted_device() - - if self.accountname in _CONFIGURING: - request_id = _CONFIGURING.pop(self.accountname) - configurator = self.hass.components.configurator - configurator.request_done(request_id) - - def icloud_need_verification_code(self): - """Return the verification code.""" - configurator = self.hass.components.configurator - if self.accountname in _CONFIGURING: - return - - _CONFIGURING[self.accountname] = configurator.request_config( - f"iCloud {self.accountname}", - self.icloud_verification_callback, - description=("Please enter the validation code:"), - entity_picture="/static/images/config_icloud.png", - submit_caption="Confirm", - fields=[{"id": "code", "name": "code"}], - ) - - def keep_alive(self, now): - """Keep the API alive.""" - if self.api is None: - self.reset_account_icloud() - - if self.api is None: - return - - if self.api.requires_2fa: - try: - if self._trusted_device is None: - self.icloud_need_trusted_device() - return - - if self._verification_code is None: - self.icloud_need_verification_code() - return - - self.api.authenticate() - if self.api.requires_2fa: - raise Exception("Unknown failure") - - self._trusted_device = None - self._verification_code = None - except PyiCloudException as error: - _LOGGER.error("Error setting up 2FA: %s", error) - else: - self.api.authenticate() - - currentminutes = dt_util.now().hour * 60 + dt_util.now().minute - try: - for devicename in self.devices: - interval = self._intervals.get(devicename, 1) - if (currentminutes % interval == 0) or ( - interval > 10 and currentminutes % interval in [2, 4] - ): - self.update_device(devicename) - except ValueError: - _LOGGER.debug("iCloud API returned an error") - - def determine_interval(self, devicename, latitude, longitude, battery): - """Calculate new interval.""" - currentzone = run_callback_threadsafe( - self.hass.loop, async_active_zone, self.hass, latitude, longitude - ).result() - - if ( - currentzone is not None - and currentzone == self._overridestates.get(devicename) - ) or (currentzone is None and self._overridestates.get(devicename) == "away"): - return + @property + def should_poll(self): + """No polling needed.""" + return False - zones = ( - self.hass.states.get(entity_id) - for entity_id in sorted(self.hass.states.entity_ids("zone")) + @property + def battery_level(self): + """Return the battery level of the device.""" + return self._device.battery_level + + @property + def source_type(self): + """Return the source type, eg gps or router, of the device.""" + return SOURCE_TYPE_GPS + + @property + def icon(self): + """Return the icon.""" + return icon_for_icloud_device(self._device) + + @property + def device_state_attributes(self): + """Return the device state attributes.""" + return self._device.state_attributes + + @property + def device_info(self): + """Return the device information.""" + return { + "identifiers": {(DOMAIN, self.unique_id)}, + "name": self.name, + "manufacturer": "Apple", + "model": self._device.device_model, + } + + async def async_added_to_hass(self): + """Register state update callback.""" + self._unsub_dispatcher = async_dispatcher_connect( + self.hass, TRACKER_UPDATE, self.async_write_ha_state ) - distances = [] - for zone_state in zones: - zone_state_lat = zone_state.attributes["latitude"] - zone_state_long = zone_state.attributes["longitude"] - zone_distance = distance( - latitude, longitude, zone_state_lat, zone_state_long - ) - distances.append(round(zone_distance / 1000, 1)) + async def async_will_remove_from_hass(self): + """Clean up after entity before removal.""" + self._unsub_dispatcher() - if distances: - mindistance = min(distances) - else: - mindistance = None - self._overridestates[devicename] = None - - if currentzone is not None: - self._intervals[devicename] = self._max_interval - return - - if mindistance is None: - return - - # Calculate out how long it would take for the device to drive to the - # nearest zone at 120 km/h: - interval = round(mindistance / 2, 0) - - # Never poll more than once per minute - interval = max(interval, 1) - - if interval > 180: - # Three hour drive? This is far enough that they might be flying - interval = 30 - - if battery is not None and battery <= 33 and mindistance > 3: - # Low battery - let's check half as often - interval = interval * 2 - - self._intervals[devicename] = interval - - def update_device(self, devicename): - """Update the device_tracker entity.""" - # An entity will not be created by see() when track=false in - # 'known_devices.yaml', but we need to see() it at least once - entity = self.hass.states.get(ENTITY_ID_FORMAT.format(devicename)) - if entity is None and devicename in self.seen_devices: - return - attrs = {} - kwargs = {} - - if self.api is None: - return - - try: - for device in self.api.devices: - if str(device) != str(self.devices[devicename]): - continue - - status = device.status(DEVICESTATUSSET) - _LOGGER.debug("Device Status is %s", status) - dev_id = status["name"].replace(" ", "", 99) - dev_id = slugify(dev_id) - attrs[ATTR_DEVICESTATUS] = DEVICESTATUSCODES.get( - status["deviceStatus"], "error" - ) - attrs[ATTR_LOWPOWERMODE] = status["lowPowerMode"] - attrs[ATTR_BATTERYSTATUS] = status["batteryStatus"] - attrs[ATTR_ACCOUNTNAME] = self.accountname - status = device.status(DEVICESTATUSSET) - battery = status.get("batteryLevel", 0) * 100 - location = status["location"] - if location and location["horizontalAccuracy"]: - horizontal_accuracy = int(location["horizontalAccuracy"]) - if horizontal_accuracy < self._gps_accuracy_threshold: - self.determine_interval( - devicename, - location["latitude"], - location["longitude"], - battery, - ) - interval = self._intervals.get(devicename, 1) - attrs[ATTR_INTERVAL] = interval - accuracy = location["horizontalAccuracy"] - kwargs["dev_id"] = dev_id - kwargs["host_name"] = status["name"] - kwargs["gps"] = (location["latitude"], location["longitude"]) - kwargs["battery"] = battery - kwargs["gps_accuracy"] = accuracy - kwargs[ATTR_ATTRIBUTES] = attrs - self.see(**kwargs) - self.seen_devices[devicename] = True - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found") - - def lost_iphone(self, devicename): - """Call the lost iPhone function if the device is found.""" - if self.api is None: - return - - self.api.authenticate() - for device in self.api.devices: - if str(device) == str(self.devices[devicename]): - _LOGGER.info("Playing Lost iPhone sound for %s", devicename) - device.play_sound() - - def update_icloud(self, devicename=None): - """Request device information from iCloud and update device_tracker.""" - if self.api is None: - return - - try: - if devicename is not None: - if devicename in self.devices: - self.update_device(devicename) - else: - _LOGGER.error( - "devicename %s unknown for account %s", - devicename, - self._attrs[ATTR_ACCOUNTNAME], - ) - else: - for device in self.devices: - self.update_device(device) - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found") +def icon_for_icloud_device(icloud_device: IcloudDevice) -> str: + """Return a battery icon valid identifier.""" + switcher = { + "iPad": "mdi:tablet-ipad", + "iPhone": "mdi:cellphone-iphone", + "iPod": "mdi:ipod", + "iMac": "mdi:desktop-mac", + "MacBookPro": "mdi:laptop-mac", + } - def setinterval(self, interval=None, devicename=None): - """Set the interval of the given devices.""" - devs = [devicename] if devicename else self.devices - for device in devs: - devid = f"{DOMAIN}.{device}" - devicestate = self.hass.states.get(devid) - if interval is not None: - if devicestate is not None: - self._overridestates[device] = run_callback_threadsafe( - self.hass.loop, - async_active_zone, - self.hass, - float(devicestate.attributes.get("latitude", 0)), - float(devicestate.attributes.get("longitude", 0)), - ).result() - if self._overridestates[device] is None: - self._overridestates[device] = "away" - self._intervals[device] = interval - else: - self._overridestates[device] = None - self.update_device(device) + return switcher.get(icloud_device.device_class, "mdi:cellphone-link") diff --git a/homeassistant/components/icloud/manifest.json b/homeassistant/components/icloud/manifest.json index d3924ee61a8b21..f7295ceae4d0cc 100644 --- a/homeassistant/components/icloud/manifest.json +++ b/homeassistant/components/icloud/manifest.json @@ -1,10 +1,13 @@ { "domain": "icloud", - "name": "Icloud", - "documentation": "https://www.home-assistant.io/integrations/icloud", + "name": "iCloud", + "config_flow": true, + "documentation": "https://www.home-assistant.io/components/icloud", "requirements": [ "pyicloud==0.9.1" ], - "dependencies": ["configurator"], - "codeowners": [] -} + "dependencies": [], + "codeowners": [ + "@Quentame" + ] +} \ No newline at end of file diff --git a/homeassistant/components/icloud/services.yaml b/homeassistant/components/icloud/services.yaml index 7b2d3b80e8435b..ce239df7564c91 100644 --- a/homeassistant/components/icloud/services.yaml +++ b/homeassistant/components/icloud/services.yaml @@ -1,39 +1,49 @@ -lost_iphone: - description: Service to play the lost iphone sound on an iDevice. +update: + description: Update iCloud devices. fields: - account_name: - description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts. - example: 'bart' - device_name: - description: Name of the device that will play the sound. This is optional, if it isn't given it will play on all devices for the given account. - example: 'iphonebart' + account: + description: Your iCloud account username (email) or account name. + example: 'steve@apple.com' -set_interval: - description: Service to set the interval of an iDevice. +play_sound: + description: Play sound on an Apple device. fields: - account_name: - description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts. - example: 'bart' + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' device_name: - description: Name of the device that will get a new interval. This is optional, if it isn't given it will change the interval for all devices for the given account. - example: 'iphonebart' - interval: - description: The interval (in minutes) that the iDevice will have until the according device_tracker entity changes from zone or until this service is used again. This is optional, if it isn't given the interval of the device will revert back to the original interval based on the current state. - example: 1 + description: (required) The name of the Apple device to play a sound. + example: 'stevesiphone' -update: - description: Service to ask for an update of an iDevice. +display_message: + description: Display a message on an Apple device. fields: - account_name: - description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts. - example: 'bart' + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' device_name: - description: Name of the device that will be updated. This is optional, if it isn't given it will update all devices for the given account. - example: 'iphonebart' + description: (required) The name of the Apple device to display the message. + example: 'stevesiphone' + message: + description: (required) The content of your message. + example: 'Hey Steve !' + sound: + description: To make a sound when displaying the message (boolean). + example: 'true' -reset_account: - description: Service to restart an iCloud account. Helpful when not all devices are found after initializing or when you add a new device. +lost_device: + description: Make an Apple device in lost state. fields: - account_name: - description: Name of the account in the config that will be restarted. This is optional, if it isn't given it will restart all accounts. - example: 'bart' + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' + device_name: + description: (required) The name of the Apple device to set lost. + example: 'stevesiphone' + number: + description: (required) The phone number to call in lost mode (must contain country code). + example: '+33450020100' + message: + description: (required) The message to display in lost mode. + example: 'Call me' + diff --git a/homeassistant/components/icloud/strings.json b/homeassistant/components/icloud/strings.json new file mode 100644 index 00000000000000..343a087738f4da --- /dev/null +++ b/homeassistant/components/icloud/strings.json @@ -0,0 +1,38 @@ +{ + "config": { + "title": "Apple iCloud", + "step": { + "user": { + "title": "iCloud credentials", + "description": "Enter your credentials", + "data": { + "username": "Email", + "password": "Password" + } + }, + "trusted_device": { + "title": "iCloud trusted device", + "description": "Select your trusted device", + "data": { + "trusted_device": "Trusted device" + } + }, + "verification_code": { + "title": "iCloud verification code", + "description": "Please enter the verification code you just received from iCloud", + "data": { + "verification_code": "Verification code" + } + } + }, + "error":{ + "username_exists": "Account already configured", + "login": "Login error: please check your email & password", + "send_verification_code": "Failed to send verification code", + "validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again", + }, + "abort":{ + "username_exists": "Account already configured" + } + } +} diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index cf1c4b55e1924a..2b3940000e7963 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -36,6 +36,7 @@ "huawei_lte", "hue", "iaqualink", + "icloud", "ifttt", "ios", "ipma", diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 4cb7ca26a87035..3812b8f0b339b8 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -432,6 +432,9 @@ pyheos==0.6.0 # homeassistant.components.homematic pyhomematic==0.1.62 +# homeassistant.components.icloud +pyicloud==0.9.1 + # homeassistant.components.ipma pyipma==1.2.1 diff --git a/tests/components/icloud/__init__.py b/tests/components/icloud/__init__.py new file mode 100644 index 00000000000000..b85f1017e45d6b --- /dev/null +++ b/tests/components/icloud/__init__.py @@ -0,0 +1 @@ +"""Tests for the iCloud component.""" diff --git a/tests/components/icloud/test_config_flow.py b/tests/components/icloud/test_config_flow.py new file mode 100644 index 00000000000000..b292a9e258cf37 --- /dev/null +++ b/tests/components/icloud/test_config_flow.py @@ -0,0 +1,309 @@ +"""Tests for the iCloud config flow.""" +from unittest.mock import patch, Mock, MagicMock +import pytest + +from pyicloud.exceptions import PyiCloudFailedLoginException + +from homeassistant import data_entry_flow +from homeassistant.components.icloud import config_flow + +from homeassistant.components.icloud.config_flow import ( + CONF_TRUSTED_DEVICE, + CONF_VERIFICATION_CODE, +) +from homeassistant.components.icloud.const import ( + DOMAIN, + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DEFAULT_MAX_INTERVAL, +) +from homeassistant.const import CONF_PASSWORD, CONF_USERNAME +from homeassistant.helpers.typing import HomeAssistantType + +from tests.common import MockConfigEntry + +USERNAME = "username@me.com" +PASSWORD = "password" +ACCOUNT_NAME = "Account name 1 2 3" +ACCOUNT_NAME_FROM_USERNAME = None +MAX_INTERVAL = 15 +GPS_ACCURACY_THRESHOLD = 250 + +TRUSTED_DEVICES = [ + {"deviceType": "SMS", "areaCode": "", "phoneNumber": "*******58", "deviceId": "1"} +] + + +@pytest.fixture(name="service") +def mock_controller_service(): + """Mock a successful service.""" + with patch( + "homeassistant.components.icloud.config_flow.PyiCloudService" + ) as service_mock: + service_mock.return_value.requires_2fa = True + yield service_mock + + +@pytest.fixture(name="service_with_cookie") +def mock_controller_service_with_cookie(): + """Mock a successful service while already authenticate.""" + with patch( + "homeassistant.components.icloud.config_flow.PyiCloudService" + ) as service_mock: + service_mock.return_value.requires_2fa = False + service_mock.return_value.trusted_devices = TRUSTED_DEVICES + service_mock.return_value.send_verification_code = Mock(return_value=True) + service_mock.return_value.validate_verification_code = Mock(return_value=True) + yield service_mock + + +@pytest.fixture(name="service_send_verification_code_failed") +def mock_controller_service_send_verification_code_failed(): + """Mock a failed service during sending verification code step.""" + with patch( + "homeassistant.components.icloud.config_flow.PyiCloudService" + ) as service_mock: + service_mock.return_value.requires_2fa = False + service_mock.return_value.trusted_devices = TRUSTED_DEVICES + service_mock.return_value.send_verification_code = Mock(return_value=False) + yield service_mock + + +@pytest.fixture(name="service_validate_verification_code_failed") +def mock_controller_service_validate_verification_code_failed(): + """Mock a failed service during validation of verification code step.""" + with patch( + "homeassistant.components.icloud.config_flow.PyiCloudService" + ) as service_mock: + service_mock.return_value.requires_2fa = False + service_mock.return_value.trusted_devices = TRUSTED_DEVICES + service_mock.return_value.send_verification_code = Mock(return_value=True) + service_mock.return_value.validate_verification_code = Mock(return_value=False) + yield service_mock + + +def init_config_flow(hass: HomeAssistantType): + """Init a configuration flow.""" + flow = config_flow.IcloudFlowHandler() + flow.hass = hass + return flow + + +async def test_user(hass: HomeAssistantType, service: MagicMock): + """Test user config.""" + flow = init_config_flow(hass) + + result = await flow.async_step_user() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + # test with all provided + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + + +async def test_user_with_cookie( + hass: HomeAssistantType, service_with_cookie: MagicMock +): + """Test user config with presence of a cookie.""" + flow = init_config_flow(hass) + + # test with all provided + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME + assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD + + +async def test_import(hass: HomeAssistantType, service: MagicMock): + """Test import step.""" + flow = init_config_flow(hass) + + # import with username and password + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "trusted_device" + + # import with all + result = await flow.async_step_import( + { + CONF_USERNAME: USERNAME, + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME, + CONF_MAX_INTERVAL: MAX_INTERVAL, + CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "trusted_device" + + +async def test_import_with_cookie( + hass: HomeAssistantType, service_with_cookie: MagicMock +): + """Test import step with presence of a cookie.""" + flow = init_config_flow(hass) + + # import with username and password + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME + assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD + + # import with all + result = await flow.async_step_import( + { + CONF_USERNAME: USERNAME, + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME, + CONF_MAX_INTERVAL: MAX_INTERVAL, + CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME + assert result["data"][CONF_MAX_INTERVAL] == MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == GPS_ACCURACY_THRESHOLD + + +async def test_abort_if_already_setup(hass: HomeAssistantType): + """Test we abort if the account is already setup.""" + flow = init_config_flow(hass) + MockConfigEntry( + domain=DOMAIN, data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ).add_to_hass(hass) + + # Should fail, same USERNAME (import) + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "username_exists" + + # Should fail, same ACCOUNT_NAME (import) + result = await flow.async_step_import( + { + CONF_USERNAME: "other_username@icloud.com", + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME_FROM_USERNAME, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "username_exists" + + # Should fail, same USERNAME (flow) + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["errors"] == {CONF_USERNAME: "username_exists"} + + +async def test_login_failed(hass: HomeAssistantType): + """Test when we have errors during login.""" + flow = init_config_flow(hass) + + with patch( + "pyicloud.base.PyiCloudService.authenticate", + side_effect=PyiCloudFailedLoginException(), + ): + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["errors"] == {CONF_USERNAME: "login"} + + +async def test_trusted_device(hass: HomeAssistantType, service: MagicMock): + """Test trusted_device step.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_trusted_device() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + + +async def test_trusted_device_success(hass: HomeAssistantType, service: MagicMock): + """Test trusted_device step success.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_VERIFICATION_CODE + + +async def test_send_verification_code_failed( + hass: HomeAssistantType, service_send_verification_code_failed: MagicMock +): + """Test when we have errors during send_verification_code.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + assert result["errors"] == {CONF_TRUSTED_DEVICE: "send_verification_code"} + + +async def test_verification_code(hass: HomeAssistantType): + """Test verification_code step.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_verification_code() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_VERIFICATION_CODE + + +async def test_verification_code_success( + hass: HomeAssistantType, service_with_cookie: MagicMock +): + """Test verification_code step success.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_verification_code({CONF_VERIFICATION_CODE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] is None + assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD + + +async def test_validate_verification_code_failed( + hass: HomeAssistantType, service_validate_verification_code_failed: MagicMock +): + """Test when we have errors during validate_verification_code.""" + flow = init_config_flow(hass) + await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}) + + result = await flow.async_step_verification_code({CONF_VERIFICATION_CODE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + assert result["errors"] == {"base": "validate_verification_code"}