Skip to content

Commit f346c38

Browse files
authored
Merge to master for release 0.0.35 (#253)
* Retry for authentication(getting tokens) (#251) * Fix test environment variables * Upgrade requests minimum version because of CVE 2018-18074 * Test recordings update (#250) * Updated history.rst and version number
1 parent 4b203ee commit f346c38

File tree

108 files changed

+105268
-1067
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+105268
-1067
lines changed

HISTORY.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
Release History
44
===============
55

6+
0.0.35 (2018-10-29)
7+
+++++++++++++++++++
8+
* Added retry for getting tokens
9+
* Added requests>=2.20 because of CVE 2018-18074
10+
* Fixed test parameters and updated test recordings
11+
612
0.0.34 (2018-10-15)
713
+++++++++++++++++++
814
* Fixed concat issue with plus(or other symbols) in filename

azure/datalake/store/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
# license information.
77
# --------------------------------------------------------------------------
88

9-
__version__ = "0.0.34"
9+
__version__ = "0.0.35"
10+
1011

1112
from .core import AzureDLFileSystem
1213
from .multithread import ADLDownloader

azure/datalake/store/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ def concat(self, outfile, filelist, delete_source=False):
548548
self.azure.call('MSCONCAT', outfile.as_posix(),
549549
data=bytearray(json.dumps(sources,separators=(',', ':')), encoding="utf-8"),
550550
deleteSourceDirectory=delete,
551-
headers={'Content-Type': "application/json"},)
551+
headers={'Content-Type': "application/json"})
552552
self.invalidate_cache(outfile)
553553

554554
merge = concat

azure/datalake/store/lib.py

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
else:
2727
import urllib
2828

29-
from .retry import ExponentialRetryPolicy
29+
from .retry import ExponentialRetryPolicy, retry_decorator_for_auth
3030

3131
# 3rd party imports
3232
import adal
@@ -74,7 +74,7 @@
7474
def auth(tenant_id=None, username=None,
7575
password=None, client_id=default_client,
7676
client_secret=None, resource=DEFAULT_RESOURCE_ENDPOINT,
77-
require_2fa=False, authority=None, **kwargs):
77+
require_2fa=False, authority=None, retry_policy=None, **kwargs):
7878
""" User/password authentication
7979
8080
Parameters
@@ -103,6 +103,7 @@ def auth(tenant_id=None, username=None,
103103
-------
104104
:type DataLakeCredential :mod: `A DataLakeCredential object`
105105
"""
106+
106107
if not authority:
107108
authority = 'https://login.microsoftonline.com/'
108109

