diff --git a/src/aosm/HISTORY.rst b/src/aosm/HISTORY.rst index e733b20bcec..bb9d6dc4b25 100644 --- a/src/aosm/HISTORY.rst +++ b/src/aosm/HISTORY.rst @@ -17,6 +17,7 @@ unreleased * Fix Manifest name for NSDs so it isn't the same as that for NFDs * Add validation of source_registry_id format for CNF configuration * Workaround Oras client bug (#90) on Windows for Artifact upload to ACR +* Take Oras 0.1.18 so above Workaround could be removed 0.2.0 ++++++ diff --git a/src/aosm/azext_aosm/_configuration.py b/src/aosm/azext_aosm/_configuration.py index 0475350a592..221e972423b 100644 --- a/src/aosm/azext_aosm/_configuration.py +++ b/src/aosm/azext_aosm/_configuration.py @@ -1,3 +1,10 @@ +# -------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT +# License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------- +"""Configuration class for input config file parsing,""" +import abc +import logging import json import os import re @@ -6,7 +13,6 @@ from typing import Any, Dict, List, Optional from azure.cli.core.azclierror import InvalidArgumentValueError, ValidationError - from azext_aosm.util.constants import ( CNF, NF_DEFINITION_OUTPUT_BICEP_PREFIX, @@ -17,6 +23,8 @@ SOURCE_ACR_REGEX, ) +logger = logging.getLogger(__name__) + DESCRIPTION_MAP: Dict[str, str] = { "publisher_resource_group_name": ( "Resource group for the Publisher resource. " @@ -118,8 +126,14 @@ class ArtifactConfig: @dataclass -class Configuration: +class Configuration(abc.ABC): config_file: Optional[str] = None + publisher_name: str = DESCRIPTION_MAP["publisher_name"] + publisher_resource_group_name: str = DESCRIPTION_MAP[ + "publisher_resource_group_name" + ] + acr_artifact_store_name: str = DESCRIPTION_MAP["acr_artifact_store_name"] + location: str = DESCRIPTION_MAP["location"] def path_from_cli_dir(self, path: str) -> str: """ @@ -131,6 +145,8 @@ def path_from_cli_dir(self, path: str) -> str: :param path: The path relative to the config file. """ + assert self.config_file + # If no path has been supplied we shouldn't try to update it. if path == "": return "" @@ -139,11 +155,29 @@ def path_from_cli_dir(self, path: str) -> str: if os.path.isabs(path): return path - return os.path.join(os.path.dirname(self.config_file), path) + config_file_dir = Path(self.config_file).parent + + updated_path = str(config_file_dir / path) + + logger.debug("Updated path: %s", updated_path) + + return updated_path + + @property + def output_directory_for_build(self) -> Path: + """Base class method to ensure subclasses implement this function.""" + raise NotImplementedError("Subclass must define property") + + @property + def acr_manifest_name(self) -> str: + """Base class method to ensure subclasses implement this function.""" + raise NotImplementedError("Subclass must define property") @dataclass class NFConfiguration(Configuration): + """Network Function configuration.""" + publisher_name: str = DESCRIPTION_MAP["publisher_name"] publisher_resource_group_name: str = DESCRIPTION_MAP[ "publisher_resource_group_name" @@ -237,10 +271,10 @@ def validate(self): raise ValueError("NSD Version must be set") @property - def build_output_folder_name(self) -> str: + def output_directory_for_build(self) -> Path: """Return the local folder for generating the bicep template to.""" current_working_directory = os.getcwd() - return f"{current_working_directory}/{NSD_OUTPUT_BICEP_PREFIX}" + return Path(f"{current_working_directory}/{NSD_OUTPUT_BICEP_PREFIX}") @property def resource_element_name(self) -> str: @@ -276,7 +310,7 @@ def arm_template(self) -> ArtifactConfig: artifact = ArtifactConfig() artifact.version = self.nsd_version artifact.file_path = os.path.join( - self.build_output_folder_name, NF_DEFINITION_JSON_FILENAME + self.output_directory_for_build, NF_DEFINITION_JSON_FILENAME ) return artifact @@ -389,7 +423,9 @@ def __post_init__(self): """ for package_index, package in enumerate(self.helm_packages): if isinstance(package, dict): - package["path_to_chart"] = self.path_from_cli_dir(package["path_to_chart"]) + package["path_to_chart"] = self.path_from_cli_dir( + package["path_to_chart"] + ) package["path_to_mappings"] = self.path_from_cli_dir( package["path_to_mappings"] ) @@ -397,11 +433,12 @@ def __post_init__(self): @property def output_directory_for_build(self) -> Path: - """Return the directory the build command will writes its output to""" + """Return the directory the build command will writes its output to.""" return Path(f"{NF_DEFINITION_OUTPUT_BICEP_PREFIX}{self.nf_name}") def validate(self): - """Validate the CNF config + """ + Validate the CNF config. :raises ValidationError: If source registry ID doesn't match the regex """ @@ -418,8 +455,8 @@ def validate(self): def get_configuration( - configuration_type: str, config_file: str = None -) -> NFConfiguration or NSConfiguration: + configuration_type: str, config_file: Optional[str] = None +) -> Configuration: """ Return the correct configuration object based on the type. @@ -433,6 +470,8 @@ def get_configuration( else: config_as_dict = {} + config: Configuration + if configuration_type == VNF: config = VNFConfiguration(config_file=config_file, **config_as_dict) elif configuration_type == CNF: diff --git a/src/aosm/azext_aosm/custom.py b/src/aosm/azext_aosm/custom.py index e444909e806..e3e6d225ece 100644 --- a/src/aosm/azext_aosm/custom.py +++ b/src/aosm/azext_aosm/custom.py @@ -20,6 +20,7 @@ from azext_aosm._client_factory import cf_acr_registries, cf_resources from azext_aosm._configuration import ( CNFConfiguration, + Configuration, NFConfiguration, NSConfiguration, VNFConfiguration, @@ -57,6 +58,7 @@ def build_definition( config = _get_config_from_file( config_file=config_file, configuration_type=definition_type ) + assert isinstance(config, NFConfiguration) # Generate the NFD and the artifact manifest. _generate_nfd( @@ -78,9 +80,7 @@ def generate_definition_config(definition_type: str, output_file: str = "input.j _generate_config(configuration_type=definition_type, output_file=output_file) -def _get_config_from_file( - config_file: str, configuration_type: str -) -> NFConfiguration or NSConfiguration: +def _get_config_from_file(config_file: str, configuration_type: str) -> Configuration: """ Read input config file JSON and turn it into a Configuration object. @@ -363,7 +363,7 @@ def publish_design( ) config = _get_config_from_file(config_file=config_file, configuration_type=NSD) - + assert isinstance(config, NSConfiguration) config.validate() deployer = DeployerViaArm(api_clients, config=config) @@ -379,14 +379,14 @@ def publish_design( def _generate_nsd(config: NSConfiguration, api_clients: ApiClients): """Generate a Network Service Design for the given config.""" - if os.path.exists(config.build_output_folder_name): + if os.path.exists(config.output_directory_for_build): carry_on = input( - f"The folder {config.build_output_folder_name} already exists - delete it" + f"The folder {config.output_directory_for_build} already exists - delete it" " and continue? (y/n)" ) if carry_on != "y": raise UnclassifiedUserFault("User aborted! ") - shutil.rmtree(config.build_output_folder_name) + shutil.rmtree(config.output_directory_for_build) nsd_generator = NSDGenerator(api_clients, config) nsd_generator.generate_nsd() diff --git a/src/aosm/azext_aosm/delete/delete.py b/src/aosm/azext_aosm/delete/delete.py index 6894f6f3544..3580a5d7737 100644 --- a/src/aosm/azext_aosm/delete/delete.py +++ b/src/aosm/azext_aosm/delete/delete.py @@ -5,7 +5,12 @@ """Contains class for deploying generated definitions using the Python SDK.""" from knack.log import get_logger -from azext_aosm._configuration import NFConfiguration, NSConfiguration, VNFConfiguration +from azext_aosm._configuration import ( + Configuration, + NFConfiguration, + NSConfiguration, + VNFConfiguration, +) from azext_aosm.util.management_clients import ApiClients from azext_aosm.util.utils import input_ack @@ -16,7 +21,7 @@ class ResourceDeleter: def __init__( self, api_clients: ApiClients, - config: NFConfiguration or NSConfiguration, + config: Configuration, ) -> None: """ Initializes a new instance of the Deployer class. @@ -32,12 +37,12 @@ def __init__( def delete_nfd(self, clean: bool = False): """ - Delete the NFDV and manifests. If they don't exist it still reports them as - deleted. + Delete the NFDV and manifests. If they don't exist it still reports them as deleted. :param clean: Delete the NFDG, artifact stores and publisher too. Defaults to False. Use with care. """ + assert isinstance(self.config, NFConfiguration) if clean: print( @@ -105,6 +110,7 @@ def delete_nsd(self): self.delete_config_group_schema() def delete_nfdv(self): + assert isinstance(self.config, NFConfiguration) message = ( f"Delete NFDV {self.config.version} from group {self.config.nfdg_name} and" f" publisher {self.config.publisher_name}" @@ -199,6 +205,7 @@ def delete_artifact_manifest(self, store_type: str) -> None: def delete_nsdg(self) -> None: """Delete the NSDG.""" + assert isinstance(self.config, NSConfiguration) message = f"Delete NSD Group {self.config.nsdg_name}" logger.debug(message) print(message) @@ -218,6 +225,7 @@ def delete_nsdg(self) -> None: def delete_nfdg(self) -> None: """Delete the NFDG.""" + assert isinstance(self.config, NFConfiguration) message = f"Delete NFD Group {self.config.nfdg_name}" logger.debug(message) print(message) @@ -287,6 +295,7 @@ def delete_publisher(self) -> None: def delete_config_group_schema(self) -> None: """Delete the Configuration Group Schema.""" + assert isinstance(self.config, NSConfiguration) message = f"Delete Configuration Group Schema {self.config.cg_schema_name}" logger.debug(message) print(message) diff --git a/src/aosm/azext_aosm/deploy/artifact.py b/src/aosm/azext_aosm/deploy/artifact.py index 8fc72f4d8ef..25ec4667b41 100644 --- a/src/aosm/azext_aosm/deploy/artifact.py +++ b/src/aosm/azext_aosm/deploy/artifact.py @@ -3,21 +3,17 @@ # pylint: disable=unidiomatic-typecheck """A module to handle interacting with artifacts.""" -import os import subprocess from dataclasses import dataclass from typing import List, Union from azure.cli.core.commands import LongRunningOperation from azure.mgmt.containerregistry import ContainerRegistryManagementClient -from azure.mgmt.containerregistry.models import (ImportImageParameters, - ImportSource) +from azure.mgmt.containerregistry.models import ImportImageParameters, ImportSource from azure.storage.blob import BlobClient, BlobType from knack.log import get_logger from knack.util import CLIError from oras.client import OrasClient -from azure.cli.core.commands import LongRunningOperation -from azure.mgmt.containerregistry import ContainerRegistryManagementClient from azext_aosm._configuration import ArtifactConfig, HelmPackageConfig @@ -33,7 +29,7 @@ class Artifact: artifact_version: str artifact_client: Union[BlobClient, OrasClient] - def upload(self, artifact_config: ArtifactConfig or HelmPackageConfig) -> None: + def upload(self, artifact_config: Union[ArtifactConfig, HelmPackageConfig]) -> None: """ Upload aritfact. @@ -47,6 +43,7 @@ def upload(self, artifact_config: ArtifactConfig or HelmPackageConfig) -> None: else: raise ValueError(f"Unsupported artifact type: {type(artifact_config)}.") else: + assert isinstance(artifact_config, ArtifactConfig) self._upload_to_storage_account(artifact_config) def _upload_arm_to_acr(self, artifact_config: ArtifactConfig) -> None: @@ -58,30 +55,17 @@ def _upload_arm_to_acr(self, artifact_config: ArtifactConfig) -> None: assert type(self.artifact_client) == OrasClient if artifact_config.file_path: - try: - # OrasClient 0.1.17 has a bug - # https://github.com/oras-project/oras-py/issues/90 which means on - # Windows we need a real blank file on disk, without a colon in the - # filepath (so tempfile can't be used and we just put it in the working - # directory), that can act as the manifest config file. So create one - # and then delete it after the upload. - with open("dummyManifestConfig.json", "w", encoding='utf-8') as f: - target = ( - f"{self.artifact_client.remote.hostname.replace('https://', '')}" - f"/{self.artifact_name}:{self.artifact_version}" - ) - logger.debug("Uploading %s to %s", artifact_config.file_path, target) - self.artifact_client.push( - files=[artifact_config.file_path], - target=target, - manifest_config=f.name, - ) - finally: - # Delete the dummy file - try: - os.remove("dummyManifestConfig.json") - except FileNotFoundError: - pass + if not self.artifact_client.remote.hostname: + raise ValueError( + "Cannot upload ARM template as OrasClient has no remote hostname." + " Please check your ACR config." + ) + target = ( + f"{self.artifact_client.remote.hostname.replace('https://', '')}" + f"/{self.artifact_name}:{self.artifact_version}" + ) + logger.debug("Uploading %s to %s", artifact_config.file_path, target) + self.artifact_client.push(files=[artifact_config.file_path], target=target) else: raise NotImplementedError( "Copying artifacts is not implemented for ACR artifacts stores." @@ -93,7 +77,12 @@ def _upload_helm_to_acr(self, artifact_config: HelmPackageConfig) -> None: :param artifact_config: configuration for the artifact being uploaded """ + assert isinstance(self.artifact_client, OrasClient) chart_path = artifact_config.path_to_chart + if not self.artifact_client.remote.hostname: + raise ValueError( + "Cannot upload artifact. Oras client has no remote hostname." + ) registry = self.artifact_client.remote.hostname.replace("https://", "") target_registry = f"oci://{registry}" registry_name = registry.replace(".azurecr.io", "") @@ -137,6 +126,8 @@ def _upload_to_storage_account(self, artifact_config: ArtifactConfig) -> None: self.artifact_client.account_name, ) else: + # Config Validation will raise error if not true + assert artifact_config.blob_sas_url logger.info("Copy from SAS URL to blob store") source_blob = BlobClient.from_blob_url(artifact_config.blob_sas_url) diff --git a/src/aosm/azext_aosm/deploy/artifact_manifest.py b/src/aosm/azext_aosm/deploy/artifact_manifest.py index 1ac303d4590..20cecf5c056 100644 --- a/src/aosm/azext_aosm/deploy/artifact_manifest.py +++ b/src/aosm/azext_aosm/deploy/artifact_manifest.py @@ -10,7 +10,7 @@ from knack.log import get_logger from oras.client import OrasClient -from azext_aosm._configuration import NFConfiguration, NSConfiguration +from azext_aosm._configuration import Configuration from azext_aosm.deploy.artifact import Artifact from azext_aosm.util.management_clients import ApiClients from azext_aosm.vendored_sdks.models import ( @@ -29,7 +29,7 @@ class ArtifactManifestOperator: # pylint: disable=too-few-public-methods def __init__( self, - config: NFConfiguration or NSConfiguration, + config: Configuration, api_clients: ApiClients, store_name: str, manifest_name: str, diff --git a/src/aosm/azext_aosm/deploy/deploy_with_arm.py b/src/aosm/azext_aosm/deploy/deploy_with_arm.py index f66e1e0995a..0d79fafcddd 100644 --- a/src/aosm/azext_aosm/deploy/deploy_with_arm.py +++ b/src/aosm/azext_aosm/deploy/deploy_with_arm.py @@ -16,7 +16,7 @@ from azext_aosm._configuration import ( CNFConfiguration, - NFConfiguration, + Configuration, NSConfiguration, VNFConfiguration, ) @@ -50,11 +50,7 @@ class DeployerViaArm: templates. """ - def __init__( - self, - api_clients: ApiClients, - config: NFConfiguration or NSConfiguration, - ) -> None: + def __init__(self, api_clients: ApiClients, config: Configuration) -> None: """ Initializes a new instance of the Deployer class. @@ -224,7 +220,9 @@ def construct_vnfd_parameters(self) -> Dict[str, Any]: def construct_cnfd_parameters(self) -> Dict[str, Any]: """ - Create the parmeters dictionary for cnfdefinition.bicep. CNF specific. + Create the parmeters dictionary for cnfdefinition.bicep. + + CNF specific. """ assert isinstance(self.config, CNFConfiguration) return { @@ -297,7 +295,7 @@ def deploy_cnfd_from_bicep( # User has not passed in a bicep template, so we are deploying the # default one produced from building the NFDV using this CLI bicep_path = os.path.join( - self.config.build_output_folder_name, + self.config.output_directory_for_build, CNF_DEFINITION_BICEP_TEMPLATE_FILENAME, ) @@ -348,6 +346,12 @@ def deploy_cnfd_from_bicep( publisher_name=self.config.publisher_name, artifact_store_name=self.config.acr_artifact_store_name, ) + if not acr_properties.storage_resource_id: + raise ValueError( + f"Artifact store {self.config.acr_artifact_store_name} " + "has no storage resource id linked" + ) + target_registry_name = acr_properties.storage_resource_id.split("/")[-1] target_registry_resource_group_name = acr_properties.storage_resource_id.split( "/" @@ -434,7 +438,7 @@ def deploy_nsd_from_bicep( # User has not passed in a bicep template, so we are deploying the default # one produced from building the NSDV using this CLI bicep_path = os.path.join( - self.config.build_output_folder_name, + self.config.output_directory_for_build, NSD_BICEP_FILENAME, ) @@ -490,10 +494,14 @@ def deploy_nsd_from_bicep( # Convert the NF bicep to ARM arm_template_artifact_json = self.convert_bicep_to_arm( os.path.join( - self.config.build_output_folder_name, NF_DEFINITION_BICEP_FILENAME + self.config.output_directory_for_build, NF_DEFINITION_BICEP_FILENAME ) ) + # appease mypy + assert ( + self.config.arm_template.file_path + ), "Config missing ARM template file path" with open(self.config.arm_template.file_path, "w", encoding="utf-8") as file: file.write(json.dumps(arm_template_artifact_json, indent=4)) @@ -515,6 +523,7 @@ def deploy_manifest_template( logger.debug("Deploy manifest bicep") if not manifest_bicep_path: + file_name: str = "" if configuration_type == NSD: file_name = NSD_ARTIFACT_MANIFEST_BICEP_FILENAME elif configuration_type == VNF: @@ -523,7 +532,7 @@ def deploy_manifest_template( file_name = CNF_MANIFEST_BICEP_TEMPLATE_FILENAME manifest_bicep_path = os.path.join( - self.config.build_output_folder_name, + str(self.config.output_directory_for_build), file_name, ) if not manifest_parameters_json_file: diff --git a/src/aosm/azext_aosm/deploy/pre_deploy.py b/src/aosm/azext_aosm/deploy/pre_deploy.py index 8c314a2872c..ea5147fc44d 100644 --- a/src/aosm/azext_aosm/deploy/pre_deploy.py +++ b/src/aosm/azext_aosm/deploy/pre_deploy.py @@ -11,7 +11,7 @@ from knack.log import get_logger from azext_aosm._configuration import ( - NFConfiguration, + Configuration, NSConfiguration, VNFConfiguration, CNFConfiguration, @@ -40,7 +40,7 @@ class PreDeployerViaSDK: def __init__( self, api_clients: ApiClients, - config: NFConfiguration or NSConfiguration, + config: Configuration, ) -> None: """ Initializes a new instance of the Deployer class. @@ -143,7 +143,7 @@ def ensure_config_publisher_exists(self) -> None: def ensure_config_source_registry_exists(self) -> None: """ - Ensures that the source registry exists + Ensures that the source registry exists. Finds the parameters from self.config """ @@ -166,6 +166,7 @@ def ensure_config_source_registry_exists(self) -> None: source_registry_name = source_registry_match.group("registry_name") # This will raise an error if the registry does not exist + assert self.api_clients.container_registry_client self.api_clients.container_registry_client.get( resource_group_name=source_registry_resource_group_name, registry_name=source_registry_name, @@ -250,7 +251,7 @@ def ensure_acr_artifact_store_exists(self) -> None: self.config.publisher_resource_group_name, self.config.publisher_name, self.config.acr_artifact_store_name, - ArtifactStoreType.AZURE_CONTAINER_REGISTRY, + ArtifactStoreType.AZURE_CONTAINER_REGISTRY, # type: ignore self.config.location, ) @@ -271,7 +272,7 @@ def ensure_sa_artifact_store_exists(self) -> None: self.config.publisher_resource_group_name, self.config.publisher_name, self.config.blob_artifact_store_name, - ArtifactStoreType.AZURE_STORAGE_ACCOUNT, + ArtifactStoreType.AZURE_STORAGE_ACCOUNT, # type: ignore self.config.location, ) @@ -385,7 +386,7 @@ def does_artifact_manifest_exist( def do_config_artifact_manifests_exist( self, - ): + ) -> bool: """Returns True if all required manifests exist, False otherwise.""" acr_manny_exists: bool = self.does_artifact_manifest_exist( rg_name=self.config.publisher_resource_group_name, diff --git a/src/aosm/azext_aosm/generate_nfd/cnf_nfd_generator.py b/src/aosm/azext_aosm/generate_nfd/cnf_nfd_generator.py index 748cb14eccf..dbf2bc7a314 100644 --- a/src/aosm/azext_aosm/generate_nfd/cnf_nfd_generator.py +++ b/src/aosm/azext_aosm/generate_nfd/cnf_nfd_generator.py @@ -8,6 +8,7 @@ import shutil import tarfile import tempfile +from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Tuple @@ -40,6 +41,34 @@ logger = get_logger(__name__) +@dataclass +class Artifact: + """ + Information about an artifact. + """ + + name: str + version: str + + +@dataclass +class NFApplicationConfiguration: + name: str + chartName: str + chartVersion: str + dependsOnProfile: List[str] + registryValuesPaths: List[str] + imagePullSecretsValuesPaths: List[str] + valueMappingsPath: Path + + +@dataclass +class ImageInfo: + parameter: List[str] + name: str + version: str + + class CnfNfdGenerator(NFDGenerator): # pylint: disable=too-many-instance-attributes """ CNF NFD Generator. @@ -76,9 +105,9 @@ def __init__(self, config: CNFConfiguration, interactive: bool = False): ) self._tmp_dir: Optional[Path] = None - self.artifacts = [] - self.nf_application_configurations = [] - self.deployment_parameter_schema = SCHEMA_PREFIX + self.artifacts: List[Artifact] = [] + self.nf_application_configurations: List[NFApplicationConfiguration] = [] + self.deployment_parameter_schema: Dict[str, Any] = SCHEMA_PREFIX self.interactive = interactive def generate_nfd(self) -> None: @@ -107,13 +136,14 @@ def generate_nfd(self) -> None: # Get all image line matches for files in the chart. # Do this here so we don't have to do it multiple times. - image_line_matches = self._find_pattern_matches_in_chart( - helm_package, IMAGE_START_STRING + image_line_matches = self._find_image_parameter_from_chart( + helm_package ) + # Creates a flattened list of image registry paths to prevent set error - image_registry_paths = [] - for registry_path in image_line_matches: - image_registry_paths += registry_path[0] + image_registry_paths: List[str] = [] + for image_info in image_line_matches: + image_registry_paths += image_info.parameter # Generate the NF application configuration for the chart # passed to jinja2 renderer to render bicep template @@ -121,8 +151,8 @@ def generate_nfd(self) -> None: self._generate_nf_application_config( helm_package, image_registry_paths, - self._find_pattern_matches_in_chart( - helm_package, IMAGE_PULL_SECRETS_START_STRING + self._find_image_pull_secrets_parameter_from_chart( + helm_package ), ) ) @@ -163,6 +193,7 @@ def _extract_chart(self, path: Path) -> None: :param path: The path to helm package """ + assert self._tmp_dir logger.debug("Extracting helm package %s", path) @@ -188,6 +219,7 @@ def _generate_chart_value_mappings(self, helm_package: HelmPackageConfig) -> Non Expected use when a helm chart is very simple and user wants every value to be a deployment parameter. """ + assert self._tmp_dir logger.debug( "Creating chart value mappings file for %s", helm_package.path_to_chart ) @@ -210,7 +242,7 @@ def _generate_chart_value_mappings(self, helm_package: HelmPackageConfig) -> Non yaml.dump(mapping_to_write, mapping_file) # Update the config that points to the mapping file - helm_package.path_to_mappings = mapping_filepath + helm_package.path_to_mappings = str(mapping_filepath) def _read_top_level_values_yaml( self, helm_package: HelmPackageConfig @@ -224,6 +256,7 @@ def _read_top_level_values_yaml( :return: A dictionary of the yaml read from the file :rtype: Dict[str, Any] """ + assert self._tmp_dir for file in Path(self._tmp_dir / helm_package.name).iterdir(): if file.name in ("values.yaml", "values.yml"): with file.open(encoding="UTF-8") as values_file: @@ -236,6 +269,8 @@ def _read_top_level_values_yaml( def _write_manifest_bicep_file(self) -> None: """Write the bicep file for the Artifact Manifest to the temp directory.""" + assert self._tmp_dir + with open(self.manifest_jinja2_template_path, "r", encoding="UTF-8") as f: template: Template = Template( f.read(), @@ -254,6 +289,7 @@ def _write_manifest_bicep_file(self) -> None: def _write_nfd_bicep_file(self) -> None: """Write the bicep file for the NFD to the temp directory.""" + assert self._tmp_dir with open(self.nfd_jinja2_template_path, "r", encoding="UTF-8") as f: template: Template = Template( f.read(), @@ -273,8 +309,8 @@ def _write_nfd_bicep_file(self) -> None: def _write_schema_to_file(self) -> None: """Write the schema to file deploymentParameters.json to the temp directory.""" - logger.debug("Create deploymentParameters.json") + assert self._tmp_dir full_schema = self._tmp_dir / DEPLOYMENT_PARAMETERS_FILENAME with open(full_schema, "w", encoding="UTF-8") as f: @@ -283,7 +319,13 @@ def _write_schema_to_file(self) -> None: logger.debug("%s created", full_schema) def _copy_to_output_directory(self) -> None: - """Copy the config mappings, schema and bicep templates (artifact manifest and NFDV) from the temp directory to the output directory.""" + """ + Copy files from the temp directory to the output directory. + + Files are the config mappings, schema and bicep templates (artifact manifest + and NFDV). + """ + assert self._tmp_dir logger.info("Create NFD bicep %s", self.output_directory) @@ -327,26 +369,26 @@ def _generate_nf_application_config( self, helm_package: HelmPackageConfig, image_registry_path: List[str], - image_pull_secret_line_matches: List[Tuple[str, ...]], - ) -> Dict[str, Any]: + image_pull_secret_line_matches: List[str], + ) -> NFApplicationConfiguration: """Generate NF application config.""" (name, version) = self._get_chart_name_and_version(helm_package) registry_values_paths = set(image_registry_path) image_pull_secrets_values_paths = set(image_pull_secret_line_matches) - return { - "name": helm_package.name, - "chartName": name, - "chartVersion": version, - "dependsOnProfile": helm_package.depends_on, - "registryValuesPaths": list(registry_values_paths), - "imagePullSecretsValuesPaths": list(image_pull_secrets_values_paths), - "valueMappingsPath": self._jsonify_value_mappings(helm_package), - } + return NFApplicationConfiguration( + name=helm_package.name, + chartName=name, + chartVersion=version, + dependsOnProfile=helm_package.depends_on, + registryValuesPaths=list(registry_values_paths), + imagePullSecretsValuesPaths=list(image_pull_secrets_values_paths), + valueMappingsPath=self._jsonify_value_mappings(helm_package), + ) @staticmethod - def _find_yaml_files(directory: Path) -> Iterator[str]: + def _find_yaml_files(directory: Path) -> Iterator[Path]: """ Find all yaml files recursively in given directory. @@ -355,67 +397,90 @@ def _find_yaml_files(directory: Path) -> Iterator[str]: yield from directory.glob("**/*.yaml") yield from directory.glob("**/*.yml") - def _find_pattern_matches_in_chart( - self, helm_package: HelmPackageConfig, start_string: str - ) -> List[Tuple[str, ...]]: + def _find_image_parameter_from_chart( + self, helm_package_config: HelmPackageConfig + ) -> List[ImageInfo]: """ - Find pattern matches in Helm chart, using provided REGEX pattern. + Find pattern matches in Helm chart for the names of the image parameters. :param helm_package: The helm package config. - :param start_string: The string to search for, either imagePullSecrets: or image: - - If searching for imagePullSecrets, returns list of lists containing image pull - secrets paths, e.g. Values.foo.bar.imagePullSecret - If searching for image, returns list of tuples containing the list of image + Returns list of tuples containing the list of image paths and the name and version of the image. e.g. (Values.foo.bar.repoPath, foo, 1.2.3) """ - chart_dir = self._tmp_dir / helm_package.name + assert self._tmp_dir + chart_dir = self._tmp_dir / helm_package_config.name matches = [] path = [] for file in self._find_yaml_files(chart_dir): with open(file, "r", encoding="UTF-8") as f: - logger.debug("Searching for %s in %s", start_string, file) + logger.debug("Searching for %s in %s", IMAGE_START_STRING, file) for line in f: - if start_string in line: - logger.debug("Found %s in %s", start_string, line) + if IMAGE_START_STRING in line: + logger.debug("Found %s in %s", IMAGE_START_STRING, line) path = re.findall(IMAGE_PATH_REGEX, line) + # If "image:", search for chart name and version - if start_string == IMAGE_START_STRING: - name_and_version = re.search( - IMAGE_NAME_AND_VERSION_REGEX, line - ) + name_and_version = re.search(IMAGE_NAME_AND_VERSION_REGEX, line) + logger.debug( + "Regex match for name and version is %s", + name_and_version, + ) + + if name_and_version and len(name_and_version.groups()) == 2: logger.debug( - "Regex match for name and version is %s", - name_and_version, + "Found image name and version %s %s", + name_and_version.group("name"), + name_and_version.group("version"), ) - - if name_and_version and len(name_and_version.groups()) == 2: - logger.debug( - "Found image name and version %s %s", + matches.append( + ImageInfo( + path, name_and_version.group("name"), name_and_version.group("version"), ) - matches.append( - ( - path, - name_and_version.group("name"), - name_and_version.group("version"), - ) - ) - else: - logger.debug("No image name and version found") + ) else: - matches += path + logger.debug("No image name and version found") + return matches + + def _find_image_pull_secrets_parameter_from_chart( + self, helm_package_config: HelmPackageConfig + ) -> List[str]: + """ + Find pattern matches in Helm chart for the ImagePullSecrets parameter. + + :param helm_package: The helm package config. + + Returns list of lists containing image pull + secrets paths, e.g. Values.foo.bar.imagePullSecret + """ + assert self._tmp_dir + chart_dir = self._tmp_dir / helm_package_config.name + matches = [] + path = [] + + for file in self._find_yaml_files(chart_dir): + with open(file, "r", encoding="UTF-8") as f: + logger.debug( + "Searching for %s in %s", IMAGE_PULL_SECRETS_START_STRING, file + ) + for line in f: + if IMAGE_PULL_SECRETS_START_STRING in line: + logger.debug( + "Found %s in %s", IMAGE_PULL_SECRETS_START_STRING, line + ) + path = re.findall(IMAGE_PATH_REGEX, line) + matches += path return matches def _get_artifact_list( self, helm_package: HelmPackageConfig, - image_line_matches: List[Tuple[str, ...]], - ) -> List[Any]: + image_line_matches: List[ImageInfo], + ) -> List[Artifact]: """ Get the list of artifacts for the chart. @@ -423,19 +488,12 @@ def _get_artifact_list( :param image_line_matches: The list of image line matches. """ artifact_list = [] - (chart_name, chart_version) = self._get_chart_name_and_version(helm_package) - helm_artifact = { - "name": chart_name, - "version": chart_version, - } + (name, version) = self._get_chart_name_and_version(helm_package) + helm_artifact = Artifact(name, version) + artifact_list.append(helm_artifact) - for match in image_line_matches: - artifact_list.append( - { - "name": match[1], - "version": match[2], - } - ) + for image_info in image_line_matches: + artifact_list.append(Artifact(image_info.name, image_info.version)) return artifact_list @@ -448,7 +506,7 @@ def _get_chart_mapping_schema( param helm_package: The helm package config. """ - + assert self._tmp_dir logger.debug("Get chart mapping schema for %s", helm_package.name) mappings_path = helm_package.path_to_mappings @@ -499,34 +557,54 @@ def traverse_dict( :param d: The dictionary to traverse. :param target: The regex to search for. """ + # pylint: disable=too-many-nested-blocks + @dataclass + class DictNode: + # The dictionary under this node + sub_dict: Dict[Any, Any] + + # The path to this node under the main dictionary + position_path: List[str] + # Initialize the stack with the dictionary and an empty path - stack = [(dict_to_search, [])] + stack: List[DictNode] = [DictNode(dict_to_search, [])] result = {} # Initialize empty dictionary to store the results while stack: # While there are still items in the stack # Pop the last item from the stack and unpack it into node (the dictionary) and path - (node, path) = stack.pop() + node = stack.pop() + # For each key-value pair in the popped item - for k, v in node.items(): + for key, value in node.sub_dict.items(): + # If the value is a dictionary - if isinstance(v, dict): + if isinstance(value, dict): # Add the dictionary to the stack with the path - stack.append((v, path + [k])) + stack.append(DictNode(value, node.position_path + [key])) + # If the value is a string + matches target regex - elif isinstance(v, str) and re.search(target_regex, v): + elif isinstance(value, str): # Take the match i.e, foo from {deployParameter.foo} - match = re.search(target_regex, v) + match = re.search(target_regex, value) + # Add it to the result dictionary with its path as the value - result[match.group(1)] = path + [k] - elif isinstance(v, list): - logger.debug("Found a list %s", v) - for i in v: - logger.debug("Found an item %s", i) - if isinstance(i, str) and re.search(target_regex, i): - match = re.search(target_regex, i) - result[match.group(1)] = path + [k] - elif isinstance(i, dict): - stack.append((i, path + [k])) - elif isinstance(i, list): + if match: + result[match.group(1)] = node.position_path + [key] + + elif isinstance(value, list): + logger.debug("Found a list %s", value) + for item in value: + logger.debug("Found an item %s", item) + + if isinstance(item, str): + match = re.search(target_regex, item) + + if match: + result[match.group(1)] = node.position_path + [key] + + elif isinstance(item, dict): + stack.append(DictNode(item, node.position_path + [key])) + + elif isinstance(item, list): # We should fix this but for now just log a warning and # carry on logger.warning( @@ -534,7 +612,7 @@ def traverse_dict( "at path %s, which this tool cannot parse. " "Please check the output configMappings and schemas " "files and check that they are as required.", - path + [k], + node.position_path + [key], ) return result @@ -669,6 +747,7 @@ def _get_chart_name_and_version( self, helm_package: HelmPackageConfig ) -> Tuple[str, str]: """Get the name and version of the chart.""" + assert self._tmp_dir chart_path = self._tmp_dir / helm_package.name / "Chart.yaml" if not chart_path.exists(): @@ -693,6 +772,7 @@ def _get_chart_name_and_version( def _jsonify_value_mappings(self, helm_package: HelmPackageConfig) -> Path: """Yaml->JSON values mapping file, then return path to it.""" + assert self._tmp_dir mappings_yaml_file = helm_package.path_to_mappings mappings_dir = self._tmp_dir / CONFIG_MAPPINGS_DIR_NAME mappings_output_file = mappings_dir / f"{helm_package.name}-mappings.json" diff --git a/src/aosm/azext_aosm/generate_nfd/vnf_nfd_generator.py b/src/aosm/azext_aosm/generate_nfd/vnf_nfd_generator.py index 98dd6a587e0..3aaa6ca25d6 100644 --- a/src/aosm/azext_aosm/generate_nfd/vnf_nfd_generator.py +++ b/src/aosm/azext_aosm/generate_nfd/vnf_nfd_generator.py @@ -103,7 +103,7 @@ def nfd_bicep_path(self) -> Optional[Path]: return None @property - def manifest_bicep_path(self) -> Optional[str]: + def manifest_bicep_path(self) -> Optional[Path]: """Returns the path to the bicep file for the NFD if it has been created.""" if self._manifest_bicep_path.exists(): return self._manifest_bicep_path @@ -149,6 +149,7 @@ def vm_parameters_ordered(self) -> Dict[str, Any]: def _create_parameter_files(self) -> None: """Create the deployment, template and VHD parameter files.""" + assert self._tmp_dir tmp_schemas_directory: Path = self._tmp_dir / SCHEMAS_DIR_NAME tmp_schemas_directory.mkdir() self.write_deployment_parameters(tmp_schemas_directory) @@ -298,6 +299,7 @@ def write_vhd_parameters(self, directory: Path) -> None: def _copy_to_output_directory(self) -> None: """Copy the static bicep templates and generated config mappings and schema into the build output directory.""" logger.info("Create NFD bicep %s", self.output_directory) + assert self._tmp_dir Path(self.output_directory).mkdir(exist_ok=True) static_bicep_templates_dir = Path(__file__).parent / "templates" diff --git a/src/aosm/azext_aosm/generate_nsd/nsd_generator.py b/src/aosm/azext_aosm/generate_nsd/nsd_generator.py index bf1bdc1b1f8..18c81f3731d 100644 --- a/src/aosm/azext_aosm/generate_nsd/nsd_generator.py +++ b/src/aosm/azext_aosm/generate_nsd/nsd_generator.py @@ -62,8 +62,10 @@ def __init__(self, api_clients: ApiClients, config: NSConfiguration): self.nsd_bicep_template_name = NSD_DEFINITION_JINJA2_SOURCE_TEMPLATE self.nf_bicep_template_name = NF_TEMPLATE_JINJA2_SOURCE_TEMPLATE self.nsd_bicep_output_name = NSD_BICEP_FILENAME - self.nfdv_parameter_name = f"{self.config.network_function_definition_group_name.replace('-', '_')}_nfd_version" - self.build_folder_name = self.config.build_output_folder_name + self.nfdv_parameter_name = ( + f"{self.config.network_function_definition_group_name.replace('-', '_')}" + "_nfd_version" + ) nfdv = self._get_nfdv(config, api_clients) print("Finding the deploy parameters of the NFDV resource") if not nfdv.deploy_parameters: @@ -74,6 +76,7 @@ def __init__(self, api_clients: ApiClients, config: NSConfiguration): nfdv.deploy_parameters ) + # pylint: disable=no-self-use def _get_nfdv( self, config: NSConfiguration, api_clients ) -> NetworkFunctionDefinitionVersion: @@ -97,7 +100,7 @@ def generate_nsd(self) -> None: # Create temporary folder. with tempfile.TemporaryDirectory() as tmpdirname: - self.tmp_folder_name = tmpdirname + self.tmp_folder_name = tmpdirname # pylint: disable=attribute-defined-outside-init self.create_config_group_schema_files() self.write_nsd_manifest() @@ -105,7 +108,10 @@ def generate_nsd(self) -> None: self.write_nsd_bicep() self.copy_to_output_folder() - print(f"Generated NSD bicep templates created in {self.build_folder_name}") + print( + "Generated NSD bicep templates created in" + f" {self.config.output_directory_for_build}" + ) print( "Please review these templates. When you are happy with them run " "`az aosm nsd publish` with the same arguments." @@ -115,9 +121,9 @@ def generate_nsd(self) -> None: def config_group_schema_dict(self) -> Dict[str, Any]: """ :return: The Config Group Schema as a dictionary. + + This function cannot be called before deployment parameters have been supplied. """ - # This function cannot be called before deployment parameters have been - # supplied. assert self.deploy_parameters # Take a copy of the deploy parameters. @@ -190,7 +196,7 @@ def write_schema(self, folder_path: str) -> None: schema_path = os.path.join(folder_path, f"{self.config.cg_schema_name}.json") - with open(schema_path, "w") as _file: + with open(schema_path, "w", encoding="utf-8") as _file: _file.write(json.dumps(self.config_group_schema_dict, indent=4)) logger.debug("%s created", schema_path) @@ -211,7 +217,7 @@ def write_config_mappings(self, folder_path: str) -> None: config_mappings_path = os.path.join(folder_path, NSD_CONFIG_MAPPING_FILENAME) - with open(config_mappings_path, "w") as _file: + with open(config_mappings_path, "w", encoding="utf-8") as _file: _file.write(json.dumps(config_mappings, indent=4)) logger.debug("%s created", config_mappings_path) @@ -243,6 +249,7 @@ def write_nf_bicep(self) -> None: bicep_params += f"param {key} {bicep_type}\n" bicep_deploymentValues += f"{key}: {key}\n " + # pylint: disable=no-member self.generate_bicep( self.nf_bicep_template_name, NF_DEFINITION_BICEP_FILENAME, @@ -265,11 +272,11 @@ def write_nf_bicep(self) -> None: # NF, as we do for deployParameters, but the SDK currently doesn't # support this and needs to be rebuilt to do so. "nfvi_type": ( - NFVIType.AZURE_CORE.value + NFVIType.AZURE_CORE.value # type: ignore[attr-defined] if self.config.network_function_type == VNF - else NFVIType.AZURE_ARC_KUBERNETES.value + else NFVIType.AZURE_ARC_KUBERNETES.value # type: ignore[attr-defined] ), - "CNF": True if self.config.network_function_type == CNF else False, + "CNF": self.config.network_function_type == CNF, }, ) @@ -298,10 +305,9 @@ def write_nsd_manifest(self) -> None: {}, ) - def generate_bicep(self, - template_name: str, - output_file_name: str, - params: Dict[Any, Any]) -> None: + def generate_bicep( + self, template_name: str, output_file_name: str, params: Dict[Any, Any] + ) -> None: """ Render the bicep templates with the correct parameters and copy them into the build output folder. @@ -314,7 +320,7 @@ def generate_bicep(self, bicep_template_path = os.path.join(code_dir, TEMPLATES_DIR_NAME, template_name) - with open(bicep_template_path, "r") as file: + with open(bicep_template_path, "r", encoding="utf-8") as file: bicep_contents = file.read() bicep_template = Template(bicep_contents) @@ -324,19 +330,19 @@ def generate_bicep(self, bicep_file_build_path = os.path.join(self.tmp_folder_name, output_file_name) - with open(bicep_file_build_path, "w") as file: + with open(bicep_file_build_path, "w", encoding="utf-8") as file: file.write(rendered_template) def copy_to_output_folder(self) -> None: """Copy the bicep templates, config mappings and schema into the build output folder.""" - logger.info("Create NSD bicep %s", self.build_folder_name) - os.mkdir(self.build_folder_name) + logger.info("Create NSD bicep %s", self.config.output_directory_for_build) + os.mkdir(self.config.output_directory_for_build) shutil.copytree( self.tmp_folder_name, - self.build_folder_name, + self.config.output_directory_for_build, dirs_exist_ok=True, ) - logger.info("Copied files to %s", self.build_folder_name) + logger.info("Copied files to %s", self.config.output_directory_for_build) diff --git a/src/aosm/pyproject.toml b/src/aosm/pyproject.toml new file mode 100644 index 00000000000..e4bbc5837c4 --- /dev/null +++ b/src/aosm/pyproject.toml @@ -0,0 +1,7 @@ +[tool.mypy] +ignore_missing_imports = true +no_namespace_packages = true + +[[tool.mypy.overrides]] +module = ["azext_aosm.vendored_sdks.*"] +ignore_errors = true diff --git a/src/aosm/setup.py b/src/aosm/setup.py index 807c81eb801..c3af671b370 100644 --- a/src/aosm/setup.py +++ b/src/aosm/setup.py @@ -34,7 +34,7 @@ ] # TODO: Add any additional SDK dependencies here -DEPENDENCIES = ["oras~=0.1.17", "azure-storage-blob>=12.15.0", "jinja2>=3.1.2"] +DEPENDENCIES = ["oras~=0.1.18", "azure-storage-blob>=12.15.0", "jinja2>=3.1.2"] with open("README.md", "r", encoding="utf-8") as f: README = f.read()