
Network analytics ingest #7540 (Draft — wants to merge 10 commits into main)

Commit: Review feedback update
sabuqamar-ms committed Apr 22, 2024
commit 7be37678b9ce0788b354e488403f745a51370849
@@ -8,13 +8,17 @@
# pylint: skip-file
# flake8: noqa

Review comment:

I think we want linting wherever we can get it to keep with best practice.

import os
import re
import uuid

from azure.cli.command_modules.keyvault._client_factory import data_plane_azure_keyvault_secret_client
from azure.cli.core.aaz import *
from azure.cli.core.azclierror import InvalidArgumentValueError, UnauthorizedError
from azure.core.exceptions import ClientAuthenticationError
from azure.cli.core.profiles import ResourceType
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from datetime import date
from .profiles import DATA_STORAGE_BLOB_CONTAINER

@register_command(
@@ -69,16 +73,10 @@ def _build_arguments_schema(cls, *args, **kwargs):
help="Data Type.",

Review comment:

This needs to be more explanatory.

Suggested change:
-            help="Data Type.",
+            help="The name of the data type into which you wish to ingest the specified file(s).",

Author reply:

Sure

            required=True,
        )
-       _args_schema.file_path = AAZStrArg(
-           options=["--file-path"],
+       _args_schema.source = AAZStrArg(
+           options=["--srcdir"],
            arg_group="Body",
-           help="File path.",
-           required=True,
-       )
-       _args_schema.principal_id = AAZStrArg(
-           options=["--principal-id"],
-           arg_group="Body",
-           help="Object ID of the AAD principal or security-group.",
+           help="Source directory path.",
            required=True,
        )

@@ -104,8 +102,8 @@ def _output(self, *args, **kwargs):
return result

class DataProductsIngest(object):

Review comment:

I think this class name could be more explanatory.

Suggested change:
-class DataProductsIngest(object):
+class DataProductFileIngestion(object):

Author reply:

The other command classes in this repo suggest that the class name follows the command name: command group (dataProducts) followed by command name (Ingest).

SECRETS_USER_ROLE_ID = "providers/Microsoft.Authorization/roleDefinitions/4633458b-17de-408a-b874-0445c86b69e6"
DATA_PRODUCT_ARM_ID = "/subscriptions/{}/resourceGroups/{}/providers/Microsoft.NetworkAnalytics/dataProducts/{}"

Review comment:

Please provide names for the variables to be replaced as mentioned in my comment about the key vault URI.

Author reply (@sabuqamar-ms, Apr 23, 2024):

Sure. Edit: I don't think that syntax is valid in Python.

API_VERSION = "2023-11-15"

Review comment:

We currently only have one API version but all resources end up with multiple. Are there examples of supporting multiple API versions for a resource in other CLI commands, and if so, is there a way for us to support that upfront?

Author reply:

I first decided to hard-code the API version, as all the other commands in our extension have it hard-coded; however, I have since learnt that the code is autogenerated and is overwritten if any changes occur. You are right; I have changed it so that it pulls the latest version.
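A sketch of how "pull the latest version" could work. The helper below is a hypothetical illustration, not code from this PR; it assumes the version list would come from the management client's provider metadata (e.g. `resources_client.providers.get(...)`, whose resource types each expose an `api_versions` list):

```python
def pick_latest_api_version(api_versions):
    """Return the newest stable API version from a provider's list,
    falling back to the newest preview version if no stable one exists.
    ARM API versions are date-prefixed (YYYY-MM-DD[-preview]), so
    lexicographic comparison matches chronological order."""
    stable = [v for v in api_versions if "preview" not in v]
    return max(stable or api_versions)

# Example: prefer the newest stable version over a newer preview
print(pick_latest_api_version(["2023-11-15", "2024-02-01-preview", "2022-04-01"]))
# → 2023-11-15
```

Filtering out previews first keeps the command off pre-release contracts unless nothing else is available.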

KEYVAULT_NAME = "input-storage-sas"

Review comment:

Suggested change:
-    KEYVAULT_NAME = "input-storage-sas"
+    INPUT_STORAGE_ACCOUNT_SAS_URL_SECRET_NAME = "input-storage-sas"

Author reply:

Sure

KEYVAULT_URI = "https://aoi-{}-kv.vault.azure.net/"
Review comment:

You can define this with a named variable to replace:

Suggested change:
-    KEYVAULT_URI = "https://aoi-{}-kv.vault.azure.net/"
+    KEYVAULT_URI = "https://aoi-{unique_id}-kv.vault.azure.net/"

This provides context as to what the replaced value should be.

Author reply (@sabuqamar-ms, Apr 23, 2024):

Sure. Edit: I don't think this is valid syntax in Python.
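For reference, named replacement fields are in fact valid Python, provided the call site passes the value to `str.format` by keyword — a minimal check (the `unique_id` value here is an illustrative placeholder):

```python
KEYVAULT_URI = "https://aoi-{unique_id}-kv.vault.azure.net/"

# The named field must be supplied as a keyword argument:
url = KEYVAULT_URI.format(unique_id="abc123")
print(url)  # → https://aoi-abc123-kv.vault.azure.net/
```

Positional calls like `KEYVAULT_URI.format("abc123")` would raise `KeyError`, which may be what was observed.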


@@ -115,10 +113,8 @@ def __init__(self, ctx):
        self.resource_group = ctx.args.resource_group
        self.data_product_name = ctx.args.data_product_name
        self.data_type = ctx.args.data_type
-       self.file_path = ctx.args.file_path
-       self.principal_id = ctx.args.principal_id
+       self.source = ctx.args.source

        self.roles_client = get_mgmt_service_client(self.ctx, ResourceType.MGMT_AUTHORIZATION, subscription_id=self.subscription_id).role_assignments
        self.resources_client = get_mgmt_service_client(self.ctx, ResourceType.MGMT_RESOURCE_RESOURCES, subscription_id=self.subscription_id).resources
+       self.container_client = get_mgmt_service_client(self.ctx, DATA_STORAGE_BLOB_CONTAINER)

@@ -129,72 +125,47 @@ def __call__(self, *args, **kwargs):

    def get_data_product(self):
        arm_id = self.DATA_PRODUCT_ARM_ID.format(self.subscription_id, self.resource_group, self.data_product_name)
-       resource = self.resources_client.get_by_id(arm_id, self.API_VERSION)
+       api_version = self.get_api_version()
+       resource = self.resources_client.get_by_id(arm_id, api_version)
        return resource

Review comment:

Creating a DataProductResource class to which we then map the resultant JSON would make this code much easier to manage.

Author reply:

I understand that it makes the code cleaner, but I am not sure whether it might add clutter to the overall code. I am open to discussing this idea, though; I would like to know your thoughts on it.
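The reviewer's suggestion could look roughly like the sketch below. It assumes the property names this diff already reads (`managedResourceGroupConfiguration`, `consumptionEndpoints`) and represents the resource as a plain dict for illustration:

```python
from dataclasses import dataclass


@dataclass
class DataProductResource:
    """Wraps just the fields this command reads from the data product JSON."""
    managed_rg_name: str
    ingestion_url: str

    @classmethod
    def from_json(cls, resource):
        # Map the nested resource JSON onto flat, named attributes once,
        # instead of dotted lookups scattered through the command code.
        props = resource["properties"]
        return cls(
            managed_rg_name=props["managedResourceGroupConfiguration"]["name"],
            ingestion_url=props["consumptionEndpoints"]["ingestionUrl"],
        )


resource = {
    "properties": {
        "managedResourceGroupConfiguration": {"name": "aoi-managed-rg"},
        "consumptionEndpoints": {"ingestionUrl": "https://aoiingestionabc.blob.core.windows.net"},
    }
}
dp = DataProductResource.from_json(resource)
print(dp.managed_rg_name)  # → aoi-managed-rg
```

The trade-off both sides raise is real: one small wrapper centralises the JSON shape, at the cost of an extra class in an otherwise autogenerated module.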


def get_api_version(self):
# TODO: get the value dynamically
return "2023-11-15"

def get_hosted_resources_rg(self, data_product):
return data_product.properties.managedResourceGroupConfiguration.name

def get_keyvault_url(self, data_product):
ingestion_url = data_product.properties.consumptionEndpoints.ingestionUrl
        unique_id = re.search(r"https://aoiingestion(.*)\.blob\.core\.windows\.net", ingestion_url).group(1)
vault_base_url = self.KEYVAULT_URI.format(unique_id)
return vault_base_url

def create_role_assignment(self, hosted_resources_rg):
scope = "/".join(hosted_resources_rg.split('/')[0:5])
scoped_role_id = f'{scope}/{self.SECRETS_USER_ROLE_ID}'
assignment_name = uuid.uuid4()
params_role_assignment = {
'role_definition_id': scoped_role_id,
'principal_id': self.principal_id
}

existing_role_assignment = self.check_if_role_exists(scope)
if existing_role_assignment is None:
self.roles_client.create(scope, assignment_name, params_role_assignment)

def check_if_role_exists(self, scope):
role_definition_id = f'/subscriptions/{self.subscription_id}/{self.SECRETS_USER_ROLE_ID}'
        existing_role_assignments = list(self.roles_client.list_for_scope(scope))
existing_role_assignment = next((x for x in existing_role_assignments
if x.principal_id.lower() == self.principal_id.lower() and
x.role_definition_id.lower() == role_definition_id.lower() and
x.scope.lower() == scope.lower()), None)
return existing_role_assignment

def get_key_vault_secret(self, data_product):
hosted_resources_rg = self.get_hosted_resources_rg(data_product)
self.create_role_assignment(hosted_resources_rg)
keyvault_url = self.get_keyvault_url(data_product)
command_args = {'vault_base_url': keyvault_url}

keyvault_client = data_plane_azure_keyvault_secret_client(self.ctx, command_args)
-       secret = keyvault_client.get_secret(name=self.KEYVAULT_NAME)
+       try:
+           secret = keyvault_client.get_secret(name=self.KEYVAULT_NAME)
+       except ClientAuthenticationError:

Review comment:

The key vault exists in the customer's subscription, so it's entirely possible for them to rename the managed resource group, key vault and/or secret. I assume this would break any management of that resource by the data product itself, but it is possible. So we should probably add some error handling that returns a helpful message if the expected resource group, key vault or secret does not exist.

err_msg = f'You do not have permission to access the key vault of data product {self.data_product_name}'
raise UnauthorizedError(err_msg)
return secret.value

-   def get_storage_url_and_sas_token(self, secret):
+   def upload_file(self, secret):
+       storage_container = self.get_storage_container(secret)
+       file_name = os.path.basename(self.source)
+       blob_name = "sample_data/{}/{}".format(date.today(), file_name)

Review comment:

This assumes the user wants to send a file to a directory called sample_data/{todays-date}/{name-of-file-on-host}. Enzyme currently requires a minimum of two directory components in a file path in the input storage account, but the digestion definition allows for digestion of specific files based on the names of the directories in their blob path. So I don't think we should decide the directory structure upfront, because it is very unlikely to match the user's digestion config. I think we should support --source and --dest, which provide the path to a single file and the path to the blob inside the given data type's storage container.

If we want support for ingesting multiple files then maybe we need two commands, ingest file and ingest files, with differing arguments. The Lighthouse team would be useful in determining this behaviour.

Author reply:

As per our design meeting, I have included a new flag --destination, and we currently support ingesting only one file.


        try:
            with open(self.source, "rb") as data:
                storage_container.upload_blob(name=blob_name, data=data, overwrite=True)
        except OSError:
            err_msg = "The source directory provided is invalid or cannot be accessed."

Review comment:

"Invalid" implies that a directory can be valid or invalid but doesnt state what a "valid" directory is.

Author reply:

I have moved this to argument validation and have changed the error message

raise InvalidArgumentValueError(err_msg)
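The --destination behaviour the author mentions is not part of this commit. A hypothetical sketch of what the blob naming might become under that design (helper and argument names are assumptions, not code from this PR):

```python
import posixpath


def build_blob_name(source, destination):
    """Join a user-supplied in-container directory path (--destination)
    with the local file's base name (--source), instead of the fixed
    sample_data/{date}/ prefix above. Hypothetical helper."""
    # Normalise Windows separators so basename works on either style of path.
    file_name = posixpath.basename(source.replace("\\", "/"))
    return "{}/{}".format(destination.strip("/"), file_name)


print(build_blob_name("/data/edr/records.csv", "edr/2024-04-22/"))
# → edr/2024-04-22/records.csv
```

Letting the caller supply the directory keeps the blob path aligned with whatever directory layout their ingestion config expects.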

def get_storage_container(self, secret):
result = secret.split("?", 1)

Review comment:

The key vault is within the customer's subscription, so it's possible for them to update the secret with a non-SAS value. Not likely, but possible. We should check whether the secret is a valid SAS URL, e.g. contains ? or contains more than one ?, and error accordingly. Perhaps there is a way to validate a SAS URL before using it?

Author reply:

The container client will throw an exception if the container instance cannot be created (whether from an invalid container URL or invalid credentials). If you think we should handle that error, we can discuss it; however, I think the current behaviour is sufficient.

        storage_url = result[0]
        sas_token = result[1]
-       return storage_url, sas_token
-
-   def upload_file(self, secret):
-       storage_url, sas_token = self.get_storage_url_and_sas_token(secret)
        container_name = self.data_type
        container_url = f'{storage_url}/{container_name}'
-       container_client = container_client.from_container_url(
+       return self.container_client.from_container_url(
            container_url=container_url,
            credential=sas_token
        )
-       # TODO: determine naming convention
-       blob_name = ""
-       with open(self.file_path, "rb") as data:
-           container_client.upload_blob(name=blob_name, data=data)
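The shape check the reviewer asks about could be as simple as validating the secret before splitting it. A sketch (helper name and error messages are assumptions, not code from this PR):

```python
def split_sas_url(secret):
    """Split a SAS URL secret into (storage_url, sas_token), rejecting
    values that do not look like a SAS URL: exactly one '?', an https
    scheme, and a non-empty query-string token."""
    if secret.count("?") != 1:
        raise ValueError("secret is not a SAS URL: expected exactly one '?'")
    storage_url, sas_token = secret.split("?", 1)
    if not storage_url.startswith("https://") or not sas_token:
        raise ValueError("secret is not a SAS URL: missing URL or token")
    return storage_url, sas_token


url, token = split_sas_url("https://acct.blob.core.windows.net/ct?sv=2022-11-02&sig=abc")
print(url)    # → https://acct.blob.core.windows.net/ct
print(token)  # → sv=2022-11-02&sig=abc
```

This catches the "customer replaced the secret with a non-SAS value" case up front, with a clearer error than whatever the container client would raise later.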

__all__ = ["Ingest"]