Skip to content

Commit

Permalink
[Storage][ADLS] API updates (Azure#10170)
Browse files Browse the repository at this point in the history
* Updates to read file API

* Rename download function

* Revert some test changes

* Pylint
  • Loading branch information
annatisch authored Mar 10, 2020
1 parent 4e9bb38 commit 74a9df1
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 61 deletions.
3 changes: 2 additions & 1 deletion sdk/storage/azure-storage-file-datalake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ file = DataLakeFileClient.from_connection_string("my_connection_string",
file_system_name="myfilesystem", file_path="myfile")

with open("./BlockDestination.txt", "wb") as my_file:
file_data = file.read_file(stream=my_file)
download = file.download_file()
download.readinto(my_file)
```

### Enumerating paths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download import StorageStreamDownloader
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._file_system_client import FileSystemClient
Expand Down Expand Up @@ -66,4 +67,5 @@
'generate_directory_sas',
'generate_file_sas',
'VERSION',
'StorageStreamDownloader'
]
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._shared.uploads import IterStreamer
from ._upload_helper import upload_datalake_file
from ._generated.models import StorageErrorException
from ._download import StorageStreamDownloader
from ._path_client import PathClient
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers
from ._deserialize import process_storage_error
Expand Down Expand Up @@ -504,22 +505,18 @@ def flush_data(self, offset, # type: int
except StorageErrorException as error:
process_storage_error(error)

def read_file(self, offset=None, # type: Optional[int]
length=None, # type: Optional[int]
stream=None, # type: Optional[IO]
**kwargs):
# type: (...) -> Union[int, byte]
"""Download a file from the service. Return the downloaded data in bytes or
write the downloaded data into user provided stream and return the written size.
def download_file(self, offset=None, length=None, **kwargs):
# type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
"""Downloads a file to the StorageStreamDownloader. The readall() method must
be used to read all the content, or readinto() must be used to download the file into
a stream.
:param int offset:
Start of byte range to use for downloading a section of the file.
Must be set if length is provided.
:param int length:
Number of bytes to read from the stream. This is optional, but
should be supplied for optimal performance.
:param int stream:
User provided stream to write the downloaded data into.
:keyword lease:
If specified, download only succeeds if the file's lease is active
and matches this ID. Required if the file has an active lease.
Expand Down Expand Up @@ -547,8 +544,8 @@ def read_file(self, offset=None, # type: Optional[int]
The timeout parameter is expressed in seconds. This method may make
multiple calls to the Azure service and the timeout will apply to
each call individually.
:returns: downloaded data or the size of data written into the provided stream
:rtype: bytes or int
:returns: A streaming object (StorageStreamDownloader)
:rtype: ~azure.storage.filedatalake.StorageStreamDownloader
.. admonition:: Example:
Expand All @@ -560,9 +557,7 @@ def read_file(self, offset=None, # type: Optional[int]
:caption: Return the downloaded data.
"""
downloader = self._blob_client.download_blob(offset=offset, length=length, **kwargs)
if stream:
return downloader.readinto(stream)
return downloader.readall()
return StorageStreamDownloader(downloader)

def rename_file(self, new_name, # type: str
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._models import FileProperties


class StorageStreamDownloader(object):
    """A streaming object to download from Azure Storage.

    :ivar str name:
        The name of the file being downloaded.
    :ivar ~azure.storage.filedatalake.FileProperties properties:
        The properties of the file being downloaded. If only a range of the data is being
        downloaded, this will be reflected in the properties.
    :ivar int size:
        The size of the total data in the stream. This will be the byte range if specified,
        otherwise the total size of the file.
    """

    def __init__(self, downloader):
        # Wraps the blob-level downloader, re-exposing its data with
        # Data Lake model types (FileProperties instead of blob properties).
        self._downloader = downloader
        self.name = self._downloader.name
        self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
        self.size = self._downloader.size

    def __len__(self):
        return self.size

    def chunks(self):
        """Iterate over the chunks in the download stream as they arrive.

        Delegates to the underlying downloader's chunk iterator.
        """
        return self._downloader.chunks()

    def readall(self):
        """Download the contents of this file.

        This operation is blocking until all data is downloaded.

        :rtype: bytes or str
        """
        return self._downloader.readall()

    def readinto(self, stream):
        """Download the contents of this file to a stream.

        :param stream:
            The stream to download to. This can be an open file-handle,
            or any writable stream. The stream must be seekable if the download
            uses more than one parallel connection.
        :returns: The number of bytes read.
        :rtype: int
        """
        return self._downloader.readinto(stream)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download_async import StorageStreamDownloader
from .._shared.policies_async import ExponentialRetry, LinearRetry
from ._data_lake_file_client_async import DataLakeFileClient
from ._data_lake_directory_client_async import DataLakeDirectoryClient
Expand All @@ -19,4 +20,5 @@
'DataLakeLeaseClient',
'ExponentialRetry',
'LinearRetry',
'StorageStreamDownloader'
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download_async import StorageStreamDownloader
from ._path_client_async import PathClient
from .._data_lake_file_client import DataLakeFileClient as DataLakeFileClientBase
from .._deserialize import process_storage_error
Expand Down Expand Up @@ -372,22 +373,18 @@ async def flush_data(self, offset, # type: int
except StorageErrorException as error:
process_storage_error(error)

async def read_file(self, offset=None, # type: Optional[int]
length=None, # type: Optional[int]
stream=None, # type: Optional[IO]
**kwargs):
# type: (...) -> Union[int, byte]
"""Download a file from the service. Return the downloaded data in bytes or
write the downloaded data into user provided stream and return the written size.
async def download_file(self, offset=None, length=None, **kwargs):
# type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
"""Downloads a file to the StorageStreamDownloader. The readall() method must
be used to read all the content, or readinto() must be used to download the file into
a stream.
:param int offset:
Start of byte range to use for downloading a section of the file.
Must be set if length is provided.
:param int length:
Number of bytes to read from the stream. This is optional, but
should be supplied for optimal performance.
:param int stream:
User provided stream to write the downloaded data into.
:keyword lease:
If specified, download only succeeds if the file's lease is active
and matches this ID. Required if the file has an active lease.
Expand Down Expand Up @@ -415,8 +412,8 @@ async def read_file(self, offset=None, # type: Optional[int]
The timeout parameter is expressed in seconds. This method may make
multiple calls to the Azure service and the timeout will apply to
each call individually.
:returns: downloaded data or the size of data written into the provided stream
:rtype: bytes or int
:returns: A streaming object (StorageStreamDownloader)
:rtype: ~azure.storage.filedatalake.aio.StorageStreamDownloader
.. admonition:: Example:
Expand All @@ -428,9 +425,7 @@ async def read_file(self, offset=None, # type: Optional[int]
:caption: Return the downloaded data.
"""
downloader = await self._blob_client.download_blob(offset=offset, length=length, **kwargs)
if stream:
return await downloader.readinto(stream)
return await downloader.readall()
return StorageStreamDownloader(downloader)

async def rename_file(self, new_name, # type: str
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from .._models import FileProperties


class StorageStreamDownloader(object):
    """A streaming object to download from Azure Storage.

    :ivar str name:
        The name of the file being downloaded.
    :ivar ~azure.storage.filedatalake.FileProperties properties:
        The properties of the file being downloaded. If only a range of the data is being
        downloaded, this will be reflected in the properties.
    :ivar int size:
        The size of the total data in the stream. This will be the byte range if specified,
        otherwise the total size of the file.
    """

    def __init__(self, downloader):
        # Wraps the async blob-level downloader, re-exposing its data with
        # Data Lake model types (FileProperties instead of blob properties).
        self._downloader = downloader
        self.name = self._downloader.name
        self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
        self.size = self._downloader.size

    def __len__(self):
        return self.size

    def chunks(self):
        """Iterate over the chunks in the download stream as they arrive.

        Delegates to the underlying downloader's chunk iterator.
        """
        return self._downloader.chunks()

    async def readall(self):
        """Download the contents of this file.

        This operation completes when all data has been downloaded.

        :rtype: bytes or str
        """
        return await self._downloader.readall()

    async def readinto(self, stream):
        """Download the contents of this file to a stream.

        :param stream:
            The stream to download to. This can be an open file-handle,
            or any writable stream. The stream must be seekable if the download
            uses more than one parallel connection.
        :returns: The number of bytes read.
        :rtype: int
        """
        return await self._downloader.readinto(stream)
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def upload_download_sample(filesystem_client):
# read the data back
print("Downloading data from '{}'.".format(file_name))
# [START read_file]
downloaded_bytes = file_client.read_file()
download = file_client.download_file()
downloaded_bytes = download.readall()
# [END read_file]

# verify the downloaded content
Expand All @@ -81,7 +82,8 @@ def upload_download_sample(filesystem_client):

# download the renamed file in to local file
with open(SOURCE_FILE, 'wb') as stream:
new_client.read_file(stream=stream)
download = new_client.download_file()
download.readinto(stream)

# [START delete_file]
new_client.delete_file()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ async def upload_download_sample(filesystem_client):
# read the data back
print("Downloading data from '{}'.".format(file_name))
# [START read_file]
downloaded_bytes = await file_client.read_file()
download = await file_client.download_file()
downloaded_bytes = await download.readall()
# [END read_file]

# verify the downloaded content
Expand All @@ -81,7 +82,8 @@ async def upload_download_sample(filesystem_client):

# download the renamed file in to local file
with open(SOURCE_FILE, 'wb') as stream:
await new_client.read_file(stream=stream)
download = await new_client.download_file()
await download.readinto(stream)

# [START delete_file]
await new_client.delete_file()
Expand Down
Loading

0 comments on commit 74a9df1

Please sign in to comment.