|
| 1 | +# coding=utf-8 |
| 2 | +""" |
| 3 | +This package is part of the USOS API project. |
| 4 | +https://apps.usos.edu.pl/developers/ |
| 5 | +""" |
| 6 | +import hashlib |
| 7 | +import os |
| 8 | +import os.path |
| 9 | +import tempfile |
| 10 | +import urllib.request |
| 11 | +import shutil |
| 12 | +import rauth |
| 13 | +import warnings |
| 14 | +import re |
| 15 | +import requests.exceptions |
| 16 | +import logging |
| 17 | +import time |
| 18 | + |
| 19 | +_REQUEST_TOKEN_SUFFIX = 'services/oauth/request_token' |
| 20 | +_AUTHORIZE_SUFFIX = 'services/oauth/authorize' |
| 21 | +_ACCESS_TOKEN_SUFFIX = 'services/oauth/access_token' |
| 22 | + |
| 23 | +SCOPES = 'offline_access' |
| 24 | + |
| 25 | +_LOGGER = logging.getLogger('USOSAPI') |
| 26 | +_DOWNLOAD_LOGGER = logging.getLogger('USOSAPI.download') |
| 27 | + |
| 28 | + |
| 29 | +class USOSAPIException(Exception): |
| 30 | + pass |
| 31 | + |
| 32 | + |
| 33 | +def download_file(url: str) -> str: |
| 34 | + """ |
| 35 | + This function is here for convenience. It's useful for downloading |
| 36 | + for eg. user photos. It blocks until the file is saved on the disk |
| 37 | + and then returns path of the file. |
| 38 | + If given url was already downloaded before, it won't be downloaded |
| 39 | + again (useful when you download user profile photos, most of them |
| 40 | + are blanks). |
| 41 | + """ |
| 42 | + md5 = hashlib.md5() |
| 43 | + file_name, extension = os.path.splitext(url) |
| 44 | + md5.update(url.encode()) |
| 45 | + file_name = md5.hexdigest() + extension |
| 46 | + file_dir = os.path.join(tempfile.gettempdir(), 'USOSAPI') |
| 47 | + |
| 48 | + if not os.path.exists(file_dir): |
| 49 | + os.mkdir(file_dir) |
| 50 | + else: |
| 51 | + if not os.path.isdir(file_dir): |
| 52 | + shutil.rmtree(file_dir) |
| 53 | + os.mkdir(file_dir) |
| 54 | + |
| 55 | + file_name = os.path.join(file_dir, file_name) |
| 56 | + |
| 57 | + if os.path.exists(file_name): |
| 58 | + if os.path.isfile(file_name): |
| 59 | + return file_name |
| 60 | + else: |
| 61 | + shutil.rmtree(file_name) |
| 62 | + |
| 63 | + with urllib.request.urlopen(url) as resp, open(file_name, 'wb') as out: |
| 64 | + shutil.copyfileobj(resp, out) |
| 65 | + |
| 66 | + _DOWNLOAD_LOGGER.info('File from {} saved as {}.'.format(url, file_name)) |
| 67 | + |
| 68 | + return file_name |
| 69 | + |
| 70 | + |
| 71 | +class USOSAPIConnection(): |
| 72 | + """ |
| 73 | + This class provides basic functionality required to work with |
| 74 | + USOS API server. To start communication you need to provide server |
| 75 | + address and your consumer key/secret pair to the constructor. |
| 76 | +
|
| 77 | + After you create an USOSAPIConnection object with working parameters |
| 78 | + (check them with test_connection function), you may already use a subset |
| 79 | + of USOS API services that don't require a valid access key. |
| 80 | +
|
| 81 | + To log in as a specific user you need to get an URL address with |
| 82 | + get_authorization_url and somehow display it to the user (this module |
| 83 | + doesn't provide any UI). On the web page, after accepting |
| 84 | + scopes required by the module, user will receive a PIN code. |
| 85 | + This code should be passed to authorize_with_pin function to |
| 86 | + complete the authorization process. After successfully calling the |
| 87 | + authorize_with_pin function, you will have an authorized_session. |
| 88 | + """ |
| 89 | + def __init__(self, api_base_address: str, consumer_key: str, |
| 90 | + consumer_secret: str): |
| 91 | + self.base_address = str(api_base_address) |
| 92 | + if not self.base_address: |
| 93 | + raise ValueError('Empty USOS API address.') |
| 94 | + if not self.base_address.startswith('https'): |
| 95 | + warnings.warn('Insecure protocol in USOS API address. ' |
| 96 | + 'The address should start with https.') |
| 97 | + if not self.base_address.endswith('/'): |
| 98 | + self.base_address += '/' |
| 99 | + |
| 100 | + self.consumer_key = str(consumer_key) |
| 101 | + self.consumer_secret = str(consumer_secret) |
| 102 | + |
| 103 | + req_token_url = self.base_address + _REQUEST_TOKEN_SUFFIX |
| 104 | + authorize_url = self.base_address + _AUTHORIZE_SUFFIX |
| 105 | + access_token_url = self.base_address + _ACCESS_TOKEN_SUFFIX |
| 106 | + |
| 107 | + self._service = rauth.OAuth1Service(consumer_key=consumer_key, |
| 108 | + consumer_secret=consumer_secret, |
| 109 | + name='USOSAPI', |
| 110 | + request_token_url=req_token_url, |
| 111 | + authorize_url=authorize_url, |
| 112 | + access_token_url=access_token_url, |
| 113 | + base_url=self.base_address) |
| 114 | + |
| 115 | + self._request_token_secret = '' |
| 116 | + self._request_token = '' |
| 117 | + |
| 118 | + self._authorized_session = None |
| 119 | + _LOGGER.info('New connection to {} created with key: {} ' |
| 120 | + 'and secret: {}.'.format(api_base_address, |
| 121 | + consumer_key, consumer_secret)) |
| 122 | + |
| 123 | + def _generate_request_token(self): |
| 124 | + params = {'oauth_callback': 'oob', 'scopes': SCOPES} |
| 125 | + token_tuple = self._service.get_request_token(params=params) |
| 126 | + self._request_token, self._request_token_secret = token_tuple |
| 127 | + _LOGGER.info("New request token generated: {}".format(token_tuple[0])) |
| 128 | + return |
| 129 | + |
| 130 | + def is_anonymous(self) -> bool: |
| 131 | + """ |
| 132 | + Checks if current USOS API session is anonymous. |
| 133 | + This function assumes that USOS API server connection data |
| 134 | + (server address, consumer key and consumer secret) are correct. |
| 135 | + """ |
| 136 | + return self._authorized_session is None |
| 137 | + |
| 138 | + def is_authorized(self) -> bool: |
| 139 | + """ |
| 140 | + Checks if current USOS API session is authorized (if you are logged in |
| 141 | + as specific user). This function assumes that USOS API server |
| 142 | + connection data (server address, consumer key and consumer secret) |
| 143 | + are correct. |
| 144 | + """ |
| 145 | + if self._authorized_session is None: |
| 146 | + return False |
| 147 | + try: |
| 148 | + identity = self.get('services/users/user') |
| 149 | + return bool(identity['id']) |
| 150 | + except USOSAPIException: |
| 151 | + return False |
| 152 | + |
| 153 | + def test_connection(self) -> bool: |
| 154 | + """ |
| 155 | + Checks if parameters passed for this object's constructor are correct |
| 156 | + and if it's possible to connect to the USOS API server. |
| 157 | + """ |
| 158 | + time_re = '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}$' |
| 159 | + try: |
| 160 | + anonymous_session = self._service.get_session() |
| 161 | + now = anonymous_session.get('services/apisrv/now') |
| 162 | + now = now.json() |
| 163 | + return bool(re.fullmatch(time_re, now)) |
| 164 | + except Exception as e: |
| 165 | + _LOGGER.debug('Connection test failed: {}'.format(e)) |
| 166 | + return False |
| 167 | + |
| 168 | + def get_authorization_url(self) -> str: |
| 169 | + """ |
| 170 | + Returns an URL address that user has to visit using some |
| 171 | + internet browser to obtain a PIN code required for authorization. |
| 172 | + Every time you call this function, a new request token is generated, |
| 173 | + so only PIN code acquired with the last generated address will allow |
| 174 | + successful authorization. |
| 175 | + """ |
| 176 | + self._generate_request_token() |
| 177 | + url = self._service.get_authorize_url(self._request_token) |
| 178 | + _LOGGER.info('New authorization URL generated: {}'.format(url)) |
| 179 | + return url |
| 180 | + |
| 181 | + def authorize_with_pin(self, pin: str): |
| 182 | + """ |
| 183 | + Call this function after user has obtained PIN code from website |
| 184 | + which address was generated by the set_authorization_url function. |
| 185 | + Remember that only PIN code from the last generated address will work. |
| 186 | +
|
| 187 | + Will raise USOSAPIException if the PIN is incorrect. |
| 188 | + """ |
| 189 | + if not(self._request_token and self._request_token_secret): |
| 190 | + raise USOSAPIException('Request token not initialized. ' |
| 191 | + 'Use get_authorization_url to generate ' |
| 192 | + 'the token.') |
| 193 | + |
| 194 | + rt = self._request_token |
| 195 | + rts = self._request_token_secret |
| 196 | + params = {'oauth_verifier': pin} |
| 197 | + |
| 198 | + _LOGGER.debug('Trying to authorize request token {} ' |
| 199 | + 'with PIN code: {}'.format(self._request_token, pin)) |
| 200 | + |
| 201 | + try: |
| 202 | + self._authorized_session = \ |
| 203 | + self._service.get_auth_session(rt, rts, params=params) |
| 204 | + except KeyError: |
| 205 | + response = self._service.get_raw_access_token(rt, rts, |
| 206 | + params=params) |
| 207 | + text = response.json() |
| 208 | + if isinstance(text, dict) and 'message' in text: |
| 209 | + text = text['message'] |
| 210 | + _LOGGER.info('Authorization failed, response message: ' + text) |
| 211 | + raise USOSAPIException(text) |
| 212 | + at = self.get_access_data()[0] |
| 213 | + _LOGGER.info('Authorization successful, received access token: ' + at) |
| 214 | + |
| 215 | + def get_access_data(self) -> tuple: |
| 216 | + """ |
| 217 | + Returns a tuple of access token and access token secret. |
| 218 | + You can save them somewhere and later use them to resume |
| 219 | + an authorized session. |
| 220 | + """ |
| 221 | + if self.is_anonymous(): |
| 222 | + raise USOSAPIException('Connection not yet authorized.') |
| 223 | + at = self._authorized_session.access_token |
| 224 | + ats = self._authorized_session.access_token_secret |
| 225 | + return at, ats |
| 226 | + |
| 227 | + def set_access_data(self, access_token: str, |
| 228 | + access_token_secret: str) -> bool: |
| 229 | + """ |
| 230 | + Using this function you can resume an authorized session. |
| 231 | + Although this module requires offline_access scope from users |
| 232 | + it is still possible, that the session won't be valid when it's |
| 233 | + resumed. Check return value to make sure if provided access |
| 234 | + pair was valid. |
| 235 | + """ |
| 236 | + self._authorized_session = self._service.get_session() |
| 237 | + self._authorized_session.access_token = access_token |
| 238 | + self._authorized_session.access_token_secret = access_token_secret |
| 239 | + |
| 240 | + if not self.is_authorized(): |
| 241 | + self._authorized_session = None |
| 242 | + _LOGGER.info("Access token {} is invalid.".format(access_token)) |
| 243 | + return False |
| 244 | + |
| 245 | + _LOGGER.info('New access token ({}) and secret ({}) ' |
| 246 | + 'set.'.format(access_token, access_token_secret)) |
| 247 | + return True |
| 248 | + |
| 249 | + def get(self, service: str, **kwargs): |
| 250 | + """ |
| 251 | + General use function to retrieve data from USOS API server. |
| 252 | + Although it is called 'get' it will issue a POST request. |
| 253 | + It's arguments are service name and an optional set of keyword |
| 254 | + arguments, that will be passed as parameters of the request. |
| 255 | +
|
| 256 | + Return type depends on the called service. It will usually be |
| 257 | + a dictionary or a string. |
| 258 | + """ |
| 259 | + session = self._service.get_session() |
| 260 | + if self._authorized_session is not None: |
| 261 | + session = self._authorized_session |
| 262 | + |
| 263 | + start = time.time() |
| 264 | + response = session.post(service, params=kwargs, data={}) |
| 265 | + ex_time = time.time() - start |
| 266 | + |
| 267 | + if not response.ok: |
| 268 | + try: |
| 269 | + _LOGGER.info('{} ({}) FAILED: [{}] {}' |
| 270 | + ''.format(service, repr(kwargs), |
| 271 | + response.status_code, response.text)) |
| 272 | + response.raise_for_status() |
| 273 | + except requests.exceptions.HTTPError as e: |
| 274 | + if response.status_code == 401: |
| 275 | + raise USOSAPIException('HTTP 401: Unauthorized. Your ' |
| 276 | + 'access key probably expired.') |
| 277 | + if response.status_code == 400: |
| 278 | + msg = response.text |
| 279 | + raise USOSAPIException('HTTP 400: Bad request: ' + msg) |
| 280 | + raise e |
| 281 | + |
| 282 | + _LOGGER.info("{} ({}) {:f}s".format(service, repr(kwargs), |
| 283 | + ex_time)) |
| 284 | + _LOGGER.debug("{} ({}) -> {}".format(response.url, repr(kwargs), |
| 285 | + response.text)) |
| 286 | + |
| 287 | + return response.json() |
| 288 | + |
| 289 | + def logout(self): |
| 290 | + """ |
| 291 | + This function results in revoking currently used access key |
| 292 | + and closing the authenticated session. |
| 293 | + You can safely call this method multiple times. |
| 294 | + """ |
| 295 | + if self._authorized_session is None: |
| 296 | + return |
| 297 | + |
| 298 | + at = self.get_access_data()[0] |
| 299 | + self.get('services/oauth/revoke_token') |
| 300 | + _LOGGER.debug('Access token {} revoked.'.format(at)) |
| 301 | + self._authorized_session = None |
| 302 | + |
| 303 | + def current_identity(self): |
| 304 | + """ |
| 305 | + Returns a dictionary containing following keys: first_name, |
| 306 | + last_name and id. |
| 307 | +
|
| 308 | + If current session is anonymous it will raise USOSAPIException. |
| 309 | + """ |
| 310 | + try: |
| 311 | + data = self.get('services/users/user') |
| 312 | + return data |
| 313 | + except USOSAPIException: |
| 314 | + raise USOSAPIException('Trying to get identity of an unauthorized' |
| 315 | + ' session.') |
0 commit comments