Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CosmosDB]: az cosmosdb dts: Add cosmos db data transfer commands #4563

Merged
merged 16 commits into from
May 12, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ def cf_restorable_tables(cli_ctx, _):

def cf_restorable_table_resources(cli_ctx, _):
return cf_cosmosdb_preview(cli_ctx).restorable_table_resources


def cf_data_transfer_job(cli_ctx, _):
return cf_cosmosdb_preview(cli_ctx).data_transfer_jobs
113 changes: 113 additions & 0 deletions src/cosmosdb-preview/azext_cosmosdb_preview/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,116 @@
type: command
short-summary: Retrieves latest restorable timestamp for the given table in given region.
"""

helps['cosmosdb dts export'] = """
type: command
short-summary: "Creates a data transfer export Job."
parameters:
- name: --cassandra-table
short-summary: "Cassandra table data source"
long-summary: |
Usage: --cassandra-table keyspace=XX table=XX'
keyspace: Cassandra keyspace name.
table: Cassandra table name.
- name: --blob-container
short-summary: "Blob container data sink"
long-summary: |
Usage: --blob-container name=XX url=XX
name: Container name of Azure Blob Storage.
url: Endpoint Url of Azure Blob Storage.

examples:
- name: Export to cassandra table to blob container
text: |-
az cosmosdb dts export --account-name "db1" -g "rg1" --job-name "j1"\
--cassandra-table keyspace=testkeyspace table=testtable\
--blob-container name=backup1 url=https://backupstorage.blob.core.windows.net/
"""

helps['cosmosdb dts import'] = """
type: command
short-summary: "Creates a data transfer import Job."
parameters:
- name: --cassandra-table
short-summary: "Cassandra table data sink"
long-summary: |
Usage: --cassandra-table keyspace=XX table=XX'
keyspace: Keyspace name of CosmosDB Cassandra.
table: Table name of CosmosDB Cassandra.
- name: --blob-container
short-summary: "Blob conatiner data source"
long-summary: |
Usage: --blob-container name=XX url=XX
name: Container name of Azure Blob Storage.
url: Endpoint Url of Azure Blob Storage.

examples:
- name: Import cassandra table from blob container
text: |-
az cosmosdb dts import --account-name "db1" -g "rg1" --job-name "j1"\
--blob-container name=backup1 url=https://backupstorage.blob.core.windows.net/\
--cassandra-table keyspace=testkeyspace table=testtable
"""

helps['cosmosdb dts copy'] = """
type: command
short-summary: "Creates a data transfer copy Job."
parameters:
- name: --source-cassandra-table
short-summary: "Source cassandra table"
long-summary: |
Usage: --source-cassandra-table keyspace=XX table=XX'
keyspace: Keyspace name of CosmosDB Cassandra.
table: Table name of CosmosDB Cassandra.
- name: --dest-cassandra-table
short-summary: "Destination cassandra table"
long-summary: |
Usage: --dest-cassandra-table keyspace=XX table=XX'
keyspace: Keyspace name of CosmosDB Cassandra.
table: Table name of CosmosDB Cassandra.
- name: --source-sql-container
short-summary: "Blob conatiner data source"
short-summary: "Source sql container"
niteshvijay1995 marked this conversation as resolved.
Show resolved Hide resolved
long-summary: |
Usage: --source-sql-container database=XX container=XX'
database: Database name of CosmosDB Sql.
container: Container name of CosmosDB Sql.
- name: --dest-sql-container
short-summary: "Blob conatiner data source"
short-summary: "Destination sql container"
niteshvijay1995 marked this conversation as resolved.
Show resolved Hide resolved
long-summary: |
Usage: --dest-sql-container database=XX container=XX'
database: Database name of CosmosDB Sql.
container: Container name of CosmosDB Sql.

