Skip to content

Commit

Permalink
[Storage][DataLake]Match _shared files in other packages (#8991)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiafu-msft authored and zezha-msft committed Dec 4, 2019
1 parent 6de92ee commit d59d06d
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def format_shared_key_credential(account, credential):
def parse_connection_str(conn_str, credential, service):
conn_str = conn_str.rstrip(";")
conn_settings = [s.split("=", 1) for s in conn_str.split(";")]
if any(len(tup) != 2 for tup in conn_settings):
if any(len(tup) != 2 for tup in conn_settings):
raise ValueError("Connection string is either blank or malformed.")
conn_settings = dict(conn_settings)
endpoints = _SERVICE_PARAMS[service]
Expand Down
6 changes: 6 additions & 0 deletions sdk/storage/azure-storage-file-datalake/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change Log azure-storage-file-datalake

## Version 12.0.0b6 (2019-12-04)
- `StorageErrorException` is now parsed into more detailed errors.
- `etag` and `match_condition` parameters are added as options ('if_match' and 'if_none_match' are still supported).
- ADLS Gen1 to Gen2 API Mapping is available.
- All the clients now have a `close()` method to close the sockets opened by the client.

## Version 12.0.0b5 (2019-11-06)
- Initial Release. Please see the README for information on the new design.
- Support for Azure Data Lake Storage REST APIs.
Expand Down
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-datalake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ Several DataLake Storage Python SDK samples are available to you in the SDK's Gi

### Additional documentation

A table of [ADLS Gen1 to ADLS Gen2 API Mapping](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/storage/azure-storage-file-datalake/GEN1_GEN2_MAPPING.md) is available.
For more extensive REST documentation on Data Lake Storage Gen2, see the [Data Lake Storage Gen2 documentation](https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem) on docs.microsoft.com.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
HttpLoggingPolicy,
)

