Skip to content

Commit 96b6b87

Browse files
authored
Fixes (#291)
* Add doc on cp that is not implemented * Fix response not none * Add per request timeout capability * Concat shouldn't retry * Add docstring * Update history and version * Address code review
1 parent 5c91e98 commit 96b6b87

File tree

5 files changed

+25
-11
lines changed

5 files changed

+25
-11
lines changed

HISTORY.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
Release History
44
===============
55

6+
0.0.46 (2019-06-25)
7+
+++++++++++++++++++
8+
* Expose per request timeout. Default to 60.
9+
* Concat will not retry by default.
10+
* Bug fixes.
11+
612
0.0.45 (2019-05-10)
713
+++++++++++++++++++
814
* Update open and close ADLFile semantics

azure/datalake/store/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# license information.
77
# --------------------------------------------------------------------------
88

9-
__version__ = "0.0.45"
9+
__version__ = "0.0.46"
1010

1111
from .core import AzureDLFileSystem
1212
from .multithread import ADLDownloader

azure/datalake/store/core.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from .lib import DatalakeRESTInterface
2929
from .utils import ensure_writable, read_block
3030
from .enums import ExpiryOptionType
31-
from .retry import ExponentialRetryPolicy
31+
from .retry import ExponentialRetryPolicy, NoRetryPolicy
3232
from .multiprocessor import multi_processor_change_acl
3333

3434
if sys.version_info >= (3, 4):
@@ -39,6 +39,7 @@
3939
logger = logging.getLogger(__name__)
4040
valid_expire_types = [x.value for x in ExpiryOptionType]
4141

42+
4243
class AzureDLFileSystem(object):
4344
"""
4445
Access Azure DataLake Store as if it were a file-system
@@ -57,16 +58,18 @@ class AzureDLFileSystem(object):
5758
The API version to target with requests. Changing this value will
5859
change the behavior of the requests, and can cause unexpected behavior or
5960
breaking changes. Changes to this value should be undergone with caution.
61+
per_call_timeout_seconds : float(60)
62+
This is the timeout for each requests library call.
6063
kwargs: optional key/values
6164
See ``lib.auth()``; full list: tenant_id, username, password, client_id,
6265
client_secret, resource
6366
"""
6467
_singleton = [None]
6568

66-
def __init__(self, token=None, **kwargs):
67-
# store instance vars
69+
def __init__(self, token=None, per_call_timeout_seconds=60, **kwargs):
6870
self.token = token
6971
self.kwargs = kwargs
72+
self.per_call_timeout_seconds = per_call_timeout_seconds
7073
self.connect()
7174
self.dirs = {}
7275
self._emptyDirs = []
@@ -85,7 +88,7 @@ def connect(self):
8588
"""
8689
Establish connection object.
8790
"""
88-
self.azure = DatalakeRESTInterface(token=self.token, **self.kwargs)
91+
self.azure = DatalakeRESTInterface(token=self.token, req_timeout_s= self.per_call_timeout_seconds, **self.kwargs)
8992
self.token = self.azure.token
9093

9194
def __setstate__(self, state):
@@ -775,16 +778,18 @@ def concat(self, outfile, filelist, delete_source=False):
775778
sourceList = [AzureDLPath(f).as_posix() for f in filelist]
776779
sources = {}
777780
sources["sources"] = sourceList
781+
778782
self.azure.call('MSCONCAT', outfile.as_posix(),
779783
data=bytearray(json.dumps(sources,separators=(',', ':')), encoding="utf-8"),
780784
deleteSourceDirectory=delete,
781-
headers={'Content-Type': "application/json"})
785+
headers={'Content-Type': "application/json"},
786+
retry_policy=NoRetryPolicy())
782787
self.invalidate_cache(outfile)
783788

784789
merge = concat
785790

786791
def cp(self, path1, path2):
787-
""" Copy file between locations on ADL """
792+
""" Not implemented. Copy file between locations on ADL """
788793
# TODO: any implementation for this without download?
789794
raise NotImplementedError
790795

azure/datalake/store/lib.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ class DatalakeRESTInterface:
225225
The API version to target with requests. Changing this value will
226226
change the behavior of the requests, and can cause unexpected behavior or
227227
breaking changes. Changes to this value should be undergone with caution.
228+
req_timeout_s : float(60)
229+
This is the timeout for each requests library call.
228230
kwargs: optional arguments to auth
229231
See ``auth()``. Includes, e.g., username, password, tenant; will pull
230232
values from environment variables if not provided.
@@ -256,7 +258,7 @@ class DatalakeRESTInterface:
256258
}
257259

258260
def __init__(self, store_name=default_store, token=None,
259-
url_suffix=default_adls_suffix, api_version='2018-09-01', **kwargs):
261+
url_suffix=default_adls_suffix, api_version='2018-09-01', req_timeout_s=60, **kwargs):
260262
# in the case where an empty string is passed for the url suffix, it must be replaced with the default.
261263
url_suffix = url_suffix or default_adls_suffix
262264
self.local = threading.local()
@@ -278,6 +280,7 @@ def __init__(self, store_name=default_store, token=None,
278280
platform.platform(),
279281
__name__,
280282
__version__)
283+
self.req_timeout_s = req_timeout_s
281284

282285
@property
283286
def session(self):
@@ -474,7 +477,7 @@ def __call_once(self, method, url, params, data, stream, request_id, retry_count
474477
req_headers['User-Agent'] = self.user_agent
475478
req_headers.update(headers)
476479
self._log_request(method, url, op, urllib.quote(path), kwargs, req_headers, retry_count)
477-
return func(url, params=params, headers=req_headers, data=data, stream=stream)
480+
return func(url, params=params, headers=req_headers, data=data, stream=stream, timeout=self.req_timeout_s)
478481

479482
def __getstate__(self):
480483
state = self.__dict__.copy()

azure/datalake/store/retry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ def f_retry(*args, **kwargs):
100100
response = response_from_adal_exception(last_exception)
101101
if hasattr(last_exception, 'response'): # HTTP exception i.e 429
102102
response = last_exception.response
103-
104-
request_successful = last_exception is None or response.status_code == 401 # 401 = Invalid credentials
103+
104+
request_successful = last_exception is None or (response is not None and response.status_code == 401) # 401 = Invalid credentials
105105
if request_successful or not retry_policy.should_retry(response, last_exception, retry_count):
106106
break
107107
if last_exception is not None:

0 commit comments

Comments
 (0)