This repository has been archived by the owner on Aug 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
743 additions
and
315 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,167 @@ | ||
# __init__ | ||
import logging | ||
from O365 import Account | ||
from homeassistant.components.http import HomeAssistantView | ||
from homeassistant.helpers import discovery | ||
from homeassistant.core import callback | ||
from .const import ( | ||
DOMAIN, | ||
CONF_CLIENT_ID, | ||
CONF_CLIENT_SECRET, | ||
CONF_ALT_CONFIG, | ||
AUTH_CALLBACK_PATH, | ||
AUTH_CALLBACK_PATH_ALT, | ||
TOKEN_BACKEND, | ||
SCOPE, | ||
CONF_CALENDARS, | ||
DEFAULT_NAME, | ||
CONFIGURATOR_LINK_NAME, | ||
CONFIGURATOR_DESCRIPTION, | ||
CONFIGURATOR_SUBMIT_CAPTION, | ||
AUTH_CALLBACK_NAME, | ||
CONF_QUERY_SENSORS, | ||
CONF_EMAIL_SENSORS, | ||
) | ||
|
||
from .utils import validate_permissions | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def setup(hass, config): | ||
"""Set up the O365 platform.""" | ||
validate_permissions() | ||
conf = config.get(DOMAIN, {}) | ||
|
||
credentials = (conf.get(CONF_CLIENT_ID), conf.get(CONF_CLIENT_SECRET)) | ||
alt_config = conf.get(CONF_ALT_CONFIG) | ||
if not alt_config: | ||
callback_url = f"{hass.config.api.base_url}{AUTH_CALLBACK_PATH}" | ||
else: | ||
callback_url = AUTH_CALLBACK_PATH_ALT | ||
|
||
account = Account(credentials, token_backend=TOKEN_BACKEND) | ||
is_authenticated = account.is_authenticated | ||
permissions = validate_permissions() | ||
if not is_authenticated or not permissions: | ||
url, state = account.con.get_authorization_url( | ||
requested_scopes=SCOPE, redirect_uri=callback_url | ||
) | ||
_LOGGER.info("no token; requesting authorization") | ||
callback_view = O365AuthCallbackView( | ||
conf, None, account, state, callback_url, hass | ||
) | ||
hass.http.register_view(callback_view) | ||
if alt_config: | ||
request_configuration_alt(hass, conf, url, callback_view) | ||
else: | ||
request_configuration(hass, conf, url, callback_view) | ||
return True | ||
else: | ||
do_setup(hass, conf, account) | ||
|
||
return True | ||
|
||
|
||
def do_setup(hass, config, account): | ||
"""Run the setup after we have everything configured.""" | ||
hass.data[DOMAIN] = { | ||
"account": account, | ||
CONF_CALENDARS: config.get(CONF_CALENDARS, []), | ||
CONF_EMAIL_SENSORS: config.get(CONF_EMAIL_SENSORS, []), | ||
CONF_QUERY_SENSORS: config.get(CONF_QUERY_SENSORS, []), | ||
} | ||
hass.async_create_task( | ||
discovery.async_load_platform(hass, "calendar", DOMAIN, {}, config) | ||
) | ||
hass.async_create_task( | ||
discovery.async_load_platform(hass, "notify", DOMAIN, {}, config) | ||
) | ||
hass.async_create_task( | ||
discovery.async_load_platform(hass, "sensor", DOMAIN, {}, config) | ||
) | ||
|
||
|
||
def request_configuration(hass, config, url, callback_view): | ||
configurator = callback_view.configurator | ||
hass.data[DOMAIN] = configurator.async_request_config( | ||
DEFAULT_NAME, | ||
lambda _: None, | ||
link_name=CONFIGURATOR_LINK_NAME, | ||
link_url=url, | ||
description=CONFIGURATOR_DESCRIPTION, | ||
submit_caption=CONFIGURATOR_SUBMIT_CAPTION, | ||
) | ||
|
||
|
||
def request_configuration_alt(hass, config, url, callback_view): | ||
configurator = callback_view.configurator | ||
hass.data[DOMAIN] = configurator.async_request_config( | ||
f"{DEFAULT_NAME} - Alternative configuration", | ||
callback_view.alt_callback, | ||
link_name=CONFIGURATOR_LINK_NAME, | ||
link_url=url, | ||
fields=[{"id": "token", "name": "Returned Url", "type": "token"}], | ||
description="Complete the configuration and copy the complete url into this field afterwards and submit", | ||
submit_caption="Submit", | ||
) | ||
|
||
|
||
class O365AuthCallbackView(HomeAssistantView): | ||
"""O365 Authorization Callback View.""" | ||
|
||
requires_auth = False | ||
url = AUTH_CALLBACK_PATH | ||
name = AUTH_CALLBACK_NAME | ||
|
||
def __init__(self, config, add_devices, account, state, callback_url, hass): | ||
"""Initialize.""" | ||
self.config = config | ||
self.add_devices = add_devices | ||
self.account = account | ||
self.state = state | ||
self.callback = callback_url | ||
self._hass = hass | ||
self.configurator = self._hass.components.configurator | ||
|
||
@callback | ||
def get(self, request): | ||
from aiohttp import web_response | ||
|
||
"""Receive authorization token.""" | ||
url = str(request.url) | ||
if url[:5].lower() == "http:": | ||
url = f"https:{url[5:]}" | ||
if "code" not in url: | ||
return web_response.Response( | ||
headers={"content-type": "text/html"}, | ||
text="<script>window.close()</script>Error, the originating url does not seem to be a valid microsoft redirect", | ||
) | ||
self.account.con.request_token( | ||
url, state=self.state, redirect_uri=self.callback | ||
) | ||
do_setup(self._hass, self.config) | ||
self.configurator.async_request_done(self._hass.data[DOMAIN]) | ||
|
||
return web_response.Response( | ||
headers={"content-type": "text/html"}, | ||
text="<script>window.close()</script>Success! This window can be closed", | ||
) | ||
|
||
def alt_callback(self, data): | ||
"""Receive authorization token.""" | ||
url = data.get("token") | ||
if not url: | ||
url = [v for k, v in data.items()][0] | ||
|
||
result = self.account.con.request_token( | ||
url, state=self.state, redirect_uri=AUTH_CALLBACK_PATH_ALT | ||
) | ||
if not result: | ||
self.configurator.notify_errors( | ||
self._hass.data[DOMAIN], | ||
"Error while authenticating, please see logs for more info.", | ||
) | ||
return | ||
do_setup(self._hass, self.config) | ||
self.configurator.request_done(self._hass.data[DOMAIN]) | ||
return |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import logging | ||
from datetime import timedelta, datetime | ||
from operator import itemgetter | ||
import json | ||
from homeassistant.helpers.entity import Entity | ||
from homeassistant.util.dt import utcnow | ||
from .const import ( | ||
DOMAIN, | ||
CALENDAR_SCHEMA, | ||
CONF_CALENDARS, | ||
CONF_NAME, | ||
CONF_CALENDAR_NAME, | ||
CONF_HOURS_FORWARD_TO_GET, | ||
CONF_HOURS_BACKWARD_TO_GET, | ||
DATETIME_FORMAT, | ||
) | ||
from .utils import clean_html | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def setup_platform(hass, config, add_devices, discovery_info=None): | ||
"""Set up the O365 platform.""" | ||
if discovery_info is None: | ||
return | ||
|
||
account = hass.data[DOMAIN]["account"] | ||
is_authenticated = account.is_authenticated | ||
if not is_authenticated: | ||
return False | ||
|
||
calendars = hass.data[DOMAIN].get(CONF_CALENDARS, []) | ||
if len(calendars) < 1: | ||
calendars = [CALENDAR_SCHEMA({})] | ||
for calendar in calendars: | ||
name = calendar.get(CONF_NAME) | ||
calendar_name = calendar.get(CONF_CALENDAR_NAME) | ||
hours_forward = calendar.get(CONF_HOURS_FORWARD_TO_GET, 24) | ||
hours_backward = calendar.get(CONF_HOURS_BACKWARD_TO_GET, 0) | ||
cal = O365Calendar( | ||
account, hass, name, calendar_name, hours_forward, hours_backward | ||
) | ||
add_devices([cal], True) | ||
return True | ||
|
||
|
||
class O365Calendar(Entity): | ||
def __init__( | ||
self, account, hass, name, calendar_name, hours_forward, hours_backward | ||
): | ||
self.account = account | ||
self.hass = hass | ||
self._state = None | ||
self._name = name | ||
self.calendar_name = calendar_name | ||
self.hours_forward = hours_forward | ||
self.hours_backward = hours_backward | ||
self._attributes = {} | ||
# self.get_calendar_events(None) | ||
|
||
@property | ||
def name(self): | ||
return self._name | ||
|
||
@property | ||
def state(self): | ||
return self._state | ||
|
||
@property | ||
def device_state_attributes(self): | ||
return self._attributes | ||
|
||
def update(self): | ||
self._attributes = self.update_attributes() | ||
self._state = self.device_state_attributes.get("event_active", False) | ||
|
||
def update_attributes(self): | ||
attributes = {} | ||
start_date = datetime.now() + timedelta(hours=self.hours_backward) | ||
end_date = datetime.now() + timedelta(hours=self.hours_forward) | ||
schedule = self.account.schedule() | ||
if self.calendar_name != "" and self.calendar_name != "default_calendar": | ||
calendar = schedule.get_calendar(calendar_name=self.calendar_name) | ||
else: | ||
calendar = schedule.get_default_calendar() | ||
if calendar is None: | ||
raise ValueError(f"Could not find calendar called {self.calendar_name}") | ||
query = calendar.new_query("start").greater_equal(start_date) | ||
query.chain("and").on_attribute("end").less_equal(end_date) | ||
data = [] | ||
now = utcnow() | ||
|
||
for event in calendar.get_events( | ||
limit=999, query=query, include_recurring=True | ||
): | ||
data.append( | ||
{ | ||
"subject": event.subject, | ||
"body": clean_html(event.body), | ||
"location": event.location["displayName"], | ||
"categories": event.categories, | ||
"sensitivity": event.sensitivity.name, | ||
"show_as": event.show_as.name, | ||
"is_all_day": event.is_all_day, | ||
"attendees": [ | ||
x.address for x in event.attendees._Attendees__attendees | ||
], | ||
"start": event.start.strftime(DATETIME_FORMAT), | ||
"end": event.end.strftime(DATETIME_FORMAT), | ||
"event_active": event.start <= now and event.end >= now, | ||
} | ||
) | ||
data.sort(key=itemgetter("start")) | ||
attributes["data_str_repr"] = json.dumps(data, indent=2) | ||
attributes["data"] = data | ||
attributes["event_active"] = len([x for x in data if x["event_active"]]) > 0 | ||
return attributes |
Oops, something went wrong.