from .constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, DEFAULT_SOCKET_TIMEOUT
from .constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, CONNECTION_TIMEOUT, READ_TIMEOUT
from .models import LocationMode
from .authentication import SharedKeyCredentialPolicy
from .shared_access_signature import QueryStringConstants
Expand Down Expand Up @@ -79,25 +79,28 @@ def __init__(
self._hosts = kwargs.get("_hosts")
self.scheme = parsed_url.scheme

if service not in ["blob", "queue", "file", "dfs"]:
if service not in ["blob", "queue", "file-share", "dfs"]:
raise ValueError("Invalid service: {}".format(service))
account = parsed_url.netloc.split(".{}.core.".format(service))
self.account_name = account[0]
service_name = service.split('-')[0]
account = parsed_url.netloc.split(".{}.core.".format(service_name))
self.account_name = account[0] if len(account) > 1 else None
secondary_hostname = None

self.credential = format_shared_key_credential(account, credential)
if self.scheme.lower() != "https" and hasattr(self.credential, "get_token"):
raise ValueError("Token credential is only supported with HTTPS.")
if hasattr(self.credential, "account_name"):
self.account_name = self.credential.account_name
secondary_hostname = "{}-secondary.{}.{}".format(self.credential.account_name, service, SERVICE_HOST_BASE)
secondary_hostname = "{}-secondary.{}.{}".format(
self.credential.account_name, service_name, SERVICE_HOST_BASE)

if not self._hosts:
if len(account) > 1:
secondary_hostname = parsed_url.netloc.replace(account[0], account[0] + "-secondary")
if kwargs.get("secondary_hostname"):
secondary_hostname = kwargs["secondary_hostname"]
self._hosts = {LocationMode.PRIMARY: parsed_url.netloc, LocationMode.SECONDARY: secondary_hostname}
primary_hostname = (parsed_url.netloc + parsed_url.path).rstrip('/')
self._hosts = {LocationMode.PRIMARY: primary_hostname, LocationMode.SECONDARY: secondary_hostname}

self.require_encryption = kwargs.get("require_encryption", False)
self.key_encryption_key = kwargs.get("key_encryption_key")
Expand All @@ -111,30 +114,68 @@ def __enter__(self):
def __exit__(self, *exc_details):
    """Exit the runtime context, delegating cleanup to the wrapped client."""
    self._client.__exit__(*exc_details)

def close(self):
    """Close the client's underlying sockets by closing the wrapped client."""
    self._client.close()

@property
def url(self):
    """The full endpoint URL to this entity, including any SAS token in use.

    Depending on the current :func:`location_mode`, this resolves to either
    the primary or the secondary endpoint.
    """
    current_host = self._hosts[self._location_mode]
    return self._format_url(current_host)

@property
def primary_endpoint(self):
    """The complete URL of the primary endpoint.

    :type: str
    """
    primary_host = self._hosts[LocationMode.PRIMARY]
    return self._format_url(primary_host)

@property
def primary_hostname(self):
    """Hostname of the primary endpoint.

    :type: str
    """
    return self._hosts[LocationMode.PRIMARY]

@property
def secondary_endpoint(self):
    """The complete URL of the secondary endpoint, if one is configured.

    Raises a ValueError when no secondary host is available. A secondary
    hostname can be supplied explicitly via the optional
    `secondary_hostname` keyword argument on instantiation.

    :type: str
    :raise ValueError:
    """
    secondary_host = self._hosts[LocationMode.SECONDARY]
    if not secondary_host:
        raise ValueError("No secondary host configured.")
    return self._format_url(secondary_host)

@property
def secondary_hostname(self):
    """Hostname of the secondary endpoint, or None when not configured.

    A secondary hostname can be supplied explicitly via the optional
    `secondary_hostname` keyword argument on instantiation.

    :type: str or None
    """
    return self._hosts[LocationMode.SECONDARY]

@property
def location_mode(self):
    """The location mode the client is currently targeting.

    Defaults to "primary"; may be either "primary" or "secondary".

    :type: str
    """
    return self._location_mode

@location_mode.setter
Expand Down Expand Up @@ -172,8 +213,8 @@ def _create_pipeline(self, credential, **kwargs):
if kwargs.get("_pipeline"):
return config, kwargs["_pipeline"]
config.transport = kwargs.get("transport") # type: ignore
if "connection_timeout" not in kwargs:
kwargs["connection_timeout"] = DEFAULT_SOCKET_TIMEOUT
kwargs.setdefault("connection_timeout", CONNECTION_TIMEOUT)
kwargs.setdefault("read_timeout", READ_TIMEOUT)
if not config.transport:
config.transport = RequestsTransport(**kwargs)
policies = [
Expand Down Expand Up @@ -280,9 +321,10 @@ def format_shared_key_credential(account, credential):

def parse_connection_str(conn_str, credential, service):
conn_str = conn_str.rstrip(";")
conn_settings = dict( # pylint: disable=consider-using-dict-comprehension
[s.split("=", 1) for s in conn_str.split(";")]
)
conn_settings = [s.split("=", 1) for s in conn_str.split(";")]
if any(len(tup) != 2 for tup in conn_settings):
raise ValueError("Connection string is either blank or malformed.")
conn_settings = dict(conn_settings)
endpoints = _SERVICE_PARAMS[service]
primary = None
secondary = None
Expand Down Expand Up @@ -354,7 +396,7 @@ def create_configuration(**kwargs):
def parse_query(query_str):
sas_values = QueryStringConstants.to_list()
parsed_query = {k: v[0] for k, v in parse_qs(query_str).items()}
sas_params = ["{}={}".format(k, quote(v)) for k, v in parsed_query.items() if k in sas_values]
sas_params = ["{}={}".format(k, quote(v, safe='')) for k, v in parsed_query.items() if k in sas_values]
sas_token = None
if sas_params:
sas_token = "&".join(sas_params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from azure.core.pipeline.transport import AsyncHttpTransport

from .constants import STORAGE_OAUTH_SCOPE, DEFAULT_SOCKET_TIMEOUT
from .constants import STORAGE_OAUTH_SCOPE, CONNECTION_TIMEOUT, READ_TIMEOUT
from .authentication import SharedKeyCredentialPolicy
from .base_client import create_configuration
from .policies import (
Expand Down Expand Up @@ -58,6 +58,9 @@ async def __aenter__(self):
async def __aexit__(self, *exc_details):
    """Exit the async runtime context, delegating cleanup to the wrapped client."""
    await self._client.__aexit__(*exc_details)

async def close(self):
    """Asynchronously close the sockets opened by the wrapped client."""
    await self._client.close()

def _create_pipeline(self, credential, **kwargs):
# type: (Any, **Any) -> Tuple[Configuration, Pipeline]
self._credential_policy = None
Expand All @@ -71,8 +74,8 @@ def _create_pipeline(self, credential, **kwargs):
if kwargs.get('_pipeline'):
return config, kwargs['_pipeline']
config.transport = kwargs.get('transport') # type: ignore
if 'connection_timeout' not in kwargs:
kwargs['connection_timeout'] = DEFAULT_SOCKET_TIMEOUT[0] # type: ignore
kwargs.setdefault("connection_timeout", CONNECTION_TIMEOUT)
kwargs.setdefault("read_timeout", READ_TIMEOUT)
if not config.transport:
try:
from azure.core.pipeline.transport import AioHttpTransport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
X_MS_VERSION = VERSION

# Socket timeout in seconds
DEFAULT_SOCKET_TIMEOUT = 20
CONNECTION_TIMEOUT = 20
READ_TIMEOUT = 20

# for python 3.5+, there was a change to the definition of the socket timeout (as far as socket.sendall is concerned)
# The socket timeout is now the maximum total duration to send all data.
if sys.version_info >= (3, 5):
# the timeout to connect is 20 seconds, and the read timeout is 2000 seconds
# the 2000 seconds was calculated with: 100MB (max block size)/ 50KB/s (an arbitrarily chosen minimum upload speed)
DEFAULT_SOCKET_TIMEOUT = (20, 2000) # type: ignore
READ_TIMEOUT = 2000

STORAGE_OAUTH_SCOPE = "https://storage.azure.com/.default"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def get_length(data):
except (AttributeError, UnsupportedOperation):
pass
else:
return fstat(fileno).st_size
try:
return fstat(fileno).st_size
except OSError:
# Not a valid fileno, may be possible requests returned
# a socket number?
pass

# If the stream is seekable and tell() is implemented, calculate the stream size.
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
class PartialBatchErrorException(HttpResponseError):
"""There is a partial failure in batch operations.
:param message: The message of the exception.
:param str message: The message of the exception.
:param response: Server response to be deserialized.
:param parts: A list of the parts in multipart response.
:param list parts: A list of the parts in multipart response.
"""

def __init__(self, message, response, parts):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# license information.
# --------------------------------------------------------------------------

VERSION = "12.0.0b5"
VERSION = "12.0.0b6"
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-file-datalake/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
'tests',
]),
install_requires=[
"azure-core<2.0.0,>=1.0.0",
"azure-core<2.0.0,>=1.1.0",
"msrest>=0.6.10",
"azure-storage-blob~=12.0"
],
Expand Down
1 change: 1 addition & 0 deletions shared_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,4 @@ opencensus-ext-azure>=0.3.1
#override azure-storage-blob azure-core<2.0.0,>=1.1.0
#override azure-storage-queue azure-core<2.0.0,>=1.1.0
#override azure-storage-file-share azure-core<2.0.0,>=1.1.0
#override azure-storage-file-datalake azure-core<2.0.0,>=1.1.0

0 comments on commit d59d06d

Please sign in to comment.