examples:
- name: Copy sql container
text: |-
az cosmosdb dts copy -g "rg1" --job-name "j1" --account-name "db1" --source-sql-container database=db1 container=c1 --dest-sql-container database=db2 container=c2
- name: Copy cassandra table
text: |-
az cosmosdb dts copy -g "rg1" --job-name "j1" --account-name "db1" --source-cassandra-table keyspace=k1 table=t1 --dest-cassandra-table keyspace=k1 table=t1
"""

helps['cosmosdb dts'] = """
type: group
short-summary: Manage data transfer job with cosmosdb
"""

helps['cosmosdb dts list'] = """
type: command
short-summary: "Get a list of Data Transfer jobs."
examples:
- name: List all jobs
text: |-
az cosmosdb dts list --account-name "ddb1" -g "rg1"
"""

helps['cosmosdb dts show'] = """
type: command
short-summary: "Get a Data Transfer Job."
examples:
- name: Show details of job j1
text: |-
az cosmosdb dts show --account-name "ddb1" --job-name "j1" -g "rg1"
"""
32 changes: 31 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
validate_mongo_user_definition_id)

from azext_cosmosdb_preview.actions import (
CreateGremlinDatabaseRestoreResource, CreateTableRestoreResource)
CreateGremlinDatabaseRestoreResource, CreateTableRestoreResource, AddCassandraTableAction, AddBlobContainerAction, AddSqlContainerAction)

from azure.cli.core.commands.parameters import (
tags_type, get_resource_name_completion_list, name_type, get_enum_type, get_three_state_flag, get_location_type)
Expand Down Expand Up @@ -284,3 +284,33 @@ def load_arguments(self, _):
c.argument('account_name', account_name_type, id_part=None, required=True, help='Name of the CosmosDB database account')
c.argument('table_name', options_list=['--table-name', '-n'], required=True, help='Name of the CosmosDB Table name')
c.argument('location', options_list=['--location', '-l'], help="Location of the account", required=True)

with self.argument_context('cosmosdb dts export') as c:
niteshvijay1995 marked this conversation as resolved.
Show resolved Hide resolved
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')
c.argument('job_name', type=str, help='Name of the Data Transfer Job. A random job name will be generated if not passed.')
c.argument('cassandra_table', nargs='+', action=AddCassandraTableAction, help='Data source cassandra table', required=True)
c.argument('blob_container', nargs='+', action=AddBlobContainerAction, help='Data sink blob container', required=True)
c.argument('worker_count', type=int, help='Worker count')

with self.argument_context('cosmosdb dts import') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')
c.argument('job_name', type=str, help='Name of the Data Transfer Job. A random job name will be generated if not passed.')
c.argument('cassandra_table', nargs='+', action=AddCassandraTableAction, help='Data sink cassandra table', required=True)
c.argument('blob_container', nargs='+', action=AddBlobContainerAction, help='Data source blob container', required=True)
c.argument('worker_count', type=int, help='Worker count')

with self.argument_context('cosmosdb dts copy') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')
c.argument('job_name', type=str, help='Name of the Data Transfer Job')
c.argument('source_cassandra_table', nargs='+', action=AddCassandraTableAction, help='Source cassandra table')
c.argument('source_sql_container', nargs='+', action=AddSqlContainerAction, help='Source sql container')
c.argument('dest_cassandra_table', nargs='+', action=AddCassandraTableAction, help='Destination cassandra table')
c.argument('dest_sql_container', nargs='+', action=AddSqlContainerAction, help='Destination sql container')
c.argument('worker_count', type=int, help='Worker count')

with self.argument_context('cosmosdb dts list') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')

with self.argument_context('cosmosdb dts show') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')
c.argument('job_name', type=str, help='Name of the Data Transfer Job', required=True)
117 changes: 116 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

from azext_cosmosdb_preview.vendored_sdks.azure_mgmt_cosmosdb.models import (
DatabaseRestoreResource,
GremlinDatabaseRestoreResource
GremlinDatabaseRestoreResource,
AzureBlobDataTransferDataSourceSink,
CosmosCassandraDataTransferDataSourceSink,
CosmosSqlDataTransferDataSourceSink
)

logger = get_logger(__name__)
Expand Down Expand Up @@ -93,3 +96,115 @@ def __call__(self, parser, namespace, values, option_string=None):

for item in values:
namespace.tables_to_restore.append(item)


class AddBlobContainerAction(argparse._AppendAction):
def __call__(self, parser, namespace, values, option_string=None):
if not values:
# pylint: disable=line-too-long
raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

container_name = None
endpoint_url = None

for (k, v) in (x.split('=', 1) for x in values):
kl = k.lower()
if kl == 'name':
container_name = v

elif kl == 'url':
endpoint_url = v

else:
raise CLIError(
f'Unsupported Key {k} is provided for {option_string} component. All'
' possible keys are: name, url'
)

if container_name is None:
raise CLIError(f'usage error: missing key name in {option_string} component')

if endpoint_url is None:
raise CLIError(f'usage error: missing key url in {option_string} component')

blob_container = AzureBlobDataTransferDataSourceSink(container_name=container_name, endpoint_url=endpoint_url)

namespace.blob_container = blob_container


class AddCassandraTableAction(argparse._AppendAction):
def __call__(self, parser, namespace, values, option_string=None):
if not values:
# pylint: disable=line-too-long
raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

keyspace_name = None
table_name = None

for (k, v) in (x.split('=', 1) for x in values):
kl = k.lower()
if kl == 'keyspace':
keyspace_name = v

elif kl == 'table':
table_name = v

else:
raise CLIError(
f'Unsupported Key {k} is provided for {option_string} component. All'
' possible keys are: keyspace, table'
)

if keyspace_name is None:
raise CLIError(f'usage error: missing key keyspace in {option_string} component')

if table_name is None:
raise CLIError(f'usage error: missing key table in {option_string} component')

cassandra_table = CosmosCassandraDataTransferDataSourceSink(keyspace_name=keyspace_name, table_name=table_name)

if option_string == "--source-cassandra-table":
namespace.source_cassandra_table = cassandra_table
elif option_string == "--dest-cassandra-table":
namespace.dest_cassandra_table = cassandra_table
else:
namespace.cassandra_table = cassandra_table


class AddSqlContainerAction(argparse._AppendAction):
def __call__(self, parser, namespace, values, option_string=None):
if not values:
# pylint: disable=line-too-long
raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

database_name = None
container_name = None

for (k, v) in (x.split('=', 1) for x in values):
kl = k.lower()
if kl == 'database':
database_name = v

elif kl == 'container':
container_name = v

else:
raise CLIError(
f'Unsupported Key {k} is provided for {option_string} component. All'
' possible keys are: database, container'
)

if database_name is None:
raise CLIError(f'usage error: missing key database in {option_string} component')

if container_name is None:
raise CLIError(f'usage error: missing key container in {option_string} component')

sql_container = CosmosSqlDataTransferDataSourceSink(database_name=database_name, container_name=container_name)

if option_string == "--source-sql-container":
namespace.source_sql_container = sql_container
elif option_string == "--dest-sql-container":
namespace.dest_sql_container = sql_container
else:
namespace.sql_container = sql_container
16 changes: 15 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
cf_restorable_gremlin_graphs,
cf_restorable_gremlin_resources,
cf_restorable_tables,
cf_restorable_table_resources
cf_restorable_table_resources,
cf_data_transfer_job
)


Expand Down Expand Up @@ -161,3 +162,16 @@ def load_command_table(self, _):

with self.command_group('cosmosdb table', cosmosdb_table_sdk, client_factory=cf_table_resources) as g:
g.custom_command('retrieve-latest-backup-time', 'cli_table_retrieve_latest_backup_time', is_preview=True)

# Data Transfer Service
cosmosdb_data_transfer_job = CliCommandType(
operations_tmpl='azext_cosmosdb_preview.vendored_sdks.cosmodb.operations._data_transfer_jobs_operations#DataTransferJobsOperations.{}',
client_factory=cf_data_transfer_job
)

with self.command_group('cosmosdb dts', cosmosdb_data_transfer_job, client_factory=cf_data_transfer_job) as g:
g.custom_command('export', 'cosmosdb_data_transfer_export_job')
g.custom_command('import', 'cosmosdb_data_transfer_import_job')
g.custom_command('copy', 'cosmosdb_data_transfer_copy_job')
g.custom_command('list', 'cosmosdb_dts_list')
niteshvijay1995 marked this conversation as resolved.
Show resolved Hide resolved
g.custom_show_command('show', 'cosmosdb_dts_show')
niteshvijay1995 marked this conversation as resolved.
Show resolved Hide resolved
Loading