Skip to content

Commit

Permalink
[Storage-blob-preview] az storage blob query: Support parquet for `--input-format` (#3809)
Browse files Browse the repository at this point in the history

* adopt azure-storage-blob-12.9.0b1

* remove sdk aio package

* copy storage blob query command from main repo

* match main repo fix

* Add parquet as an input format for storage blob query

* linter EOF new line

* update release history

* Fix main repo typo

* Add to 0.6.0

* Address PR comments

* remove unnecessary decoding change

* eof newline

Co-authored-by: Zhiyi Huang <17182306+calvinhzy@users.noreply.github.com>
  • Loading branch information
evelyn-ys and calvinhzy authored Sep 1, 2021
1 parent c722445 commit 98b45e7
Show file tree
Hide file tree
Showing 74 changed files with 54,366 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/storage-blob-preview/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Release History
0.6.0
++++++
* Remove `az storage account blob-service-properties` since all the preview arguments are supported in main azure cli
* Add `parquet` option to `az storage blob query --input-format`

0.5.2
++++++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
class StorageCommandsLoader(AzCommandsLoader):
def __init__(self, cli_ctx=None):
from azure.cli.core.commands import CliCommandType
register_resource_type('latest', CUSTOM_DATA_STORAGE_BLOB, '2020-06-12')
register_resource_type('latest', CUSTOM_DATA_STORAGE_BLOB, '2020-10-02')
storage_custom = CliCommandType(operations_tmpl='azure.cli.command_modules.storage.custom#{}')
super(StorageCommandsLoader, self).__init__(cli_ctx=cli_ctx,
resource_type=CUSTOM_DATA_STORAGE_BLOB,
Expand Down
12 changes: 12 additions & 0 deletions src/storage-blob-preview/azext_storage_blob_preview/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,15 @@
text: |
echo $data | az storage blob upload --data @- -c mycontainer -n myblob --account-name mystorageaccount --account-key 0000-0000
"""

helps['storage blob query'] = """
type: command
short-summary: Enable users to select/project on blob or blob snapshot data by providing simple query expressions.
examples:
- name: Enable users to select/project on blob by providing simple query expressions.
text: az storage blob query -c mycontainer -n myblob --query-expression "SELECT _2 from BlobStorage"
- name: Enable users to select/project on blob by providing simple query expressions and save in target file.
text: az storage blob query -c mycontainer -n myblob --query-expression "SELECT _2 from BlobStorage" --result-file result.csv
- name: Enable users to select/project on blob by providing simple query expressions and an input format
text: az storage blob query -c mycontainer -n myblob --query-expression "SELECT _2 from BlobStorage" --input-format parquet
"""
63 changes: 63 additions & 0 deletions src/storage-blob-preview/azext_storage_blob_preview/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,69 @@ def load_arguments(self, _): # pylint: disable=too-many-locals, too-many-statem
c.extra('tier', tier_type, is_preview=True)
c.extra('overwrite', overwrite_type, is_preview=True)

# Argument wiring for `az storage blob query` (quick query). The in_*/out_*
# extras below are consumed by validate_text_configuration, which folds them
# into the hidden `input_config` / `output_config` namespace attributes and
# then removes the raw options before the command handler runs.
with self.argument_context('storage blob query') as c:
    from ._validators import validate_text_configuration
    c.register_blob_arguments()
    c.register_precondition_options()
    # Shared argument templates, reused by both the input and output groups.
    line_separator = CLIArgumentType(help="The string used to separate records.", default='\n')
    column_separator = CLIArgumentType(help="The string used to separate columns.", default=',')
    quote_char = CLIArgumentType(help="The string used to quote a specific field.", default='"')
    record_separator = CLIArgumentType(help="The string used to separate records.", default='\n')
    escape_char = CLIArgumentType(help="The string used as an escape character. Default to empty.", default="")
    has_header = CLIArgumentType(
        arg_type=get_three_state_flag(),
        help="Whether the blob data includes headers in the first line. "
        "The default value is False, meaning that the data will be returned inclusive of the first line. "
        "If set to True, the data will be returned exclusive of the first line.", default=False)
    c.extra('lease', options_list='--lease-id',
            help='Required if the blob has an active lease.')
    c.argument('query_expression', help='The query expression in SQL. The maximum size of the query expression '
               'is 256KiB. For more information about the expression syntax, please see '
               'https://docs.microsoft.com/azure/storage/blobs/query-acceleration-sql-reference')
    # The validator is attached to input_format but runs once for the whole
    # command, building both input_config and output_config.
    # `parquet` requires service API version 2020-10-02 or later.
    c.extra('input_format', arg_type=get_enum_type(['csv', 'json', 'parquet']), validator=validate_text_configuration,
            min_api='2020-10-02',
            help='Serialization type of the data currently stored in the blob. '
            'The default is to treat the blob data as CSV data formatted in the default dialect.'
            'The blob data will be reformatted according to that profile when blob format is specified. '
            'If you choose `json`, please specify `Input Json Text Configuration Arguments` accordingly; '
            'If you choose `csv`, please specify `Input Delimited Text Configuration Arguments`.')
    c.extra('output_format', arg_type=get_enum_type(['csv', 'json']),
            help='Output serialization type for the data stream. '
            'By default the data will be returned as it is represented in the blob. '
            'By providing an output format, the blob data will be reformatted according to that profile. '
            'If you choose `json`, please specify `Output Json Text Configuration Arguments` accordingly; '
            'If you choose `csv`, please specify `Output Delimited Text Configuration Arguments`.'
            'By default data with input_format of `parquet` will have the output_format of `csv`')
    # Input-side serialization knobs (JSON uses only the line separator).
    c.extra('in_line_separator',
            arg_group='Input Json Text Configuration',
            arg_type=line_separator)
    c.extra('in_column_separator', arg_group='Input Delimited Text Configuration',
            arg_type=column_separator)
    c.extra('in_quote_char', arg_group='Input Delimited Text Configuration',
            arg_type=quote_char)
    c.extra('in_record_separator', arg_group='Input Delimited Text Configuration',
            arg_type=record_separator)
    c.extra('in_escape_char', arg_group='Input Delimited Text Configuration',
            arg_type=escape_char)
    c.extra('in_has_header', arg_group='Input Delimited Text Configuration',
            arg_type=has_header)
    # Output-side serialization knobs, mirroring the input group.
    c.extra('out_line_separator',
            arg_group='Output Json Text Configuration',
            arg_type=line_separator)
    c.extra('out_column_separator', arg_group='Output Delimited Text Configuration',
            arg_type=column_separator)
    c.extra('out_quote_char', arg_group='Output Delimited Text Configuration',
            arg_type=quote_char)
    c.extra('out_record_separator', arg_group='Output Delimited Text Configuration',
            arg_type=record_separator)
    c.extra('out_escape_char', arg_group='Output Delimited Text Configuration',
            arg_type=escape_char)
    c.extra('out_has_header', arg_group='Output Delimited Text Configuration',
            arg_type=has_header)
    c.extra('result_file', help='Specify the file path to save result.')
    # Populated by validate_text_configuration; hidden from the CLI surface.
    c.ignore('input_config')
    c.ignore('output_config')

with self.argument_context('storage container') as c:
c.argument('container_name', container_name_type, options_list=('--name', '-n'))

Expand Down
31 changes: 31 additions & 0 deletions src/storage-blob-preview/azext_storage_blob_preview/_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,3 +1273,34 @@ def get_source_file_or_blob_service_client(cmd, namespace):
account_key=source_key,
share=source_share)
ns['source_sas'] = source_sas


def validate_text_configuration(cmd, ns):
    """Build SDK dialect objects for `az storage blob query` from the raw CLI options.

    Populates ``ns.input_config`` / ``ns.output_config`` according to
    --input-format / --output-format, then strips the per-option attributes
    so that only the composed configuration objects reach the command handler.
    """
    delimited_text_cls = cmd.get_models('_models#DelimitedTextDialect', resource_type=CUSTOM_DATA_STORAGE_BLOB)
    delimited_json_cls = cmd.get_models('_models#DelimitedJsonDialect', resource_type=CUSTOM_DATA_STORAGE_BLOB)
    quick_query_cls = cmd.get_models('_models#QuickQueryDialect', resource_type=CUSTOM_DATA_STORAGE_BLOB)

    if ns.input_format == 'csv':
        ns.input_config = delimited_text_cls(
            delimiter=ns.in_column_separator,
            quotechar=ns.in_quote_char,
            lineterminator=ns.in_record_separator,
            escapechar=ns.in_escape_char,
            has_header=ns.in_has_header)
    elif ns.input_format == 'json':
        ns.input_config = delimited_json_cls(delimiter=ns.in_line_separator)
    elif ns.input_format == 'parquet':
        # Parquet needs no dialect options; the enum member alone selects it.
        ns.input_config = quick_query_cls.ParquetDialect

    if ns.output_format == 'csv':
        ns.output_config = delimited_text_cls(
            delimiter=ns.out_column_separator,
            quotechar=ns.out_quote_char,
            lineterminator=ns.out_record_separator,
            escapechar=ns.out_escape_char,
            has_header=ns.out_has_header)
    elif ns.output_format == 'json':
        ns.output_config = delimited_json_cls(delimiter=ns.out_line_separator)

    # Remove the raw options; the SDK call only understands the dialect objects.
    for raw_option in ('input_format', 'in_line_separator', 'in_column_separator', 'in_quote_char',
                       'in_record_separator', 'in_escape_char', 'in_has_header',
                       'output_format', 'out_line_separator', 'out_column_separator', 'out_quote_char',
                       'out_record_separator', 'out_escape_char', 'out_has_header'):
        delattr(ns, raw_option)
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def get_custom_sdk(custom_module, client_factory, resource_type=ResourceType.DAT
g.storage_custom_command_oauth('delete-batch', 'storage_blob_delete_batch', client_factory=cf_blob_service,
validator=process_blob_delete_batch_parameters)
g.storage_custom_command_oauth('copy start-batch', 'storage_blob_copy_batch', client_factory=cf_blob_service)
g.storage_custom_command_oauth('query', 'query_blob',
is_preview=True, min_api='2020-10-02')

with self.command_group('storage blob', blob_service_sdk, resource_type=CUSTOM_DATA_STORAGE_BLOB,
min_api='2019-12-12',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,3 +697,17 @@ def acquire_blob_lease(client, lease_duration=-1, **kwargs):
def renew_blob_lease(client, **kwargs):
    """Renew the active lease held by *client* and return its lease id."""
    lease_client = client
    lease_client.renew(**kwargs)
    return lease_client.id


def query_blob(cmd, client, query_expression, input_config=None, output_config=None, result_file=None, **kwargs):
    """Select/project on blob data with a quick-query SQL expression.

    :param cmd: CLI command context (unused here; required by the command loader signature).
    :param client: Blob client exposing ``query_blob()``.
    :param query_expression: The SQL query expression to run against the blob.
    :param input_config: Optional dialect describing how the stored blob data is serialized.
    :param output_config: Optional dialect describing how the result stream should be serialized.
    :param result_file: Optional path; when given, the raw result bytes are written to that
        file and ``None`` is returned instead of the decoded text.
    :return: The query result decoded as UTF-8, or ``None`` when ``result_file`` is used.
    """
    reader = client.query_blob(query_expression=query_expression, blob_format=input_config,
                               output_format=output_config, **kwargs)

    if result_file is not None:
        # The context manager closes the file; the previous explicit close() inside
        # the `with` block was redundant and has been removed.
        with open(result_file, 'wb') as stream:
            reader.readinto(stream)
        return None

    return reader.readall().decode("utf-8")
Loading

0 comments on commit 98b45e7

Please sign in to comment.