Skip to content

Commit

Permalink
[Storage] Support rewrite existing blob (Azure#16796)
Browse files Browse the repository at this point in the history
* upgrade version

* draft command

* draft for rewrite work

* refine signature

* refine signature

* test pass

* fix style

* fix linter

* rewrite blob with upload blob from url

* refine code

* test pass

* refine test

* fix linter

* add validation

* refine test

* refine code

* Apply suggestions from code review

Co-authored-by: Yishi Wang <yishiwang@microsoft.com>

Co-authored-by: Yishi Wang <yishiwang@microsoft.com>
  • Loading branch information
Juliehzl and evelyn-ys authored Feb 26, 2021
1 parent 3647a9a commit aa0155e
Show file tree
Hide file tree
Showing 16 changed files with 1,174 additions and 866 deletions.
2 changes: 1 addition & 1 deletion src/azure-cli-core/azure/cli/core/profiles/_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def default_api_version(self):
ResourceType.DATA_KEYVAULT_ADMINISTRATION_BACKUP: '7.2-preview',
ResourceType.DATA_KEYVAULT_ADMINISTRATION_ACCESS_CONTROL: '7.2-preview',
ResourceType.DATA_STORAGE: '2018-11-09',
ResourceType.DATA_STORAGE_BLOB: '2019-12-12',
ResourceType.DATA_STORAGE_BLOB: '2020-04-08',
ResourceType.DATA_STORAGE_FILEDATALAKE: '2020-02-10',
ResourceType.DATA_STORAGE_FILESHARE: '2019-07-07',
ResourceType.DATA_STORAGE_QUEUE: '2018-03-28',
Expand Down
46 changes: 30 additions & 16 deletions src/azure-cli/azure/cli/command_modules/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def register_content_settings_argument(self, settings_class, update, arg_group=N
self.extra('content_disposition', default=None, arg_group=arg_group,
help='Conveys additional information about how to process the response payload, and can also be '
'used to attach additional metadata.')
self.extra('content_cache_control', default=None, help='The cache control string.', arg_group=arg_group)
self.extra('content_cache_control', options_list=['--content-cache-control', '--content-cache'],
default=None, help='The cache control string.',
arg_group=arg_group)
self.extra('content_md5', default=None, help='The content\'s MD5 hash.', arg_group=arg_group)
if update:
self.extra('clear_content_settings', help='If this flag is set, then if any one or more of the '
Expand All @@ -106,25 +108,27 @@ def register_path_argument(self, default_file_param=None, options_list=None):
self.ignore('file_name')
self.ignore('directory_name')

def register_source_uri_arguments(self, validator, blob_only=False):
def register_source_uri_arguments(self, validator, blob_only=False, arg_group='Copy Source'):
self.argument('copy_source', options_list=('--source-uri', '-u'), validator=validator, required=False,
arg_group='Copy Source')
self.extra('source_sas', default=None, arg_group='Copy Source',
arg_group=arg_group)
self.argument('source_url', options_list=('--source-uri', '-u'), validator=validator, required=False,
arg_group=arg_group)
self.extra('source_sas', default=None, arg_group=arg_group,
help='The shared access signature for the source storage account.')
self.extra('source_container', default=None, arg_group='Copy Source',
self.extra('source_container', default=None, arg_group=arg_group,
help='The container name for the source storage account.')
self.extra('source_blob', default=None, arg_group='Copy Source',
self.extra('source_blob', default=None, arg_group=arg_group,
help='The blob name for the source storage account.')
self.extra('source_snapshot', default=None, arg_group='Copy Source',
self.extra('source_snapshot', default=None, arg_group=arg_group,
help='The blob snapshot for the source storage account.')
self.extra('source_account_name', default=None, arg_group='Copy Source',
self.extra('source_account_name', default=None, arg_group=arg_group,
help='The storage account name of the source blob.')
self.extra('source_account_key', default=None, arg_group='Copy Source',
self.extra('source_account_key', default=None, arg_group=arg_group,
help='The storage account key of the source blob.')
if not blob_only:
self.extra('source_path', default=None, arg_group='Copy Source',
self.extra('source_path', default=None, arg_group=arg_group,
help='The file path for the source storage account.')
self.extra('source_share', default=None, arg_group='Copy Source',
self.extra('source_share', default=None, arg_group=arg_group,
help='The share name for the source storage account.')

def register_common_storage_account_options(self):
Expand Down Expand Up @@ -153,15 +157,25 @@ def register_common_storage_account_options(self):
resource_type=ResourceType.MGMT_STORAGE, min_api='2016-12-01', nargs='+',
validator=validate_encryption_services, help='Specifies which service(s) to encrypt.')

def register_precondition_options(self):
self.extra('if_modified_since')
self.extra('if_unmodified_since')
self.extra('if_match', help="An ETag value, or the wildcard character (*). Specify this header to perform the "
def register_precondition_options(self, prefix=''):
from ._validators import (get_datetime_type)
self.extra('{}if_modified_since'.format(prefix), arg_group='Precondition',
help="Commence only if modified since supplied UTC datetime (Y-m-d'T'H:M'Z').",
type=get_datetime_type(False))
self.extra('{}if_unmodified_since'.format(prefix), arg_group='Precondition',
help="Commence only if unmodified since supplied UTC datetime (Y-m-d'T'H:M'Z').",
type=get_datetime_type(False))
self.extra('{}if_match'.format(prefix), arg_group='Precondition',
help="An ETag value, or the wildcard character (*). Specify this header to perform the "
"operation only if the resource's ETag matches the value specified.")
self.extra('if_none_match', help="An ETag value, or the wildcard character (*). Specify this header to perform "
self.extra('{}if_none_match'.format(prefix), arg_group='Precondition',
help="An ETag value, or the wildcard character (*). Specify this header to perform "
"the operation only if the resource's ETag does not match the value specified. Specify the wildcard "
"character (*) to perform the operation only if the resource does not exist, and fail the operation "
"if it does exist.")
self.extra('{}if_tags_match_condition'.format(prefix), arg_group='Precondition',
options_list=['--{}tags-condition'.format(prefix.replace('_', '-'))],
help='Specify a SQL where clause on blob tags to operate only on blobs with a matching value.')

def register_blob_arguments(self):
from ._validators import get_not_none_validator
Expand Down
9 changes: 9 additions & 0 deletions src/azure-cli/azure/cli/command_modules/storage/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,15 @@
az storage blob restore --account-name mystorageaccount -g MyResourceGroup -t $time -r container0/blob1 container0/blob2 --no-wait
"""

helps['storage blob rewrite'] = """
type: command
short-summary: Create a new Block Blob where the content of the blob is read from a given URL.
long-summary: The content of an existing blob is overwritten with the new blob.
examples:
- name: Update encryption scope for existing blob.
text: az storage blob rewrite --source-uri https://srcaccount.blob.core.windows.net/mycontainer/myblob?<sastoken> --encryption-scope newscope -c mycontainer -n myblob --account-name mystorageaccount --account-key 0000-0000
"""

helps['storage blob service-properties'] = """
type: group
short-summary: Manage storage blob service properties.
Expand Down
30 changes: 27 additions & 3 deletions src/azure-cli/azure/cli/command_modules/storage/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
validate_delete_retention_days, validate_container_delete_retention_days,
validate_file_delete_retention_days, validator_change_feed_retention_days,
validate_fs_public_access, validate_logging_version, validate_or_policy, validate_policy,
get_api_version_type, blob_download_file_path_validator)
get_api_version_type, blob_download_file_path_validator, blob_tier_validator)


def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statements, too-many-lines
Expand Down Expand Up @@ -174,6 +174,9 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
'Shared Key. If false, then all requests, including shared access signatures, must be authorized with '
'Azure Active Directory (Azure AD). The default value is null, which is equivalent to true.')

t_blob_tier = self.get_sdk('_generated.models._azure_blob_storage_enums#AccessTierOptional',
resource_type=ResourceType.DATA_STORAGE_BLOB)

with self.argument_context('storage') as c:
c.argument('container_name', container_name_type)
c.argument('directory_name', directory_type)
Expand Down Expand Up @@ -649,6 +652,28 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
c.argument('time_to_restore', type=get_datetime_type(True), options_list=['--time-to-restore', '-t'],
help='Restore blob to the specified time, which should be UTC datetime in (Y-m-d\'T\'H:M:S\'Z\').')

with self.argument_context('storage blob rewrite', resource_type=ResourceType.DATA_STORAGE_BLOB,
min_api='2020-04-08') as c:
c.register_blob_arguments()
c.register_precondition_options()

c.argument('source_url', options_list=['--source-uri', '-u'],
help='A URL of up to 2 KB in length that specifies a file or blob. The value should be URL-encoded '
'as it would appear in a request URI. If the source is in another account, the source must either '
'be public or must be authenticated via a shared access signature. If the source is public, no '
'authentication is required.')
c.extra('lease', options_list='--lease-id',
help='Required if the blob has an active lease. Value can be a BlobLeaseClient object '
'or the lease ID as a string.')
c.extra('standard_blob_tier', arg_type=get_enum_type(t_blob_tier), options_list='--tier',
help='A standard blob tier value to set the blob to. For this version of the library, '
'this is only applicable to block blobs on standard storage accounts.')
c.extra('encryption_scope',
help='A predefined encryption scope used to encrypt the data on the service. An encryption scope '
'can be created using the Management API and referenced here by name. If a default encryption scope '
'has been defined at the container, this value will override it if the container-level scope is '
'configured to allow overrides. Otherwise an error will be raised.')

with self.argument_context('storage blob update') as c:
t_blob_content_settings = self.get_sdk('blob.models#ContentSettings')
c.register_content_settings_argument(t_blob_content_settings, update=True)
Expand All @@ -662,8 +687,7 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
'this query parameter indicates the snapshot version.')

with self.argument_context('storage blob set-tier') as c:
from azure.cli.command_modules.storage._validators import (blob_tier_validator,
blob_rehydrate_priority_validator)
from azure.cli.command_modules.storage._validators import (blob_rehydrate_priority_validator)
c.register_blob_arguments()

c.argument('blob_type', options_list=('--type', '-t'), arg_type=get_enum_type(('block', 'page')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,12 @@ def transform_restore_policy_output(result):
if hasattr(result, 'restore_policy') and hasattr(result.restore_policy, 'last_enabled_time'):
del result.restore_policy.last_enabled_time
return result


def transform_response_with_bytearray(response):
    """Serialize any bytes/bytearray values in *response* to base64 strings.

    Mutates the mapping in place and returns it, so the result is JSON-friendly
    for CLI output.
    """
    from msrest import Serializer
    for key in response:
        value = response[key]
        # Only non-empty byte values need conversion; everything else is left untouched.
        if value and isinstance(value, (bytes, bytearray)):
            response[key] = Serializer.serialize_bytearray(value)
    return response
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def get_custom_sdk(custom_module, client_factory, resource_type=ResourceType.DAT
table_transformer=transform_blob_output)
g.storage_custom_command_oauth('query', 'query_blob',
is_preview=True, min_api='2019-12-12')
g.storage_custom_command_oauth('rewrite', 'rewrite_blob', is_preview=True, min_api='2020-04-08')

blob_lease_client_sdk = CliCommandType(
operations_tmpl='azure.multiapi.storagev2.blob._lease#BlobLeaseClient.{}',
Expand Down
66 changes: 62 additions & 4 deletions src/azure-cli/azure/cli/command_modules/storage/operations/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
check_precondition_success)
from knack.log import get_logger
from knack.util import CLIError
from .._transformers import transform_response_with_bytearray

logger = get_logger(__name__)

Expand Down Expand Up @@ -438,6 +439,18 @@ def transform_blob_type(cmd, blob_type):
return None


# pylint: disable=protected-access
def _adjust_block_blob_size(client, blob_type, length):
if not blob_type or blob_type != 'block':
return

# increase the block size to 4000MB when the block list will contain more than
# 50,000 blocks(each block 100MB)
if length > 50000 * 100 * 1024 * 1024:
client._config.max_block_size = 4000 * 1024 * 1024
client._config.max_single_put_size = 5000 * 1024 * 1024


# pylint: disable=too-many-locals
def upload_blob(cmd, client, file_path, container_name=None, blob_name=None, blob_type=None, content_settings=None,
metadata=None, validate_content=False, maxsize_condition=None, max_connections=2, lease_id=None,
Expand Down Expand Up @@ -480,11 +493,9 @@ def upload_blob(cmd, client, file_path, container_name=None, blob_name=None, blo
if if_none_match:
upload_args['etag'] = if_none_match
upload_args['match_condition'] = MatchConditions.IfModified
_adjust_block_blob_size(client, blob_type, length=count)
response = client.upload_blob(data=data, length=count, encryption_scope=encryption_scope, **upload_args)
if response['content_md5'] is not None:
from msrest import Serializer
response['content_md5'] = Serializer.serialize_bytearray(response['content_md5'])
return response
return transform_response_with_bytearray(response)

t_content_settings = cmd.get_models('blob.models#ContentSettings')
content_settings = guess_content_type(file_path, content_settings, t_content_settings)
Expand Down Expand Up @@ -560,6 +571,53 @@ def upload_block_blob():
return type_func[blob_type]()


def get_block_ids(content_length, block_length):
    """Return the list of block ids needed to cover *content_length* bytes.

    :param int content_length: Total size of the blob content in bytes.
    :param int block_length: Size of each block in bytes. A falsy value
        (``0``/``None``) yields an empty list.
    :return: Zero-padded 32-digit decimal block ids, one per block, each
        encoding the block's starting byte offset.
    :rtype: list[str]
    """
    if not block_length:
        return []
    # Ceiling division: a trailing partial block still needs its own id.
    block_count = -(-content_length // block_length)
    return ['{0:032d}'.format(i * block_length) for i in range(block_count)]


def rewrite_blob(cmd, client, source_url, encryption_scope=None, **kwargs):
    """Overwrite an existing block blob with content read from *source_url*.

    :param cmd: CLI command context used to resolve the multi-api BlobType model.
    :param client: Destination BlobClient; its contents are replaced.
    :param str source_url: URL of the source blob (must be a block blob).
    :param str encryption_scope: Optional encryption scope applied to the
        rewritten destination data.
    :param kwargs: Extra options forwarded to the SDK calls (lease,
        precondition headers, etc.).
    :raises ValidationError: If the source blob is not a block blob.
    """
    # Read the source blob's properties (type, size, content settings, metadata)
    # so they can be validated and re-applied to the destination.
    src_properties = client.from_blob_url(source_url).get_blob_properties()
    BlobType = cmd.get_models('_models#BlobType', resource_type=ResourceType.DATA_STORAGE_BLOB)
    if src_properties.blob_type != BlobType.BlockBlob:
        from azure.cli.core.azclierror import ValidationError
        raise ValidationError("Currently only support block blob! The source blob is {}.".format(
            src_properties.blob_type))
    src_content_length = src_properties.size
    # Small blobs (<= 5000MB) fit in a single Put Blob From URL call.
    if src_content_length <= 5000 * 1024 * 1024:
        return client.upload_blob_from_url(source_url=source_url, overwrite=True, encryption_scope=encryption_scope,
                                           destination_lease=kwargs.pop('lease', None), **kwargs)

    # Larger blobs are copied block-by-block: stage each 4000MB range from the
    # source URL, then commit the full block list in one operation.
    block_length = 4000 * 1024 * 1024  # using max block size
    block_ids = get_block_ids(src_content_length, block_length)

    copyoffset = 0
    for block_id in block_ids:
        block_size = block_length
        # The final block may be shorter than the full block length.
        if copyoffset + block_size > src_content_length:
            block_size = src_content_length - copyoffset
        client.stage_block_from_url(
            block_id=block_id,
            source_url=source_url,
            source_offset=copyoffset,
            source_length=block_size,
            encryption_scope=encryption_scope)
        copyoffset += block_size
    # Committing the block list also re-applies the source's content settings
    # and metadata to the rewritten destination blob.
    response = client.commit_block_list(block_list=block_ids, content_settings=src_properties.content_settings,
                                        metadata=src_properties.metadata, encryption_scope=encryption_scope, **kwargs)
    return transform_response_with_bytearray(response)


def show_blob(cmd, client, container_name, blob_name, snapshot=None, lease_id=None,
if_modified_since=None, if_unmodified_since=None, if_match=None,
if_none_match=None, timeout=None):
Expand Down
Loading

0 comments on commit aa0155e

Please sign in to comment.