@@ -124,24 +125,30 @@ def auth(tenant_id=None, username=None,
124125
if not client_secret:
125126
client_secret = os.environ.get('azure_client_secret', None)
126127

127-
# You can explicitly authenticate with 2fa, or pass in nothing to the auth call and
128+
# You can explicitly authenticate with 2fa, or pass in nothing to the auth call
128129
# and the user will be prompted to login interactively through a browser.
129-
if require_2fa or (username is None and password is None and client_secret is None):
130-
code = context.acquire_user_code(resource, client_id)
131-
print(code['message'])
132-
out = context.acquire_token_with_device_code(resource, code, client_id)
133-
134-
elif username and password:
135-
out = context.acquire_token_with_username_password(resource, username,
136-
password, client_id)
137-
elif client_id and client_secret:
138-
out = context.acquire_token_with_client_credentials(resource, client_id,
139-
client_secret)
140-
# for service principal, we store the secret in the credential object for use when refreshing.
141-
out.update({'secret': client_secret})
142-
else:
143-
raise ValueError("No authentication method found for credentials")
144130

131+
@retry_decorator_for_auth(retry_policy=retry_policy)
132+
def get_token_internal():
133+
# Internal function used so as to use retry decorator
134+
if require_2fa or (username is None and password is None and client_secret is None):
135+
code = context.acquire_user_code(resource, client_id)
136+
print(code['message'])
137+
out = context.acquire_token_with_device_code(resource, code, client_id)
138+
139+
elif username and password:
140+
out = context.acquire_token_with_username_password(resource, username,
141+
password, client_id)
142+
elif client_id and client_secret:
143+
out = context.acquire_token_with_client_credentials(resource, client_id,
144+
client_secret)
145+
# for service principal, we store the secret in the credential object for use when refreshing.
146+
out.update({'secret': client_secret})
147+
else:
148+
raise ValueError("No authentication method found for credentials")
149+
return out
150+
151+
out = get_token_internal()
145152
out.update({'access': out['accessToken'], 'resource': resource,
146153
'refresh': out.get('refreshToken', False),
147154
'time': time.time(), 'tenant': tenant_id, 'client': client_id})
@@ -152,22 +159,22 @@ class DataLakeCredential:
152159
def __init__(self, token):
153160
self.token = token
154161

155-
def signed_session(self):
162+
def signed_session(self, retry_policy=None):
156163
# type: () -> requests.Session
157164
"""Create requests session with any required auth headers applied.
158165
159166
:rtype: requests.Session
160167
"""
161168
session = requests.Session()
162169
if time.time() - self.token['time'] > self.token['expiresIn'] - 100:
163-
self.refresh_token()
170+
self.refresh_token(retry_poliy=retry_policy)
164171

165172
scheme, token = self.token['tokenType'], self.token['access']
166173
header = "{} {}".format(scheme, token)
167174
session.headers['Authorization'] = header
168175
return session
169176

170-
def refresh_token(self, authority=None):
177+
def refresh_token(self, authority=None, retry_policy=None):
171178
""" Refresh an expired authorization token
172179
173180
Parameters
@@ -183,15 +190,22 @@ def refresh_token(self, authority=None):
183190

184191
context = adal.AuthenticationContext(authority +
185192
self.token['tenant'])
186-
if self.token.get('secret') and self.token.get('client'):
187-
out = context.acquire_token_with_client_credentials(self.token['resource'], self.token['client'],
188-
self.token['secret'])
189-
out.update({'secret': self.token['secret']})
190-
else:
191-
out = context.acquire_token_with_refresh_token(self.token['refresh'],
192-
client_id=self.token['client'],
193-
resource=self.token['resource'])
194-
out.update({'refresh': out['refreshToken']})
193+
194+
@retry_decorator_for_auth(retry_policy=retry_policy)
195+
def get_token_internal():
196+
# Internal function used so as to use retry decorator
197+
if self.token.get('secret') and self.token.get('client'):
198+
out = context.acquire_token_with_client_credentials(self.token['resource'],
199+
self.token['client'],
200+
self.token['secret'])
201+
out.update({'secret': self.token['secret']})
202+
else:
203+
out = context.acquire_token_with_refresh_token(self.token['refresh'],
204+
client_id=self.token['client'],
205+
resource=self.token['resource'])
206+
return out
207+
208+
out = get_token_internal()
195209
# common items to update
196210
out.update({'access': out['accessToken'],
197211
'time': time.time(), 'tenant': self.token['tenant'],
@@ -257,7 +271,7 @@ def __init__(self, store_name=default_store, token=None,
257271
# There is a case where the user can opt to exclude an API version, in which case
258272
# the service itself decides on the API version to use (it's default).
259273
self.api_version = api_version or None
260-
self.head = {'Authorization': token.signed_session().headers['Authorization']}
274+
self.head = {'Authorization': token.signed_session(retry_policy=None).headers['Authorization']}
261275
self.url = 'https://%s.%s/' % (store_name, url_suffix)
262276
self.webhdfs = 'webhdfs/v1/'
263277
self.extended_operations = 'webhdfsext/'
@@ -282,8 +296,8 @@ def session(self):
282296
self.local.session = s
283297
return s
284298

285-
def _check_token(self):
286-
cur_session = self.token.signed_session()
299+
def _check_token(self, retry_policy=None):
300+
cur_session = self.token.signed_session(retry_policy=retry_policy)
287301
if not self.head or self.head.get('Authorization') != cur_session.headers['Authorization']:
288302
self.head = {'Authorization': cur_session.headers['Authorization']}
289303
self.local.session = None

azure/datalake/store/retry.py

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import logging
1515
import sys
1616
import time
17-
17+
from functools import wraps
1818
# local imports
1919

2020
logger = logging.getLogger(__name__)
@@ -45,6 +45,9 @@ def should_retry(self, response, last_exception, retry_count):
4545
self.__backoff()
4646
return True
4747

48+
if response is None:
49+
return False
50+
4851
status_code = response.status_code
4952

5053
if(status_code == 501
@@ -58,8 +61,8 @@ def should_retry(self, response, last_exception, retry_count):
5861
if(status_code >= 500
5962
or status_code == 401
6063
or status_code == 408
61-
or status_code == 429):
62-
64+
or status_code == 429
65+
or status_code == 104):
6366
self.__backoff()
6467
return True
6568

@@ -70,4 +73,60 @@ def should_retry(self, response, last_exception, retry_count):
7073

7174
def __backoff(self):
7275
time.sleep(self.exponential_retry_interval)
73-
self.exponential_retry_interval *= self.exponential_factor
76+
self.exponential_retry_interval *= self.exponential_factor
77+
78+
79+
def retry_decorator_for_auth(retry_policy = None):
80+
import adal
81+
from requests import HTTPError
82+
if retry_policy is None:
83+
retry_policy = ExponentialRetryPolicy(max_retries=2)
84+
85+
def deco_retry(func):
86+
@wraps(func)
87+
def f_retry(*args, **kwargs):
88+
retry_count = -1
89+
last_exception = None
90+
out = None
91+
while True:
92+
retry_count += 1
93+
try:
94+
out = func(*args, **kwargs)
95+
except (adal.adal_error.AdalError, HTTPError) as e:
96+
# ADAL error corresponds to everything but 429, which bubbles up HTTP error.
97+
last_exception = e
98+
logger.exception("Retry count " + str(retry_count) + "Exception :" + str(last_exception))
99+
100+
if hasattr(last_exception, 'error_response'): # ADAL exception
101+
response = response_from_adal_exception(last_exception)
102+
if hasattr(last_exception, 'response'): # HTTP exception i.e 429
103+
response = last_exception.response
104+
105+
request_successful = last_exception is None or response.status_code == 401 # 401 = Invalid credentials
106+
if request_successful or not retry_policy.should_retry(response, last_exception, retry_count):
107+
break
108+
if out is None:
109+
raise last_exception
110+
return out
111+
112+
return f_retry
113+
114+
return deco_retry
115+
116+
117+
def response_from_adal_exception(e):
118+
import re
119+
from collections import namedtuple
120+
121+
response = e.error_response
122+
http_code = re.search("http error: (\d+)", str(e))
123+
if http_code is not None: # Add status_code to response object for use in should_retry
124+
keys = list(response.keys()) + ['status_code']
125+
status_code = int(http_code.group(1))
126+
values = list(response.values()) + [status_code]
127+
128+
Response = namedtuple("Response", keys)
129+
response = Response(
130+
*values) # Construct response object with adal exception response and http code
131+
return response
132+

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
install_requires=[
4949
'cffi',
5050
'adal>=0.4.2',
51+
'requests>=2.20.0'
5152
],
5253
extras_require={
5354
":python_version<'3.4'": ['pathlib2'],

tests/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ environment variables should be defined:
3131
* `azure_data_lake_store_name`
3232
* `azure_subscription_id`
3333
* `azure_resource_group_name`
34-
* `azure_client_id`
35-
* `azure_client_secret`
34+
* `azure_service_principal`
35+
* `azure_service_principal_secret`
3636

3737
Optionally, you may need to define `azure_tenant_id` or `azure_data_lake_store_url_suffix`.

0 commit comments

Comments
 (0)