Skip to content

Commit

Permalink
[CosmosDB]: az cosmosdb dts: Add cosmos db data transfer commands (#4563)
Browse files Browse the repository at this point in the history

* Add dts commands

* Fix lint error

* Fix static analysis

* Fix CLI Linter

* Fix review comments

* Add new version

* Fix linter

* Update python sdk to 7.0.0b5 version

* Add pause, resume, cancel API

* Fix static analysis

* Update metadata file

* Update test recording

* Run live tests

* Add test for dts commands

* Fix short summary

* Remove export import command

Co-authored-by: Nitesh Vijay <niteshvijay@microsoft.com>
  • Loading branch information
niteshvijay1995 and niteshvijay-ms authored May 12, 2022
1 parent 0c9628d commit c8dedfa
Show file tree
Hide file tree
Showing 112 changed files with 19,305 additions and 15,024 deletions.
4 changes: 4 additions & 0 deletions src/cosmosdb-preview/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
Release History
===============
0.16.0
++++++
* Create and manage data transfer copy jobs.

0.15.0
++++++
* Add `--enable-materialized-views` parameter for create/update database account.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ def cf_restorable_tables(cli_ctx, _):

def cf_restorable_table_resources(cli_ctx, _):
    """Return the ``restorable_table_resources`` operations group of the preview client.

    The second positional argument is accepted for the client-factory call
    signature and is not used.
    """
    preview_client = cf_cosmosdb_preview(cli_ctx)
    return preview_client.restorable_table_resources


def cf_data_transfer_job(cli_ctx, _):
    """Return the ``data_transfer_jobs`` operations group of the preview client.

    The second positional argument is accepted for the client-factory call
    signature and is not used.
    """
    preview_client = cf_cosmosdb_preview(cli_ctx)
    return preview_client.data_transfer_jobs
88 changes: 88 additions & 0 deletions src/cosmosdb-preview/azext_cosmosdb_preview/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,91 @@
type: command
short-summary: Retrieves latest restorable timestamp for the given table in given region.
"""

# Help entry for `az cosmosdb dts copy`. Each source/destination option takes
# space-separated KEY=VALUE tokens; the long summaries list the accepted keys.
helps['cosmosdb dts copy'] = """
    type: command
    short-summary: "Creates a Data Transfer Copy Job."
    parameters:
      - name: --source-cassandra-table
        short-summary: "Source cassandra table"
        long-summary: |
            Usage: --source-cassandra-table keyspace=XX table=XX
            keyspace: Keyspace name of CosmosDB Cassandra.
            table: Table name of CosmosDB Cassandra.
      - name: --dest-cassandra-table
        short-summary: "Destination cassandra table"
        long-summary: |
            Usage: --dest-cassandra-table keyspace=XX table=XX
            keyspace: Keyspace name of CosmosDB Cassandra.
            table: Table name of CosmosDB Cassandra.
      - name: --source-sql-container
        short-summary: "Source sql container"
        long-summary: |
            Usage: --source-sql-container database=XX container=XX
            database: Database name of CosmosDB Sql.
            container: Container name of CosmosDB Sql.
      - name: --dest-sql-container
        short-summary: "Destination sql container"
        long-summary: |
            Usage: --dest-sql-container database=XX container=XX
            database: Database name of CosmosDB Sql.
            container: Container name of CosmosDB Sql.
    examples:
      - name: Copy sql container
        text: |-
               az cosmosdb dts copy -g "rg1" --job-name "j1" --account-name "db1" --source-sql-container database=db1 container=c1 --dest-sql-container database=db2 container=c2
      - name: Copy cassandra table
        text: |-
               az cosmosdb dts copy -g "rg1" --job-name "j1" --account-name "db1" --source-cassandra-table keyspace=k1 table=t1 --dest-cassandra-table keyspace=k1 table=t1
"""

# Help entry for the `cosmosdb dts` command group itself.
helps['cosmosdb dts'] = """
type: group
short-summary: Manage data transfer job with cosmosdb
"""

# Help entry for `cosmosdb dts list`; the example passes only the account and resource group.
helps['cosmosdb dts list'] = """
type: command
short-summary: "Get a list of Data Transfer Jobs."
examples:
- name: List all jobs
text: |-
az cosmosdb dts list --account-name "ddb1" -g "rg1"
"""

# Help entry for `cosmosdb dts show`; the example identifies the job with --job-name.
helps['cosmosdb dts show'] = """
type: command
short-summary: "Get a Data Transfer Job."
examples:
- name: Show details of job j1
text: |-
az cosmosdb dts show --account-name "ddb1" --job-name "j1" -g "rg1"
"""

# Help entry for `cosmosdb dts pause`.
helps['cosmosdb dts pause'] = """
type: command
short-summary: "Pause a Data Transfer Job."
examples:
- name: Pause job j1
text: |-
az cosmosdb dts pause --account-name "ddb1" --job-name "j1" -g "rg1"
"""

# Help entry for `cosmosdb dts resume`.
helps['cosmosdb dts resume'] = """
type: command
short-summary: "Resumes a Data Transfer Job."
examples:
- name: Resume job j1
text: |-
az cosmosdb dts resume --account-name "ddb1" --job-name "j1" -g "rg1"
"""

# Help entry for `cosmosdb dts cancel`.
helps['cosmosdb dts cancel'] = """
type: command
short-summary: "Cancels a Data Transfer Job."
examples:
- name: Cancel job j1
text: |-
az cosmosdb dts cancel --account-name "ddb1" --job-name "j1" -g "rg1"
"""
22 changes: 21 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
validate_mongo_user_definition_id)

from azext_cosmosdb_preview.actions import (
CreateGremlinDatabaseRestoreResource, CreateTableRestoreResource)
CreateGremlinDatabaseRestoreResource, CreateTableRestoreResource, AddCassandraTableAction, AddSqlContainerAction)

from azure.cli.core.commands.parameters import (
tags_type, get_resource_name_completion_list, name_type, get_enum_type, get_three_state_flag, get_location_type)
Expand Down Expand Up @@ -284,3 +284,23 @@ def load_arguments(self, _):
c.argument('account_name', account_name_type, id_part=None, required=True, help='Name of the CosmosDB database account')
c.argument('table_name', options_list=['--table-name', '-n'], required=True, help='Name of the CosmosDB Table name')
c.argument('location', options_list=['--location', '-l'], help="Location of the account", required=True)

# Arguments registered on the whole `cosmosdb dts` group scope.
with self.argument_context('cosmosdb dts') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')

# Job-name argument type used by `copy`; its help text states that a random
# name is generated when the option is not passed.
job_name_type = CLIArgumentType(options_list=['--job-name', '-n'], help='Name of the Data Transfer Job. A random job name will be generated if not passed.')
with self.argument_context('cosmosdb dts copy') as c:
c.argument('job_name', job_name_type)
# Source/destination options accept one or more tokens (nargs='+') and are
# handled by the custom argparse actions imported from azext_cosmosdb_preview.actions.
c.argument('source_cassandra_table', nargs='+', action=AddCassandraTableAction, help='Source cassandra table')
c.argument('source_sql_container', nargs='+', action=AddSqlContainerAction, help='Source sql container')
c.argument('dest_cassandra_table', nargs='+', action=AddCassandraTableAction, help='Destination cassandra table')
c.argument('dest_sql_container', nargs='+', action=AddSqlContainerAction, help='Destination sql container')
c.argument('worker_count', type=int, help='Worker count')

# show/pause/resume/cancel register the same --job-name/-n option, with a help
# text that omits the random-name note used for `copy`.
for scope in [
'cosmosdb dts show',
'cosmosdb dts pause',
'cosmosdb dts resume',
'cosmosdb dts cancel']:
with self.argument_context(scope) as c:
c.argument('job_name', options_list=['--job-name', '-n'], help='Name of the Data Transfer Job.')
82 changes: 81 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

from azext_cosmosdb_preview.vendored_sdks.azure_mgmt_cosmosdb.models import (
DatabaseRestoreResource,
GremlinDatabaseRestoreResource
GremlinDatabaseRestoreResource,
CosmosCassandraDataTransferDataSourceSink,
CosmosSqlDataTransferDataSourceSink
)

logger = get_logger(__name__)
Expand Down Expand Up @@ -93,3 +95,81 @@ def __call__(self, parser, namespace, values, option_string=None):

for item in values:
namespace.tables_to_restore.append(item)


class AddCassandraTableAction(argparse._AppendAction):
    """Parse ``keyspace=XX table=XX`` tokens into a Cassandra data-transfer source/sink.

    The resulting ``CosmosCassandraDataTransferDataSourceSink`` is stored on the
    namespace as ``source_cassandra_table`` or ``dest_cassandra_table`` depending
    on which option was used, and as ``cassandra_table`` for any other option string.

    Raises:
        CLIError: if no tokens are given, a token is not of the form KEY=VALUE,
            an unknown key is supplied, or ``keyspace``/``table`` is missing.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if not values:
            # pylint: disable=line-too-long
            raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

        keyspace_name = None
        table_name = None

        for item in values:
            # Split on the first '=' only, so values may themselves contain '='.
            key, separator, value = item.partition('=')
            if not separator:
                # A token without '=' used to surface as a raw ValueError from
                # tuple unpacking; report it as a usage error instead.
                raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

            # Keys are matched case-insensitively.
            lowered_key = key.lower()
            if lowered_key == 'keyspace':
                keyspace_name = value
            elif lowered_key == 'table':
                table_name = value
            else:
                raise CLIError(
                    f'Unsupported Key {key} is provided for {option_string} component. All'
                    ' possible keys are: keyspace, table'
                )

        if keyspace_name is None:
            raise CLIError(f'usage error: missing key keyspace in {option_string} component')

        if table_name is None:
            raise CLIError(f'usage error: missing key table in {option_string} component')

        cassandra_table = CosmosCassandraDataTransferDataSourceSink(keyspace_name=keyspace_name, table_name=table_name)

        if option_string == "--source-cassandra-table":
            namespace.source_cassandra_table = cassandra_table
        elif option_string == "--dest-cassandra-table":
            namespace.dest_cassandra_table = cassandra_table
        else:
            namespace.cassandra_table = cassandra_table


class AddSqlContainerAction(argparse._AppendAction):
    """Parse ``database=XX container=XX`` tokens into a SQL data-transfer source/sink.

    The resulting ``CosmosSqlDataTransferDataSourceSink`` is stored on the
    namespace as ``source_sql_container`` or ``dest_sql_container`` depending on
    which option was used, and as ``sql_container`` for any other option string.

    Raises:
        CLIError: if no tokens are given, a token is not of the form KEY=VALUE,
            an unknown key is supplied, or ``database``/``container`` is missing.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if not values:
            # pylint: disable=line-too-long
            raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

        database_name = None
        container_name = None

        for item in values:
            # Split on the first '=' only, so values may themselves contain '='.
            key, separator, value = item.partition('=')
            if not separator:
                # A token without '=' used to surface as a raw ValueError from
                # tuple unpacking; report it as a usage error instead.
                raise CLIError(f'usage error: {option_string} [KEY=VALUE ...]')

            # Keys are matched case-insensitively.
            lowered_key = key.lower()
            if lowered_key == 'database':
                database_name = value
            elif lowered_key == 'container':
                container_name = value
            else:
                raise CLIError(
                    f'Unsupported Key {key} is provided for {option_string} component. All'
                    ' possible keys are: database, container'
                )

        if database_name is None:
            raise CLIError(f'usage error: missing key database in {option_string} component')

        if container_name is None:
            raise CLIError(f'usage error: missing key container in {option_string} component')

        sql_container = CosmosSqlDataTransferDataSourceSink(database_name=database_name, container_name=container_name)

        if option_string == "--source-sql-container":
            namespace.source_sql_container = sql_container
        elif option_string == "--dest-sql-container":
            namespace.dest_sql_container = sql_container
        else:
            namespace.sql_container = sql_container
17 changes: 16 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
cf_restorable_gremlin_graphs,
cf_restorable_gremlin_resources,
cf_restorable_tables,
cf_restorable_table_resources
cf_restorable_table_resources,
cf_data_transfer_job
)


Expand Down Expand Up @@ -161,3 +162,17 @@ def load_command_table(self, _):

with self.command_group('cosmosdb table', cosmosdb_table_sdk, client_factory=cf_table_resources) as g:
g.custom_command('retrieve-latest-backup-time', 'cli_table_retrieve_latest_backup_time', is_preview=True)

# Data Transfer Service
# Command type whose operations template points at DataTransferJobsOperations
# in the vendored SDK and whose client comes from cf_data_transfer_job.
cosmosdb_data_transfer_job = CliCommandType(
operations_tmpl='azext_cosmosdb_preview.vendored_sdks.azure_mgmt_cosmosdb.operations._data_transfer_jobs_operations#DataTransferJobsOperations.{}',
client_factory=cf_data_transfer_job
)

# The whole `cosmosdb dts` group is registered with is_preview=True.
# `copy` is a custom command (cosmosdb_data_transfer_copy_job); the other
# commands name operations directly: list_by_database_account, get, pause,
# resume, cancel.
with self.command_group('cosmosdb dts', cosmosdb_data_transfer_job, client_factory=cf_data_transfer_job, is_preview=True) as g:
g.custom_command('copy', 'cosmosdb_data_transfer_copy_job')
g.command('list', 'list_by_database_account')
g.show_command('show', 'get')
g.command('pause', 'pause')
g.command('resume', 'resume')
g.command('cancel', 'cancel')
50 changes: 50 additions & 0 deletions src/cosmosdb-preview/azext_cosmosdb_preview/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,3 +1038,53 @@ def cli_table_retrieve_latest_backup_time(client,
table_name,
restoreLocation)
return asyc_backupInfo.result()


def cosmosdb_data_transfer_copy_job(client,
                                    resource_group_name,
                                    account_name,
                                    source_cassandra_table=None,
                                    dest_cassandra_table=None,
                                    source_sql_container=None,
                                    dest_sql_container=None,
                                    worker_count=0,
                                    job_name=None):
    """Create a data transfer copy job through ``client.create``.

    Exactly one source (Cassandra table or SQL container) and exactly one
    destination must be supplied.

    :param client: object exposing ``create(resource_group_name, account_name,
        job_name, job_create_parameters)``.
    :param resource_group_name: resource group passed through to ``client.create``.
    :param account_name: account name passed through to ``client.create``.
    :param source_cassandra_table: source Cassandra table component, or None.
    :param dest_cassandra_table: destination Cassandra table component, or None.
    :param source_sql_container: source SQL container component, or None.
    :param dest_sql_container: destination SQL container component, or None.
    :param worker_count: included in the request only when greater than zero;
        ``0`` or ``None`` leaves it out.
    :param job_name: job name; generated with ``_gen_guid()`` when None.
    :return: whatever ``client.create`` returns.
    :raises CLIError: when the source or destination is missing or given twice.
    """
    if source_cassandra_table is None and source_sql_container is None:
        raise CLIError('source component is missing')

    if source_cassandra_table is not None and source_sql_container is not None:
        raise CLIError('Invalid input: multiple source components')

    if dest_cassandra_table is None and dest_sql_container is None:
        raise CLIError('destination component is missing')

    if dest_cassandra_table is not None and dest_sql_container is not None:
        raise CLIError('Invalid input: multiple destination components')

    # The checks above guarantee exactly one source and one destination is set.
    job_create_properties = {
        'source': source_cassandra_table if source_cassandra_table is not None else source_sql_container,
        'destination': dest_cassandra_table if dest_cassandra_table is not None else dest_sql_container,
    }

    # Guard against None as well as non-positive values so an explicit
    # worker_count=None does not fail the comparison with a TypeError.
    if worker_count and worker_count > 0:
        job_create_properties['worker_count'] = worker_count

    job_create_parameters = {'properties': job_create_properties}

    if job_name is None:
        job_name = _gen_guid()

    return client.create(resource_group_name=resource_group_name,
                         account_name=account_name,
                         job_name=job_name,
                         job_create_parameters=job_create_parameters)
Loading

0 comments on commit c8dedfa

Please sign in to comment.