Commit
{Storage} az storage file download/download-batch/delete-batch: Track2 SDK migration (#22919)

* file download, md5 encoding still error, not tested

* fix md5 encoding error

* rerun some tests

* download batch

* delete-batch

* lint

* fix download progress, lint

* pr review

* lint

* rerun test

* fix progress update

* lint

* remove unnecessary changes
calvinhzy authored Sep 13, 2022
1 parent 6775805 commit 6b230f9
Showing 16 changed files with 1,243 additions and 943 deletions.
46 changes: 32 additions & 14 deletions src/azure-cli/azure/cli/command_modules/storage/_params.py
@@ -14,7 +14,7 @@
validate_included_datasets_validator, validate_custom_domain, validate_hns_migration_type,
validate_container_public_access,
add_progress_callback, process_resource_group,
storage_account_key_options, process_file_download_namespace, process_metric_update_namespace,
storage_account_key_options, process_metric_update_namespace,
get_char_options_validator, validate_bypass, validate_encryption_source, validate_marker,
validate_storage_data_plane_list, validate_azcopy_upload_destination_url,
validate_azcopy_remove_arguments, as_user_validator, parse_storage_account,
@@ -1928,14 +1928,31 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem

with self.argument_context('storage file download') as c:
c.register_path_argument()
c.argument('file_path', options_list=('--dest',), type=file_type, required=False,
help='Path of the file to write to. The source filename will be used if not specified.',
validator=process_file_download_namespace, completer=FilesCompleter())
c.argument('path', validator=None) # validator called manually from process_file_download_namespace
c.extra('no_progress', progress_type)
c.argument('max_connections', type=int)
c.argument('start_range', type=int)
c.argument('end_range', type=int)
c.extra('share_name', share_name_type, required=True)
c.extra('destination_path', options_list=('--dest',), type=file_type, required=False,
help='Path of the file to write to. The source filename will be used if not specified.',
completer=FilesCompleter())
c.extra('no_progress', progress_type, validator=add_progress_callback)
c.argument('max_connections', type=int, help='Maximum number of parallel connections to use.')
c.extra('start_range', type=int, help='Start of byte range to use for downloading a section of the file. '
'If no --end-range is given, all bytes after the --start-range will be '
'downloaded. The --start-range and --end-range params are inclusive. Ex: '
'--start-range=0, --end-range=511 will download the first 512 bytes of the file.')
c.extra('end_range', type=int, help='End of byte range to use for downloading a section of the file. If '
'--end-range is given, --start-range must be provided. The --start-range '
'and --end-range params are inclusive. Ex: --start-range=0, '
'--end-range=511 will download the first 512 bytes of the file.')
c.argument('timeout', help='Request timeout in seconds. Applies to each call to the service.', type=int)
c.extra('snapshot', help="A string that represents the snapshot version, if applicable.")
c.argument('open_mode', help="Mode to use when opening the file. Note that specifying an append-only "
"open_mode prevents parallel download, so --max-connections must be "
"set to 1 if such an --open-mode is used.")
c.extra('validate_content', help="If set to true, validates an MD5 hash for each retrieved portion of the file. "
"This is primarily valuable for detecting bitflips on the wire when using "
"http instead of https, as https (the default) already validates. "
"Because computing the MD5 takes processing time, and more requests are "
"needed due to the reduced chunk size, there may be some increase "
"in latency.")

with self.argument_context('storage file exists') as c:
c.register_path_argument()
@@ -2012,15 +2029,14 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
c.extra('timeout', help='Request timeout in seconds. Applies to each call to the service.', type=int)

with self.argument_context('storage file upload') as c:
from ._validators import add_progress_callback_v2
t_file_content_settings = self.get_sdk('file.models#ContentSettings')

c.register_path_argument(default_file_param='local_file_path')
c.register_content_settings_argument(t_file_content_settings, update=False, guess_from_file='local_file_path',
process_md5=True)
c.argument('local_file_path', options_list='--source', type=file_type, completer=FilesCompleter(),
help='Path of the local file to upload as the file content.')
c.extra('no_progress', progress_type, validator=add_progress_callback_v2)
c.extra('no_progress', progress_type, validator=add_progress_callback)
c.argument('max_connections', type=int, help='Maximum number of parallel connections to use.')
c.extra('share_name', share_name_type, required=True)
c.argument('validate_content', action='store_true', min_api='2016-05-31',
@@ -2035,21 +2051,23 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
c.argument('protocol', arg_type=get_enum_type(['http', 'https'], 'https'), help='Protocol to use.')

with self.argument_context('storage file upload-batch') as c:
from ._validators import process_file_upload_batch_parameters, add_progress_callback_v2
from ._validators import process_file_upload_batch_parameters
c.argument('source', options_list=('--source', '-s'), validator=process_file_upload_batch_parameters)
c.argument('destination', options_list=('--destination', '-d'))
c.argument('max_connections', arg_group='Download Control', type=int)
c.argument('validate_content', action='store_true', min_api='2016-05-31')
c.register_content_settings_argument(t_file_content_settings, update=False, arg_group='Content Settings')
c.extra('no_progress', progress_type, validator=add_progress_callback_v2)
c.extra('no_progress', progress_type, validator=add_progress_callback)

with self.argument_context('storage file download-batch') as c:
from ._validators import process_file_download_batch_parameters
c.argument('source', options_list=('--source', '-s'), validator=process_file_download_batch_parameters)
c.argument('destination', options_list=('--destination', '-d'))
c.argument('max_connections', arg_group='Download Control', type=int)
c.argument('validate_content', action='store_true', min_api='2016-05-31')
c.extra('no_progress', progress_type)
c.extra('no_progress', progress_type, validator=add_progress_callback)
c.extra('snapshot', help='The snapshot parameter is an opaque DateTime value that, when present, '
'specifies the share snapshot to download from.')

with self.argument_context('storage file delete-batch') as c:
from ._validators import process_file_batch_source_parameters
src/azure-cli/azure/cli/command_modules/storage/_params_azure_stack.py
@@ -983,7 +983,7 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
c.extra('no_progress', progress_type)

with self.argument_context('storage file delete-batch') as c:
from ._validators import process_file_batch_source_parameters
from ._validators_azure_stack import process_file_batch_source_parameters
c.argument('source', options_list=('--source', '-s'), validator=process_file_batch_source_parameters)

with self.argument_context('storage file copy start') as c:
src/azure-cli/azure/cli/command_modules/storage/_transformers.py
@@ -491,4 +491,13 @@ def transform_file_show_result(result):
new_result['properties']['contentSettings']['contentMd5'] = _encode_bytes(new_result['properties']
['contentSettings']['contentMd5'])
new_result.update(result)
_decode_bytearray(new_result)
return new_result


def _decode_bytearray(result):
for k, v in result.items():
if isinstance(v, bytearray):
result[k] = base64.urlsafe_b64encode(v).decode()
elif isinstance(v, dict):
_decode_bytearray(v)
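
A quick illustration of what the new _decode_bytearray helper does; the input values here are hypothetical. It walks the result dict recursively and renders any bytearray value, such as a contentMd5 digest, as a URL-safe base64 string so the output is JSON-serializable:

    import base64

    # Hypothetical input; track2 SDKs return contentMd5 as a bytearray.
    result = {'name': 'a.txt',
              'properties': {'contentSettings': {'contentMd5': bytearray(b'\x12\x34')}}}
    _decode_bytearray(result)
    print(result['properties']['contentSettings']['contentMd5'])  # prints 'EjQ='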

src/azure-cli/azure/cli/command_modules/storage/_validators.py
@@ -1333,6 +1333,8 @@ def process_file_batch_source_parameters(cmd, namespace):
if not namespace.account_name:
namespace.account_name = identifier.account_name

namespace.share_name = namespace.source


def process_file_download_namespace(namespace):
get_file_path_validator()(namespace)
24 changes: 7 additions & 17 deletions src/azure-cli/azure/cli/command_modules/storage/commands.py
@@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

from azure.cli.command_modules.storage._client_factory import (cf_sa, cf_blob_container_mgmt, blob_data_service_factory,
page_blob_service_factory, file_data_service_factory,
page_blob_service_factory,
multi_service_properties_factory,
cf_mgmt_policy, cf_sa_for_keys,
cf_mgmt_blob_services, cf_mgmt_file_shares,
@@ -574,11 +574,6 @@ def get_custom_sdk(custom_module, client_factory, resource_type=ResourceType.DAT
g.show_command('show', 'get')
g.command('migrate-vlw', 'begin_object_level_worm', supports_no_wait=True, is_preview=True)

file_sdk = CliCommandType(
operations_tmpl='azure.multiapi.storage.file.fileservice#FileService.{}',
client_factory=file_data_service_factory,
resource_type=ResourceType.DATA_STORAGE)

share_client_sdk = CliCommandType(
operations_tmpl='azure.multiapi.storagev2.fileshare._share_client#ShareClient.{}',
client_factory=cf_share_client,
@@ -691,24 +686,17 @@ def get_custom_sdk(custom_module, client_factory, resource_type=ResourceType.DAT
transform=transform_file_directory_result,
table_transformer=transform_file_output)

with self.command_group('storage file', command_type=file_sdk,
custom_command_type=get_custom_sdk('file', file_data_service_factory)) as g:
from ._format import transform_boolean_for_table, transform_file_output
from ._exception_handler import file_related_exception_handler
g.storage_command('download', 'get_file_to_path', exception_handler=file_related_exception_handler)
g.storage_custom_command(
'download-batch', 'storage_file_download_batch')
g.storage_custom_command('delete-batch', 'storage_file_delete_batch')

with self.command_group('storage file', command_type=file_client_sdk,
custom_command_type=get_custom_sdk('file', cf_share_file_client)) as g:
from ._transformers import transform_file_show_result
from ._format import transform_metadata_show
from ._format import transform_metadata_show, transform_boolean_for_table, transform_file_output
from ._exception_handler import file_related_exception_handler
g.storage_custom_command('list', 'list_share_files', client_factory=cf_share_client,
transform=transform_file_directory_result,
table_transformer=transform_file_output)
g.storage_command('delete', 'delete_file', transform=create_boolean_result_output_transformer('deleted'),
table_transformer=transform_boolean_for_table)
g.storage_custom_command('delete-batch', 'storage_file_delete_batch', client_factory=cf_share_client)
g.storage_command('resize', 'resize_file')
g.storage_custom_command('url', 'create_file_url', transform=transform_url_without_encode,
client_factory=cf_share_client)
@@ -724,11 +712,13 @@ def get_custom_sdk(custom_module, client_factory, resource_type=ResourceType.DAT
g.storage_custom_command('copy start', 'storage_file_copy', resource_type=ResourceType.DATA_STORAGE_FILESHARE)
g.storage_command('copy cancel', 'abort_copy')
g.storage_custom_command('copy start-batch', 'storage_file_copy_batch', client_factory=cf_share_client)

g.storage_custom_command('upload', 'storage_file_upload', exception_handler=file_related_exception_handler,
resource_type=ResourceType.DATA_STORAGE_FILESHARE)
g.storage_custom_command('upload-batch', 'storage_file_upload_batch',
custom_command_type=get_custom_sdk('file', client_factory=cf_share_client))
g.storage_custom_command('download', 'download_file', exception_handler=file_related_exception_handler,
transform=transform_file_show_result)
g.storage_custom_command('download-batch', 'storage_file_download_batch', client_factory=cf_share_client)

with self.command_group('storage cors', get_custom_sdk('cors', multi_service_properties_factory)) as g:
from ._transformers import transform_cors_list_output
67 changes: 44 additions & 23 deletions src/azure-cli/azure/cli/command_modules/storage/operations/file.py
@@ -147,7 +147,7 @@ def storage_file_upload(client, local_file_path, content_settings=None,
'timeout': timeout
}
if progress_callback:
upload_args['raw_response_hook'] = progress_callback
upload_args['progress_hook'] = progress_callback
# Because the contents of the uploaded file may be too large, it is passed in as a stream object;
# upload_file() reads the file data in batches to avoid OOM problems
count = os.path.getsize(local_file_path)
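
A sketch of the streaming call pattern the comment above describes, assuming the track2 ShareFileClient.upload_file API, which accepts a file-like object plus an explicit length; the surrounding variable names follow the visible code:

    # Stream the local file instead of reading it fully into memory (assumed pattern).
    count = os.path.getsize(local_file_path)
    with open(local_file_path, 'rb') as stream:
        client.upload_file(data=stream, length=count, **upload_args)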
@@ -208,15 +208,41 @@ def _upload_action(src, dst2):
return list(_upload_action(src, dst) for src, dst in source_files)


def storage_file_download_batch(cmd, client, source, destination, pattern=None, dryrun=False, validate_content=False,
max_connections=1, progress_callback=None, snapshot=None):
def download_file(client, destination_path=None, timeout=None, max_connections=2, open_mode='wb', **kwargs):
from azure.cli.command_modules.storage.util import mkdir_p
destination_folder = os.path.dirname(destination_path) if destination_path else ""
if destination_folder and not os.path.exists(destination_folder):
mkdir_p(destination_folder)

if not destination_folder or os.path.isdir(destination_path):
file = client.get_file_properties(timeout=timeout)
file_name = file.name.split("/")[-1]
destination_path = os.path.join(destination_path, file_name) \
if destination_path else file_name

kwargs['progress_hook'] = kwargs.pop("progress_callback", None)

with open(destination_path, open_mode) as stream:
start_range = kwargs.pop('start_range', None)
end_range = kwargs.pop('end_range', None)
length = None
if start_range is not None and end_range is not None:
length = end_range - start_range + 1
download = client.download_file(offset=start_range, length=length, timeout=timeout,
max_concurrency=max_connections, **kwargs)
download.readinto(stream)
return client.get_file_properties()
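
For context, a minimal standalone usage sketch of the new download_file command function; the connection-string setup is an assumption, since in the CLI this plumbing is handled by the command framework. Note the inclusive range arithmetic: offset=0 with length = 511 - 0 + 1 downloads the first 512 bytes:

    from azure.storage.fileshare import ShareFileClient

    # Hypothetical client construction; the share and file names are made up.
    client = ShareFileClient.from_connection_string(
        conn_str='<connection-string>', share_name='myshare', file_path='dir/a.txt')
    props = download_file(client, destination_path='out.bin', start_range=0, end_range=511)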


def storage_file_download_batch(client, source, destination, pattern=None, dryrun=False, validate_content=False,
max_connections=1, progress_callback=None):
"""
Download files from file share to local directory in batch
"""

from azure.cli.command_modules.storage.util import glob_files_remotely, mkdir_p
from azure.cli.command_modules.storage.util import glob_files_remotely_track2

source_files = glob_files_remotely(cmd, client, source, pattern, snapshot=snapshot)
source_files = glob_files_remotely_track2(client, source, pattern, is_share_client=True)

if dryrun:
source_files_list = list(source_files)
@@ -234,18 +260,14 @@ def storage_file_download_batch(cmd, client, source, destination, pattern=None,
return []

def _download_action(pair):
destination_dir = os.path.join(destination, pair[0])
mkdir_p(destination_dir)

get_file_args = {'share_name': source, 'directory_name': pair[0], 'file_name': pair[1],
'file_path': os.path.join(destination, *pair), 'max_connections': max_connections,
'progress_callback': progress_callback, 'snapshot': snapshot}
path = os.path.join(*pair)
local_path = os.path.join(destination, *pair)
file_client = client.get_file_client(path)

if cmd.supported_api_version(min_api='2016-05-31'):
get_file_args['validate_content'] = validate_content
download_file(file_client, destination_path=local_path, max_connections=max_connections,
progress_callback=progress_callback, validate_content=validate_content)

client.get_file_to_path(**get_file_args)
return client.make_file_url(source, *pair)
return file_client.url.replace('%5C', '/')

return list(_download_action(f) for f in source_files)
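
As used above, glob_files_remotely_track2 yields (directory, filename) pairs; a quick illustration of the resulting path mapping, with a hypothetical pair and destination:

    pair = ('dir1', 'a.txt')
    path = os.path.join(*pair)                     # share path: 'dir1/a.txt' ('dir1\\a.txt' on Windows)
    local_path = os.path.join('/tmp/dest', *pair)  # '/tmp/dest/dir1/a.txt'

On Windows the joined share path contains backslashes, which are percent-encoded as %5C in the file client's URL; that is why _download_action returns file_client.url.replace('%5C', '/').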

@@ -320,19 +342,18 @@ def action_file_copy(file_info):
raise ValueError('Failed to find source. Neither blob container nor file share is specified.')


def storage_file_delete_batch(cmd, client, source, pattern=None, dryrun=False, timeout=None):
def storage_file_delete_batch(client, source, pattern=None, dryrun=False, timeout=None):
"""
Delete files from file share in batch
"""

def delete_action(file_pair):
delete_file_args = {'share_name': source, 'directory_name': file_pair[0], 'file_name': file_pair[1],
'timeout': timeout}

return client.delete_file(**delete_file_args)
def delete_action(pair):
path = os.path.join(*pair)
file_client = client.get_file_client(path)
return file_client.delete_file(timeout=timeout)

from azure.cli.command_modules.storage.util import glob_files_remotely
source_files = list(glob_files_remotely(cmd, client, source, pattern))
from azure.cli.command_modules.storage.util import glob_files_remotely_track2
source_files = list(glob_files_remotely_track2(client, source, pattern, is_share_client=True))

if dryrun:
logger.warning('delete files from %s', source)
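
A hypothetical end-to-end use of the migrated delete-batch helper; the client here is a track2 ShareClient, and the connection string, share name, and pattern are made up:

    from azure.storage.fileshare import ShareClient

    client = ShareClient.from_connection_string(conn_str='<connection-string>', share_name='myshare')
    storage_file_delete_batch(client, source='myshare', pattern='*.tmp')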