From c0b517d5c2c14b98997b0026b0eef968c0583a86 Mon Sep 17 00:00:00 2001 From: Alessandro De Maria Date: Mon, 14 Aug 2023 22:14:59 +0000 Subject: [PATCH] Add Kubernetes generators --- argocd/__init__.py | 349 ------- argocd/k8s.py | 33 - ingresses/README.md | 242 ----- kubernetes/__init__.py | 1179 +---------------------- kubernetes/argocd.py | 103 ++ kubernetes/autoscaling.py | 69 ++ kubernetes/base.py | 51 + kubernetes/certmanager.py | 38 + kubernetes/common.py | 34 +- kubernetes/gke.py | 13 + kubernetes/helm.py | 36 + kubernetes/istio.py | 18 + kubernetes/k8s.py | 32 - kubernetes/networking.py | 269 +++++- kubernetes/prometheus.py | 54 ++ kubernetes/rbac.py | 84 +- kubernetes/storage.py | 82 +- kubernetes/workloads.py | 702 ++++++++++++++ lib/{generators => kgenlib}/__init__.py | 150 ++- rabbitmq/__init__.py | 866 ----------------- rabbitmq/k8s.py | 32 - terraform/__init__.py | 22 - 22 files changed, 1624 insertions(+), 2834 deletions(-) delete mode 100644 argocd/__init__.py delete mode 100644 argocd/k8s.py delete mode 100644 ingresses/README.md create mode 100644 kubernetes/argocd.py create mode 100644 kubernetes/autoscaling.py create mode 100644 kubernetes/base.py create mode 100644 kubernetes/certmanager.py create mode 100644 kubernetes/gke.py create mode 100644 kubernetes/helm.py create mode 100644 kubernetes/istio.py delete mode 100644 kubernetes/k8s.py create mode 100644 kubernetes/prometheus.py create mode 100644 kubernetes/workloads.py rename lib/{generators => kgenlib}/__init__.py (63%) delete mode 100644 rabbitmq/__init__.py delete mode 100644 rabbitmq/k8s.py delete mode 100644 terraform/__init__.py diff --git a/argocd/__init__.py b/argocd/__init__.py deleted file mode 100644 index 10abee2..0000000 --- a/argocd/__init__.py +++ /dev/null @@ -1,349 +0,0 @@ -import base64 -import hashlib -import os - -from kapitan.cached import args -from kapitan.inputs.kadet import BaseObj, inventory -from kapitan.utils import render_jinja2_file - -search_paths = args.get("search_paths") - -from . import k8s - - -def j2(filename, ctx): - return render_jinja2_file(filename, ctx, search_paths=search_paths) - - -inv = inventory(lazy=True) - - -def merge(source, destination): - for key, value in source.items(): - if isinstance(value, dict): - node = destination.setdefault(key, value) - if node is None: - destination[key] = value - else: - merge(value, node) - else: - destination[key] = destination.setdefault(key, value) - - return destination - - -class ArgoCDAppProject(k8s.Base): - def new(self): - self.need("name") - self.kwargs.apiVersion = "argoproj.io/v1alpha1" - self.kwargs.kind = "AppProject" - - # Add a this finalizer ONLY if you want these to cascade delete - self.kwargs.finalizers = list("resources-finalizer.argocd.argoproj.io") - super().new() - - def body(self): - super().body() - - # You'll usually want to add your resources to the argocd namespace. 
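-        # e.g. via `argocd_namespace: argocd` under `parameters` in the inventory.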
- self.add_namespace(inv.parameters.argocd_namespace) - - argocd_project = self.kwargs.argocd_project - - self.add_annotations(argocd_project.get("annotations", {})) - self.add_labels(argocd_project.get("labels", {})) - - # Allow manifests to deploy from any Git repos - if argocd_project.source_repos: - self.root.spec.sourceRepos = argocd_project.source_repos - - # Only permit applications to deploy to the namespace in the same cluster - if argocd_project.destinations: - self.root.spec.destinations = argocd_project.destinations - - # Deny all cluster-scoped resources from being created, except for Namespace - if argocd_project.cluster_resource_whitelist: - self.root.spec.clusterResourceWhitelist = ( - argocd_project.cluster_resource_whitelist - ) - - # Allow all namespaced-scoped resources to be created, except for ResourceQuota, LimitRange, NetworkPolicy - if argocd_project.namespace_resource_blacklist: - self.root.spec.namespaceResourceBlacklist = ( - argocd_project.namespace_resource_blacklist - ) - - # Deny all namespaced-scoped resources from being created, except for Deployment and StatefulSet - if argocd_project.namespace_resource_whitelist: - self.root.spec.namespaceResourceWhitelist = ( - argocd_project.namespace_resource_whitelist - ) - - # Enables namespace orphaned resource monitoring. - if argocd_project.orphaned_resources: - self.root.spec.orphanedResources = argocd_project.orphaned_resources - - # Roles - if argocd_project.roles: - self.root.spec.roles = argocd_project.roles - - -class ArgoCDApplication(k8s.Base): - def new(self): - self.need("name") - self.kwargs.apiVersion = "argoproj.io/v1alpha1" - self.kwargs.kind = "Application" - - # Add a this finalizer ONLY if you want these to cascade delete - - # self.kwargs.finalizers = list('resources-finalizer.argocd.argoproj.io') - super().new() - - def body(self): - super().body() - - # You'll usually want to add your resources to the argocd namespace. - self.add_namespace(inv.parameters.argocd_namespace) - - argocd_application = self.kwargs.argocd_application - - self.add_annotations(argocd_application.get("annotations", {})) - self.add_labels(argocd_application.get("labels", {})) - - # The project the argocd_application belongs to. - self.root.spec.project = argocd_application.project - - # The destination in which Namespace the application should be deployed - self.root.spec.destination = argocd_application.destination - - # Source of the application manifests - if argocd_application.source: - self.root.spec.source = argocd_application.source - - # Sync policy - if argocd_application.sync_policy: - self.root.spec.syncPolicy = argocd_application.sync_policy - - # Ignore differences at the specified json pointers - if argocd_application.ignore_differences: - self.root.spec.ignoreDifferences = argocd_application.ignore_differences - - -# The following classes are required to generate Secrets + ConfigMaps -# TODO: Imported from k8s-generator -class SharedConfig: - """Shared class to use for both Secrets and ConfigMaps classes. - - contain anything needed by both classes, so that their behavious is basically the same. - Each subclass will then implement its own way of adding the data depending on their implementation. 
- """ - - @staticmethod - def encode_string(unencoded_string): - return base64.b64encode(unencoded_string.encode("ascii")).decode("ascii") - - def setup_metadata(self): - self.add_namespace(inv.parameters.argocd_namespace) - self.add_annotations(self.config.annotations) - self.add_labels(self.config.labels) - - self.items = self.config["items"] - try: - if isinstance(self, ConfigMap): - globals = ( - inv.parameters.generators.manifest.default_config.globals.config_maps - ) - else: - globals = ( - inv.parameters.generators.manifest.default_config.globals.secrets - ) - self.add_annotations(globals.get("annotations", {})) - self.add_labels(globals.get("labels", {})) - except AttributeError: - pass - - self.versioning(self.config.get("versioned", False)) - - def add_directory(self, directory, encode=False): - stringdata = inv.parameters.get("use_tesoro", False) - if directory and os.path.isdir(directory): - for filename in os.listdir(directory): - with open(f"{directory}/{filename}", "r") as f: - file_content = f.read() - self.add_item( - filename, - file_content, - request_encode=encode, - stringdata=stringdata, - ) - - def add_data(self, data): - stringdata = inv.parameters.get("use_tesoro", False) - - for key, spec in data.items(): - encode = spec.get("b64_encode", False) - - if "value" in spec: - value = spec.get("value") - if "template" in spec: - value = j2(spec.template, spec.get("values", {})) - if "file" in spec: - with open(spec.file, "r") as f: - value = f.read() - - self.add_item(key, value, request_encode=encode, stringdata=stringdata) - - def add_string_data(self, string_data, encode=False): - stringdata = True - - for key, spec in string_data.items(): - - if "value" in spec: - value = spec.get("value") - if "template" in spec: - value = j2(spec.template, spec.get("values", {})) - if "file" in spec: - with open(spec.file, "r") as f: - value = f.read() - - self.add_item(key, value, request_encode=encode, stringdata=stringdata) - - def versioning(self, enabled=False): - if enabled: - self.hash = hashlib.sha256(str(self.root.to_dict()).encode()).hexdigest()[ - :8 - ] - self.root.metadata.name += f"-{self.hash}" - - -# TODO: Imported from k8s-generator -class ConfigMap(k8s.Base, SharedConfig): - def new(self): - self.kwargs.apiVersion = "v1" - self.kwargs.kind = "ConfigMap" - super().new() - - def body(self): - super().body() - - def add_item(self, key, value, request_encode=False, stringdata=False): - encode = request_encode - - self.root["data"][key] = self.encode_string(value) if encode else value - - -# TODO: Imported from k8s-generator -class ComponentConfig(ConfigMap, SharedConfig): - def new(self): - super().new() - self.need("config") - - def body(self): - super().body() - self.config = self.kwargs.config - - self.setup_metadata() - self.add_data(self.config.data) - self.add_directory(self.config.directory, encode=False) - - -class Secret(k8s.Base): - def new(self): - self.kwargs.apiVersion = "v1" - self.kwargs.kind = "Secret" - super().new() - - def body(self): - super().body() - - def add_item(self, key, value, request_encode=False, stringdata=False): - encode = not stringdata and request_encode - field = "stringData" if stringdata else "data" - self.root[field][key] = self.encode_string(value) if encode else value - - -class ComponentSecret(Secret, SharedConfig): - def new(self): - super().new() - self.need("config") - - def body(self): - super().body() - self.config = self.kwargs.config - self.root.type = self.config.get("type", "Opaque") - - self.setup_metadata() - if 
self.config.data: - self.add_data(self.config.data) - if self.config.string_data: - self.add_string_data(self.config.string_data) - self.add_directory(self.config.directory, encode=True) - - -# This function renderes an ArgoCD-AppProject -def generate_argocd_appproject(input_params): - obj = BaseObj() - bundle = list() - argocd_projects = inv.parameters.argocd_projects - for name in argocd_projects.keys(): - argocd_project = ArgoCDAppProject( - name=name, argocd_project=argocd_projects[name] - ) - - obj.root["{}-argo-appproject".format(name)] = argocd_project - - return obj - - -# This function renderes an ArgoCD-Application -def generate_argocd_application(input_params): - obj = BaseObj() - bundle = list() - argocd_applications = inv.parameters.argocd_applications - for name in argocd_applications.keys(): - argocd_application = ArgoCDApplication( - name=name, argocd_application=argocd_applications[name] - ) - - obj.root["{}-argo-application".format(name)] = argocd_application - - return obj - - -# This function renderes an Shared-ConfigMaps + Secrets -def generate_resource_manifests(input_params): - obj = BaseObj() - - for secret_name, secret_spec in inv.parameters.generators.argocd.secrets.items(): - name = secret_spec.get("name", secret_name) - secret = ComponentSecret(name=name, config=secret_spec) - obj.root[f"{name}"] = secret - - for config_name, config_spec in inv.parameters.generators.argocd.configs.items(): - name = config_spec.get("name", config_name) - config = ComponentConfig(name=name, config=config_spec) - obj.root[f"{name}"] = config - - return obj - - -# This function renderes all previous defined functions and returns -def generate_manifests(input_params): - all_manifests = BaseObj() - - argocd_project_manifests = generate_argocd_appproject(input_params) - argocd_application_manifests = generate_argocd_application(input_params) - resource_manifests = generate_resource_manifests(input_params) - - all_manifests.root.update(argocd_project_manifests.root) - all_manifests.root.update(argocd_application_manifests.root) - all_manifests.root.update(resource_manifests.root) - - return all_manifests - - -def main(input_params): - whitelisted_functions = ["generate_manifests"] - function = input_params.get("function", "generate_manifests") - if function in whitelisted_functions: - return globals()[function](input_params) diff --git a/argocd/k8s.py b/argocd/k8s.py deleted file mode 100644 index 5b86d24..0000000 --- a/argocd/k8s.py +++ /dev/null @@ -1,33 +0,0 @@ -from kapitan.inputs.kadet import BaseObj - - -# TODO: Imported from k8s-generator -class Base(BaseObj): - def new(self): - self.need("apiVersion") - self.need("kind") - self.need("name") - - def body(self): - self.root.apiVersion = self.kwargs.apiVersion - self.root.kind = self.kwargs.kind - self.name = self.kwargs.name - self.root.metadata.name = self.kwargs.get("rendered_name", self.name) - self.add_label("name", self.root.metadata.name) - - def add_labels(self, labels): - for key, value in labels.items(): - self.add_label(key, value) - - def add_label(self, key, value): - self.root.metadata.labels[key] = value - - def add_namespace(self, namespace): - self.root.metadata.namespace = namespace - - def add_annotations(self, annotations): - for key, value in annotations.items(): - self.add_annotation(key, value) - - def add_annotation(self, key, value): - self.root.metadata.annotations[key] = value diff --git a/ingresses/README.md b/ingresses/README.md deleted file mode 100644 index 3c1fbe2..0000000 --- a/ingresses/README.md 
+++ /dev/null @@ -1,242 +0,0 @@ -# Ingresses generator - -The `ingresses` generator adds on the manifest generator by providing a quick way to expose paths to your application using ingresses resources. - -## Basic usage - -The generator is expecting ingresses to be defined under the `parameters.ingresses` path of the inventory. - -For convenience, you can add the configuration in the same files as your component. - -For instance, add the following to the component [echo-server](inventory/classes/components/echo-server.yml). - -```yaml -ingresses: - global: - annotations: - kubernetes.io/ingress.global-static-ip-name: my_static_ip - paths: - - backend: - serviceName: echo-server - servicePort: 80 - path: /echo/* -``` - -which will generate a file similar to: - -```yaml -apiVersion: networking.k8s.io/v1beta1 -kind: Ingress -metadata: - annotations: - kubernetes.io/ingress.global-static-ip-name: my_static_ip - labels: - name: global - name: global - namespace: tutorial -spec: - rules: - - http: - paths: - - backend: - serviceName: echo-server - servicePort: 80 - path: /echo/* -``` - -Injecting "rules" confirations is also supported: - -```yaml -ingresses: - global: - annotations: - kubernetes.io/ingress.global-static-ip-name: my_static_ip - rules: - - http: - paths: - - backend: - serviceName: echo-server - servicePort: 80 - path: /echo/* -``` - -### Create an ingress resource - -Each key under the `ingresses` parameters represent an ingress resource: - -```yaml -parameters: ---- -ingresses: - main: - default_backend: - name: frontend - port: 80 -``` - -Will generate the following `Ingress` resource - -```yaml -apiVersion: networking.k8s.io/v1 -kind: Ingress -metadata: - labels: - name: main - name: main - namespace: prod-sockshop -spec: - backend: - serviceName: frontend - servicePort: 80 -``` - -### Add annotations to an ingress - -Simply adding the `annotations` directive allows to configure an ingress: - -```yaml -ingresses: - main: - annotations: - kubernetes.io/ingress.global-static-ip-name: static-ip-name - default_backend: - name: frontend - port: 80 -``` - -The generator will add the annotations to the resource - -```yaml -apiVersion: networking.k8s.io/v1 -kind: Ingress -metadata: - annotations: - kubernetes.io/ingress.global-static-ip-name: static-ip-name - labels: - name: main - name: main - namespace: prod-sockshop -spec: - backend: - serviceName: frontend - servicePort: 80 -``` - -## Adding TLS certificates - -You can define a TLS certificate to be used by the ingress with the following syntax - -```yaml -generators: - kubernetes: - secrets: - sockshop.kapicorp.com: - type: kubernetes.io/tls - data: - tls.crt: - value: ?{gkms:targets/${target_name}/sockshop.kapicorp.com.crt} - tls.key: - value: ?{gkms:targets/${target_name}/sockshop.kapicorp.com.key} -``` - -Both references need to be configured before hand with the correct PEM certificates. - -You can then pass the TLS configuration to the ingress, with a reference to the secret just created: - -```yaml - ingresses: - global: - annotations: - kubernetes.io/ingress.global-static-ip-name: sock-shop-prod - default_backend: - name: frontend - port: 80 - tls: - - hosts: - - sockshop.kapicorp.com - secretName: sockshop.kapicorp.com -``` - -## Managed certificats (currently GKE only) - -### Add a managed certificate - -Set the `manage_certificate` directive to the domain you want to manage a certificate for. 
- -```yaml -ingresses: - main: - managed_certificate: sockshop.kapicorp.com - annotations: - kubernetes.io/ingress.global-static-ip-name: static-ip-name - default_backend: - name: frontend - port: 80 -``` - -Which will create a new `ManagedCertificate` resource for such domain - -```yaml -apiVersion: networking.gke.io/v1beta1 -kind: ManagedCertificate -metadata: - labels: - name: sockshop.kapicorp.com - name: sockshop.kapicorp.com - namespace: prod-sockshop -spec: - domains: - - sockshop.kapicorp.com -``` - -and injects the correct annotation into the ingress resource: - -```yaml -apiVersion: networking.k8s.io/v1 -kind: Ingress -metadata: - annotations: - kubernetes.io/ingress.global-static-ip-name: static-ip-name - networking.gke.io/managed-certificates: sockshop.kapicorp.com - labels: - name: main - name: main - namespace: prod-sockshop -spec: - backend: - serviceName: frontend - servicePort: 80 -``` - -### Multiple certificats - -The generator also supports multiple certificates with the `additional_domains` directive. - -```yaml -ingresses: - main: - annotations: - kubernetes.io/ingress.global-static-ip-name: static-ip-name - managed_certificate: sockshop.kapicorp.com - additional_domains: - - secure.kapicorp.com - default_backend: - name: frontend - port: 80 -``` - -Which will generate: - -```yaml -apiVersion: networking.gke.io/v1beta1 -kind: ManagedCertificate -metadata: - labels: - name: sockshop.kapicorp.com - name: sockshop.kapicorp.com - namespace: prod-sockshop -spec: - domains: - - sockshop.kapicorp.com - - secure.kapicorp.com -``` diff --git a/kubernetes/__init__.py b/kubernetes/__init__.py index 6b00489..d51fb7c 100644 --- a/kubernetes/__init__.py +++ b/kubernetes/__init__.py @@ -1,1176 +1,25 @@ import logging -from typing import Any - -from kapitan.inputs.helm import HelmChart -from kapitan.inputs.kadet import ( - BaseModel, - BaseObj, - CompileError, - Dict, - inventory, - load_from_search_paths, -) - -kgenlib = load_from_search_paths("generators") - -from .common import KubernetesResource, ResourceType -from .networking import NetworkPolicy -from .rbac import ClusterRole, ClusterRoleBinding, Role, RoleBinding -from .storage import ConfigMap, Secret logger = logging.getLogger(__name__) +from kapitan.inputs.kadet import inventory + +from .argocd import * +from .base import * +from .certmanager import * +from .common import kgenlib +from .gke import * +from .helm import * +from .istio import * +from .networking import * +from .prometheus import * +from .rbac import * +from .storage import * +from .workloads import * inv = inventory(lazy=True) -class Workload(KubernetesResource): - @classmethod - def create_workflow(cls, name, config): - config = config - name = name - if config.type == "deployment": - workload = Deployment(name=name, config=config) - elif config.type == "statefulset": - workload = StatefulSet(name=name, config=config) - elif config.type == "daemonset": - workload = DaemonSet(name=name, config=config) - elif config.type == "job": - workload = Job(name=name, config=config) - else: - raise () - - if config.get("namespace") or inv.parameters.get("namespace"): - workload.root.metadata.namespace = config.setdefault( - "namespace", inv.parameters.namespace - ) - workload.add_annotations(config.setdefault("annotations", {})) - workload.root.spec.template.metadata.annotations = config.get( - "pod_annotations", {} - ) - workload.add_labels(config.setdefault("labels", {})) - workload.add_volumes(config.setdefault("volumes", {})) - 
workload.add_volume_claims(config.setdefault("volume_claims", {})) - workload.root.spec.template.spec.securityContext = ( - config.workload_security_context - ) - workload.root.spec.minReadySeconds = config.min_ready_seconds - if config.service_account.enabled: - workload.root.spec.template.spec.serviceAccountName = ( - config.service_account.get("name", name) - ) - - container = Container(name=name, config=config) - additional_containers = [ - Container(name=name, config=config) - for name, config in config.additional_containers.items() - ] - workload.add_containers([container]) - workload.add_containers(additional_containers) - init_containers = [ - Container(name=name, config=config) - for name, config in config.init_containers.items() - ] - - workload.add_init_containers(init_containers) - if config.image_pull_secrets or inv.parameters.image_pull_secrets: - workload.root.spec.template.spec.imagePullSecrets = config.get( - "image_pull_secrets", inv.parameters.image_pull_secrets - ) - workload.root.spec.template.spec.dnsPolicy = config.dns_policy - workload.root.spec.template.spec.terminationGracePeriodSeconds = config.get( - "grace_period", 30 - ) - - if config.node_selector: - workload.root.spec.template.spec.nodeSelector = config.node_selector - - if config.tolerations: - workload.root.spec.template.spec.tolerations = config.tolerations - - affinity = workload.root.spec.template.spec.affinity - if config.prefer_pods_in_node_with_expression and not config.node_selector: - affinity.nodeAffinity.setdefault( - "preferredDuringSchedulingIgnoredDuringExecutio", [] - ) - affinity.nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution.append( - { - "preference": { - "matchExpressions": [config.prefer_pods_in_node_with_expression] - }, - "weight": 1, - } - ) - - if config.prefer_pods_in_different_nodes: - affinity.podAntiAffinity.setdefault( - "preferredDuringSchedulingIgnoredDuringExecution", [] - ) - affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution.append( - { - "podAffinityTerm": { - "labelSelector": { - "matchExpressions": [ - {"key": "app", "operator": "In", "values": [name]} - ] - }, - "topologyKey": "kubernetes.io/hostname", - }, - "weight": 1, - } - ) - - if config.prefer_pods_in_different_zones: - affinity.podAntiAffinity.setdefault( - "preferredDuringSchedulingIgnoredDuringExecution", [] - ) - affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution.append( - { - "podAffinityTerm": { - "labelSelector": { - "matchExpressions": [ - {"key": "app", "operator": "In", "values": [name]} - ] - }, - "topologyKey": "failure-domain.beta.kubernetes.io/zone", - }, - "weight": 1, - } - ) - - return workload - - def set_replicas(self, replicas): - self.root.spec.replicas = replicas - - def add_containers(self, containers): - self.root.spec.template.spec.setdefault("containers", []).extend( - [container.root for container in containers] - ) - - def add_init_containers(self, containers): - self.root.spec.template.spec.setdefault("initContainers", []).extend( - container.root for container in containers - ) - - def add_volumes(self, volumes): - for key, value in volumes.items(): - kgenlib.merge({"name": key}, value) - self.root.spec.template.spec.setdefault("volumes", []).append(value) - - def add_volume_claims(self, volume_claims): - self.root.spec.setdefault("volumeClaimTemplates", []) - for key, value in volume_claims.items(): - kgenlib.merge({"metadata": {"name": key, "labels": {"name": key}}}, value) - self.root.spec.volumeClaimTemplates += [value] - 
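-    # Example: a config of `volumes: {cache: {emptyDir: {}}}` is merged by
-    # kgenlib.merge into `{"name": "cache", "emptyDir": {}}` before being
-    # appended to the pod spec; `volume_claims` entries additionally gain
-    # `metadata.name` and a matching `name` label.
-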
- def add_volumes_for_object(self, object): - object_name = object.object_name - rendered_name = object.rendered_name - - if type(object) == ComponentConfig: - key = "configMap" - name_key = "name" - else: - key = "secret" - name_key = "secretName" - - template = self.root.spec.template - if isinstance(self, CronJob): - template = self.root.spec.jobTemplate.spec.template - - template.spec.setdefault("volumes", []).append( - { - "name": object_name, - key: { - "defaultMode": object.config.get("default_mode", 420), - name_key: rendered_name, - "items": [{"key": value, "path": value} for value in object.items], - }, - } - ) - - -@kgenlib.register_generator(path="generators.kubernetes.service_accounts") -class ServiceAccountGenerator(kgenlib.BaseStore): - def body(self): - config = self.config - name = config.get("name", self.name) - sa = ServiceAccount(name=name, config=config) - sa.add_annotations(config.annotations) - sa.add_labels(config.labels) - self.add(sa) - - -class ServiceAccount(KubernetesResource): - resource_type = ResourceType( - kind="ServiceAccount", api_version="v1", id="service_account" - ) - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - self.add_annotations(config.service_account.annotations) - if config.image_pull_secrets or inv.parameters.pull_secret.name: - self.root.imagePullSecrets = [ - { - "name": config.get( - "image_pull_secrets", inv.parameters.pull_secret.name - ) - } - ] - - -class ComponentConfig(ConfigMap): - config: Dict - - def new(self): - super().new() - - def body(self): - super().body() - self.versioning_enabled = self.config.get("versioned", False) - self.setup_metadata(inventory=inv) - if getattr(self, "workload", None) and self.workload.root.metadata.name: - self.add_label("name", self.workload.root.metadata.name) - self.add_data(self.config.data) - self.add_directory(self.config.directory, encode=False) - if getattr(self, "workload", None): - self.workload.add_volumes_for_object(self) - - -@kgenlib.register_generator(path="generators.kubernetes.config_maps") -class ConfigGenerator(kgenlib.BaseStore): - def body(self): - self.add(ComponentConfig(name=self.name, config=self.config)) - - -class ComponentSecret(Secret): - config: Dict - - def new(self): - super().new() - - def body(self): - super().body() - self.root.type = self.config.get("type", "Opaque") - self.versioning_enabled = self.config.get("versioned", False) - if getattr(self, "workload", None) and self.workload.root.metadata.name: - self.add_label("name", self.workload.root.metadata.name) - self.setup_metadata(inventory=inv) - if self.config.data: - self.add_data(self.config.data) - if self.config.string_data: - self.add_string_data(self.config.string_data) - self.add_directory(self.config.directory, encode=True) - if getattr(self, "workload", None): - self.workload.add_volumes_for_object(self) - - -@kgenlib.register_generator(path="generators.kubernetes.secrets") -class SecretGenerator(kgenlib.BaseStore): - def body(self): - self.add(ComponentSecret(name=self.name, config=self.config)) - - -class Service(KubernetesResource): - - resource_type = ResourceType(kind="Service", api_version="v1", id="service") - workload: Workload - service_spec: dict - - def new(self): - super().new() - - def body(self): - config = self.config - workload = self.workload.root - service_spec = self.service_spec - - self.name = service_spec.get("service_name", self.name) - super().body() - - self.add_labels(config.get("labels", {})) - 
self.add_annotations(service_spec.annotations) - self.root.spec.setdefault("selector", {}).update( - workload.spec.template.metadata.labels - ) - self.root.spec.setdefault("selector", {}).update(service_spec.selectors) - self.root.spec.type = service_spec.type - if service_spec.get("publish_not_ready_address", False): - self.root.spec.publishNotReadyAddresses = True - if service_spec.get("headless", False): - self.root.spec.clusterIP = "None" - self.root.spec.clusterIP - self.root.spec.sessionAffinity = service_spec.get("session_affinity", "None") - all_ports = [config.ports] + [ - container.ports - for container in config.additional_containers.values() - if "ports" in container - ] - - exposed_ports = {} - - for port in all_ports: - for port_name in port.keys(): - if ( - not service_spec.expose_ports - or port_name in service_spec.expose_ports - ): - exposed_ports.update(port) - - for port_name in sorted(exposed_ports): - self.root.spec.setdefault("ports", []) - port_spec = exposed_ports[port_name] - service_port = port_spec.get("service_port", None) - if service_port: - self.root.spec.setdefault("ports", []).append( - { - "name": port_name, - "port": service_port, - "targetPort": port_name, - "protocol": port_spec.get("protocol", "TCP"), - } - ) - - -class Ingress(KubernetesResource): - resource_type = ResourceType( - kind="Ingress", api_version="networking.k8s.io/v1", id="ingress" - ) - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - - self.add_annotations(config.get("annotations", {})) - self.add_labels(config.get("labels", {})) - if "default_backend" in config: - self.root.spec.backend.service.name = config.default_backend.get("name") - self.root.spec.backend.service.port = config.default_backend.get("port", 80) - if "paths" in config: - host = config.host - paths = config.paths - self.root.spec.setdefault("rules", []).extend( - [{"host": host, "http": {"paths": paths}}] - ) - if "rules" in config: - self.root.spec.setdefault("rules", []).extend(config.rules) - if config.tls: - self.root.spec.tls = config.tls - - -class GoogleManagedCertificate(KubernetesResource): - resource_type = ResourceType( - kind="ManagedCertificate", - api_version="networking.gke.io/v1beta1", - id="google_managed_certificate", - ) - - def body(self): - super().body() - config = self.config - self.root.spec.domains = config.get("domains", []) - - -@kgenlib.register_generator(path="certmanager.issuer") -class CertManagerIssuer(KubernetesResource): - resource_type = ResourceType( - kind="Issuer", api_version="cert-manager.io/v1", id="cert_manager_issuer" - ) - - def body(self): - config = self.config - super().body() - self.root.spec = config.get("spec") - - -@kgenlib.register_generator(path="certmanager.cluster_issuer") -class CertManagerClusterIssuer(KubernetesResource): - resource_type = ResourceType( - kind="ClusterIssuer", - api_version="cert-manager.io/v1", - id="cert_manager_cluster_issuer", - ) - - def body(self): - config = self.config - super().body() - self.root.spec = config.get("spec") - - -@kgenlib.register_generator(path="certmanager.certificate") -class CertManagerCertificate(KubernetesResource): - resource_type = ResourceType( - kind="Certificate", - api_version="cert-manager.io/v1", - id="cert_manager_certificate", - ) - - def body(self): - config = self.config - super().body() - self.root.spec = config.get("spec") - - -class IstioPolicy(KubernetesResource): - resource_type = ResourceType( - kind="IstioPolicy", - 
api_version="authentication.istio.io/v1alpha1", - id="istio_policy", - ) - - def body(self): - super().body() - config = self.config - name = self.name - self.root.spec.origins = config.istio_policy.policies.origins - self.root.spec.principalBinding = "USE_ORIGIN" - self.root.spec.targets = [{"name": name}] - - -@kgenlib.register_generator(path="generators.kubernetes.namespace") -class NamespaceGenerator(kgenlib.BaseStore): - def body(self): - name = self.config.get("name", self.name) - self.add(Namespace(name=name, config=self.config)) - - -class Namespace(KubernetesResource): - resource_type = ResourceType(kind="Namespace", api_version="v1", id="namespace") - - def body(self): - super().body() - config = self.config - labels = config.get("labels", {}) - annotations = config.get("annotations", {}) - self.add_labels(labels) - self.add_annotations(annotations) - - -class Deployment(Workload): - resource_type = ResourceType( - kind="Deployment", api_version="apps/v1", id="deployment" - ) - - def body(self): - default_strategy = { - "type": "RollingUpdate", - "rollingUpdate": {"maxSurge": "25%", "maxUnavailable": "25%"}, - } - super().body() - config = self.config - self.root.spec.template.metadata.setdefault("labels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.selector.setdefault("matchLabels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.template.spec.restartPolicy = config.get( - "restart_policy", "Always" - ) - if "host_network" in config: - self.root.spec.template.spec.hostNetwork = config.host_network - if "host_pid" in config: - self.root.spec.template.spec.hostPID = config.host_pid - self.root.spec.strategy = config.get("update_strategy", default_strategy) - self.root.spec.revisionHistoryLimit = config.revision_history_limit - self.root.spec.progressDeadlineSeconds = ( - config.deployment_progress_deadline_seconds - ) - self.set_replicas(config.get("replicas", 1)) - - -class StatefulSet(Workload): - resource_type = ResourceType( - kind="StatefulSet", api_version="apps/v1", id="stateful_set" - ) - - def body(self): - default_strategy = {} - update_strategy = {"rollingUpdate": {"partition": 0}, "type": "RollingUpdate"} - - super().body() - name = self.name - config = self.config - self.root.spec.template.metadata.setdefault("labels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.selector.setdefault("matchLabels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.template.spec.restartPolicy = config.get( - "restart_policy", "Always" - ) - if "host_network" in config: - self.root.spec.template.spec.hostNetwork = config.host_network - if "host_pid" in config: - self.root.spec.template.spec.hostPID = config.host_pid - self.root.spec.revisionHistoryLimit = config.revision_history_limit - self.root.spec.strategy = config.get("strategy", default_strategy) - self.root.spec.updateStrategy = config.get("update_strategy", update_strategy) - self.root.spec.serviceName = config.service.get("service_name", name) - self.set_replicas(config.get("replicas", 1)) - - -class DaemonSet(Workload): - resource_type = ResourceType( - kind="DaemonSet", api_version="apps/v1", id="daemon_set" - ) - - def body(self): - super().body() - config = self.config - self.root.spec.template.metadata.setdefault("labels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.selector.setdefault("matchLabels", {}).update( - config.labels + self.root.metadata.labels - ) - 
self.root.spec.template.spec.restartPolicy = config.get( - "restart_policy", "Always" - ) - if "host_network" in config: - self.root.spec.template.spec.hostNetwork = config.host_network - if "host_pid" in config: - self.root.spec.template.spec.hostPID = config.host_pid - self.root.spec.revisionHistoryLimit = config.revision_history_limit - self.root.spec.progressDeadlineSeconds = ( - config.deployment_progress_deadline_seconds - ) - - -class Job(Workload): - resource_type = ResourceType(kind="Job", api_version="batch/v1", id="job") - - def body(self): - super().body() - config = self.config - self.root.spec.template.metadata.setdefault("labels", {}).update( - config.labels + self.root.metadata.labels - ) - self.root.spec.template.spec.restartPolicy = config.get( - "restart_policy", "Never" - ) - if "host_network" in config: - self.root.spec.template.spec.hostNetwork = config.host_network - if "host_pid" in config: - self.root.spec.template.spec.hostPID = config.host_pid - self.root.spec.backoffLimit = config.get("backoff_limit", 1) - self.root.spec.completions = config.get("completions", 1) - self.root.spec.parallelism = config.get("parallelism", 1) - - -class CronJob(Workload): - resource_type = ResourceType(kind="Job", api_version="batch/v1beta1", id="cronjob") - job: Job - - def body(self): - super().body() - config = self.config - job = self.job - self.root.metadata = job.root.metadata - self.root.spec.jobTemplate.spec = job.root.spec - self.root.spec.schedule = config.schedule - - -class Container(BaseModel): - def new(self): - name: str - config: dict - - @staticmethod - def find_key_in_config(key, configs): - for name, config in configs.items(): - if key in config.data.keys(): - return name - raise ( - BaseException( - "Unable to find key {} in your configs definitions".format(key) - ) - ) - - def process_envs(self, config): - - name = self.name - - for env_name, value in sorted(config.env.items()): - if isinstance(value, dict): - if "fieldRef" in value: - self.root.setdefault("env", []).append( - {"name": env_name, "valueFrom": value} - ) - elif "secretKeyRef" in value: - if "name" not in value["secretKeyRef"]: - config_name = self.find_key_in_config( - value["secretKeyRef"]["key"], config.secrets - ) - # TODO(ademaria) I keep repeating this logic. Refactor. - if len(config.secrets.keys()) == 1: - value["secretKeyRef"]["name"] = name - else: - value["secretKeyRef"]["name"] = "{}-{}".format( - name, config_name - ) - - self.root.setdefault("env", []).append( - {"name": env_name, "valueFrom": value} - ) - if "configMapKeyRef" in value: - if "name" not in value["configMapKeyRef"]: - config_name = self.find_key_in_config( - value["configMapKeyRef"]["key"], config.config_maps - ) - # TODO(ademaria) I keep repeating this logic. Refactor. - if len(config.config_maps.keys()) == 1: - value["configMapKeyRef"]["name"] = name - else: - value["configMapKeyRef"]["name"] = "{}-{}".format( - name, config_name - ) - - self.root.setdefault("env", []).append( - {"name": env_name, "valueFrom": value} - ) - else: - self.root.setdefault("env", []).append( - {"name": env_name, "value": str(value)} - ) - - def add_volume_mounts_from_configs(self): - name = self.name - config = self.config - configs = config.config_maps.items() - secrets = config.secrets.items() - for object_name, spec in configs: - if spec is None: - raise CompileError( - f"error with '{object_name}' for component {name}: configuration cannot be empty!" 
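-                # fail at compile time rather than emit a broken manifest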
- ) - - if "mount" in spec: - self.root.setdefault("volumeMounts", []) - self.root.volumeMounts += [ - { - "mountPath": spec.mount, - "readOnly": spec.get("readOnly", None), - "name": object_name, - "subPath": spec.subPath, - } - ] - for object_name, spec in secrets: - if spec is None: - raise CompileError( - f"error with '{object_name}' for component {name}: configuration cannot be empty!" - ) - - if "mount" in spec: - self.root.setdefault("volumeMounts", []).append( - { - "mountPath": spec.mount, - "readOnly": spec.get("readOnly", None), - "name": object_name, - "subPath": spec.subPath, - } - ) - - def add_volume_mounts(self, volume_mounts): - for key, value in volume_mounts.items(): - kgenlib.merge({"name": key}, value) - self.root.setdefault("volumeMounts", []).append(value) - - @staticmethod - def create_probe(probe_definition): - probe = BaseObj() - if "type" in probe_definition: - probe.root.initialDelaySeconds = probe_definition.get( - "initial_delay_seconds", 0 - ) - probe.root.periodSeconds = probe_definition.get("period_seconds", 10) - probe.root.timeoutSeconds = probe_definition.get("timeout_seconds", 5) - probe.root.successThreshold = probe_definition.get("success_threshold", 1) - probe.root.failureThreshold = probe_definition.get("failure_threshold", 3) - - if probe_definition.type == "http": - probe.root.httpGet.scheme = probe_definition.get("scheme", "HTTP") - probe.root.httpGet.port = probe_definition.get("port", 80) - probe.root.httpGet.path = probe_definition.path - probe.root.httpGet.httpHeaders = probe_definition.httpHeaders - if probe_definition.type == "tcp": - probe.root.tcpSocket.port = probe_definition.port - if probe_definition.type == "command": - probe.root.exec.command = probe_definition.command - return probe.root - - def body(self): - name = self.name - config = self.config - - self.root.name = name - self.root.image = config.image - self.root.imagePullPolicy = config.get("pull_policy", "IfNotPresent") - if config.lifecycle: - self.root.lifecycle = config.lifecycle - self.root.resources = config.resources - self.root.args = config.args - self.root.command = config.command - # legacy container.security - if config.security: - self.root.securityContext.allowPrivilegeEscalation = ( - config.security.allow_privilege_escalation - ) - self.root.securityContext.runAsUser = config.security.user_id - else: - self.root.securityContext = config.security_context - self.add_volume_mounts_from_configs() - self.add_volume_mounts(config.volume_mounts) - - for name, port in sorted(config.ports.items()): - self.root.setdefault("ports", []) - self.root.ports.append( - { - "containerPort": port.get("container_port", port.service_port), - "name": name, - "protocol": port.get("protocol", "TCP"), - } - ) - - self.root.startupProbe = self.create_probe(config.healthcheck.startup) - self.root.livenessProbe = self.create_probe(config.healthcheck.liveness) - self.root.readinessProbe = self.create_probe(config.healthcheck.readiness) - self.process_envs(config) - - -class GenerateMultipleObjectsForClass(kgenlib.BaseStore): - """Helper to generate multiple classes - - As a convention for generators we have that if you define only one policy/config/secret configuration - for your component, then the name of that resource will be the component {name} itself. - - However if there are multiple objects being defined, then we call them: {name}-{object_name} - - This class helps achieve that for policies/config/secrets to avoid duplication. 
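-    For example, a component `api` defining secrets `proxy` and `app` yields
-    objects named `api-proxy` and `api-app`, while a single secret definition
-    would yield just `api`.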
- """ - - component_config: dict - generating_class: Any - workload: Any - - def body(self): - component_config = self.component_config - name = self.name - objects_configs = self.config - generating_class = self.generating_class - workload = self.workload - - for object_name, object_config in objects_configs.items(): - if object_config == None: - raise CompileError( - f"error with '{object_name}' for component {name}: configuration cannot be empty!" - ) - - if len(objects_configs.items()) == 1: - name = f"{self.name}" - else: - name = f"{self.name}-{object_name}" - - generated_object = generating_class( - name=name, - object_name=object_name, - config=object_config, - component=component_config, - workload=workload, - ) - - self.add(generated_object) - - -class PrometheusRule(KubernetesResource): - resource_type = ResourceType( - kind="PrometheusRule", - api_version="monitoring.coreos.com/v1", - id="prometheus_rule", - ) - - def body(self): - super().body() - name = self.name - config = self.config - self.root.spec.setdefault("groups", []).append( - {"name": name, "rules": config.prometheus_rules.rules} - ) - - -class BackendConfig(KubernetesResource): - resource_type = ResourceType( - kind="BackendConfig", api_version="cloud.google.com/v1", id="backend_config" - ) - - def body(self): - super().body() - self.root.spec = self.config.backend_config - - -class ServiceMonitor(KubernetesResource): - resource_type = ResourceType( - kind="ServiceMonitor", - api_version="monitoring.coreos.com/v1", - id="service_monitor", - ) - workload: Workload - - def new(self): - super().new() - - def body(self): - # TODO(ademaria) This name mangling is here just to simplify diff. - # Change it once done - name = self.name - workload = self.workload - self.name = "{}-metrics".format(name) - - super().body() - name = self.name - config = self.config - self.root.spec.endpoints = config.service_monitors.endpoints - self.root.spec.jobLabel = name - self.root.spec.namespaceSelector.matchNames = [self.namespace] - self.root.spec.selector.matchLabels = ( - workload.root.spec.template.metadata.labels - ) - - -class MutatingWebhookConfiguration(KubernetesResource): - resource_type = ResourceType( - kind="MutatingWebhookConfiguration", - api_version="admissionregistration.k8s.io/v1", - id="mutating_webhook_configuration", - ) - - def new(self): - super().new() - - def body(self): - super().body() - name = self.name - config = self.config - self.root.webhooks = config.webhooks - - -class PodDisruptionBudget(KubernetesResource): - resource_type = ResourceType( - kind="PodDisruptionBudget", - api_version="policy/v1beta1", - id="pod_disruption_budget", - ) - workload: Workload - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - workload = self.workload - self.add_namespace(config.get("namespace", inv.parameters.namespace)) - if config.auto_pdb: - self.root.spec.maxUnavailable = 1 - else: - self.root.spec.minAvailable = config.pdb_min_available - self.root.spec.selector.matchLabels = ( - workload.root.spec.template.metadata.labels - ) - - -class VerticalPodAutoscaler(KubernetesResource): - resource_type = ResourceType( - kind="VerticalPodAutoscaler", - api_version="autoscaling.k8s.io/v1beta2", - id="vertical_pod_autoscaler", - ) - workload: Workload - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - workload = self.workload - self.add_labels(workload.root.metadata.labels) - self.root.spec.targetRef.apiVersion = workload.api_version - 
self.root.spec.targetRef.kind = workload.kind - self.root.spec.targetRef.name = workload.name - self.root.spec.updatePolicy.updateMode = config.vpa - - # TODO(ademaria) Istio blacklist is always desirable but add way to make it configurable. - self.root.spec.resourcePolicy.containerPolicies = [ - {"containerName": "istio-proxy", "mode": "Off"} - ] - - -class HorizontalPodAutoscaler(KubernetesResource): - resource_type = ResourceType( - kind="HorizontalPodAutoscaler", - api_version="autoscaling.k8s.io/v2beta2", - id="horizontal_pod_autoscaler", - ) - workload: Workload - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - workload = self.workload - self.add_namespace(inv.parameters.namespace) - self.add_labels(workload.root.metadata.labels) - self.root.spec.scaleTargetRef.apiVersion = workload.api_version - self.root.spec.scaleTargetRef.kind = workload.kind - self.root.spec.scaleTargetRef.name = workload.name - self.root.spec.minReplicas = config.hpa.min_replicas - self.root.spec.maxReplicas = config.hpa.max_replicas - self.root.spec.metrics = config.hpa.metrics - - -class PodSecurityPolicy(KubernetesResource): - resource_type = ResourceType( - kind="PodSecurityPolicy", api_version="policy/v1beta1", id="pod_security_policy" - ) - workload: Workload - - def new(self): - super().new() - - def body(self): - super().body() - config = self.config - self.root.spec = config.pod_security_policy.spec - # Merge Dicts into PSP Annotations - self.root.metadata.annotations = { - **config.get("annotations", {}), - **config.pod_security_policy.get("annotations", {}), - } - # Merge Dicts into PSP Labels - self.root.metadata.labels = { - **config.get("labels", {}), - **config.pod_security_policy.get("labels", {}), - } - - -@kgenlib.register_generator(path="ingresses") -class IngressComponent(kgenlib.BaseStore): - name: str - config: Any - - def body(self): - name = self.name - config = self.config - ingress = Ingress(name=name, config=config) - self.add(ingress) - - if "managed_certificate" in config: - certificate_name = config.managed_certificate - additional_domains = config.get("additional_domains", []) - domains = [certificate_name] + additional_domains - ingress.add_annotations( - {"networking.gke.io/managed-certificates": certificate_name} - ) - self.add( - GoogleManagedCertificate( - name=certificate_name, config={"domains": domains} - ) - ) - - -@kgenlib.register_generator( - path="components", - apply_patches=[ - "generators.manifest.default_config", - "applications.{application}.component_defaults", - ], -) -class Components(kgenlib.BaseStore): - def body(self): - name = self.name - config = self.config - workload = Workload.create_workflow(name=name, config=config) - - logging.debug(f"Generating component {name} from {config}") - if config.schedule: - workload = CronJob(name=name, config=config, job=workload) - - workload.add_label("app.kapicorp.dev/component", name) - - configs = GenerateMultipleObjectsForClass( - name=name, - component_config=config, - generating_class=ComponentConfig, - config=config.config_maps, - workload=workload, - ) - - map(lambda x: x.add_label("app.kapicorp.dev/component", name), configs) - - secrets = GenerateMultipleObjectsForClass( - name=name, - component_config=config, - generating_class=ComponentSecret, - config=config.secrets, - workload=workload, - ) - - map(lambda x: x.add_label("app.kapicorp.dev/component", name), secrets) - - self.add(workload) - self.add(configs) - self.add(secrets) - - if ( - config.vpa - and 
inv.parameters.get("enable_vpa", True) - and config.type != "job" - ): - vpa = VerticalPodAutoscaler(name=name, config=config, workload=workload) - vpa.add_label("app.kapicorp.dev/component", name) - self.add(vpa) - - if config.pdb_min_available: - pdb = PodDisruptionBudget(name=name, config=config, workload=workload) - pdb.add_label("app.kapicorp.dev/component", name) - self.add(pdb) - - if config.hpa: - hpa = HorizontalPodAutoscaler(name=name, config=config, workload=workload) - hpa.add_label("app.kapicorp.dev/component", name) - self.add(hpa) - - if config.type != "job": - if config.pdb_min_available or config.auto_pdb: - pdb = PodDisruptionBudget(name=name, config=config, workload=workload) - pdb.add_label("app.kapicorp.dev/component", name) - self.add(pdb) - - if config.istio_policy: - istio_policy = IstioPolicy(name=name, config=config, workload=workload) - istio_policy.add_label("app.kapicorp.dev/component", name) - self.add(istio_policy) - - if config.pod_security_policy: - psp = PodSecurityPolicy(name=name, config=config, workload=workload) - psp.add_label("app.kapicorp.dev/component", name) - self.add(psp) - - if config.service: - service = Service( - name=name, - config=config, - workload=workload, - service_spec=config.service, - ) - service.add_label("app.kapicorp.dev/component", name) - self.add(service) - - if config.additional_services: - for service_name, service_spec in config.additional_services.items(): - service = Service( - name=service_name, - config=config, - workload=workload, - service_spec=service_spec, - ) - service.add_label("app.kapicorp.dev/component", name) - self.add(service) - - if config.network_policies: - policies = GenerateMultipleObjectsForClass( - name=name, - component_config=config, - generating_class=NetworkPolicy, - config=config.network_policies, - workload=workload, - ) - map(lambda x: x.add_label("app.kapicorp.dev/component", name), policies) - self.add(policies) - - if config.webhooks: - webhooks = MutatingWebhookConfiguration(name=name, config=config) - webhooks.add_label("app.kapicorp.dev/component", name) - self.add(webhooks) - - if config.service_monitors: - service_monitor = ServiceMonitor( - name=name, config=config, workload=workload - ) - service_monitor.add_label("app.kapicorp.dev/component", name) - self.add(service_monitor) - - if config.prometheus_rules: - prometheus_rule = PrometheusRule(name=name, config=config) - prometheus_rule.add_label("app.kapicorp.dev/component", name) - self.add(prometheus_rule) - - if config.service_account.get("create", False): - sa_name = config.service_account.get("name", name) - sa = ServiceAccount(name=sa_name, config=config) - sa.add_label("app.kapicorp.dev/component", name) - self.add(sa) - - if config.role: - role = Role(name=name, config=config) - role.add_label("app.kapicorp.dev/component", name) - self.add(role) - role_binding = RoleBinding(name=name, config=config, sa=sa) - role_binding.add_label("app.kapicorp.dev/component", name) - self.add(role_binding) - - if config.cluster_role: - cluster_role = ClusterRole(name=name, config=config) - self.add(cluster_role) - cluster_role.add_label("app.kapicorp.dev/component", name) - cluster_role_binding = ClusterRoleBinding(name=name, config=config, sa=sa) - cluster_role_binding.add_label("app.kapicorp.dev/component", name) - self.add(cluster_role_binding) - - if config.backend_config: - backend_config = BackendConfig(name=name, config=config) - backend_config.add_label("app.kapicorp.dev/component", name) - self.add(backend_config) - - -class 
MyHelmChart(HelmChart): - def new(self): - for obj in self.load_chart(): - if obj: - self.root[ - f"{obj['metadata']['name'].lower()}-{obj['kind'].lower().replace(':','-')}" - ] = BaseObj.from_dict(obj) - - -@kgenlib.register_generator(path="charts") -class HelmChartGenerator(kgenlib.BaseStore): - name: str - config: Any - - def body(self): - helm_config = self.config.to_dict() - chart_name = self.config.helm_params.name - - rendered_chart = MyHelmChart(**helm_config) - - for helm_resource in rendered_chart.root.values(): - resource = KubernetesResource.from_baseobj(helm_resource) - resource.add_label("app.kapicorp.dev/component", chart_name) - self.add(resource) - - def main(input_params): generator = kgenlib.BaseGenerator(inventory=inv) store = generator.generate() diff --git a/kubernetes/argocd.py b/kubernetes/argocd.py new file mode 100644 index 0000000..a449ad2 --- /dev/null +++ b/kubernetes/argocd.py @@ -0,0 +1,103 @@ +import json +import logging + +logger = logging.getLogger(__name__) + +from .base import Namespace +from .common import KubernetesResource, kgenlib + +class ArgoCDApplication(KubernetesResource): + source: dict = None + kind = "Application" + api_version = "argoproj.io/v1alpha1" + + def body(self): + super().body() + project = self.config.get("project", "default") + self.root.spec.project = project + self.root.spec.destination = self.config.get("destination") + self.root.spec.source = self.config.get("source") + if self.config.get("sync_policy"): + self.root.spec.syncPolicy = self.config.get("sync_policy") + + self.root.spec.ignoreDifferences = self.config.get("ignore_differences", None) + namespace = self.config.get("namespace", None) + + if namespace is None: + namespace = f"argocd-project-{project}" + + self.set_namespace(namespace) + + +@kgenlib.register_generator( + path="generators.argocd.applications", + global_generator=True, + activation_path="argocd.app_of_apps", + apply_patches=["generators.argocd.defaults.application"], +) +class GenArgoCDApplication(kgenlib.BaseStore): + def body(self): + config = self.config + namespace = config.get("namespace", "argocd") + name = config.get("name", self.name) + + argo_application = ArgoCDApplication( + name=name, namespace=namespace, config=config + ) + self.add(argo_application) + +class ArgoCDProject(KubernetesResource): + kind = "AppProject" + api_version = "argoproj.io/v1alpha1" + + def body(self): + super().body() + self.root.spec.sourceRepos = self.config.get("source_repos") + self.root.spec.destinations = self.config.get("destinations") + if self.config.get("cluster_resource_whitelist"): + self.root.spec.clusterResourceWhitelist = self.config.get( + "cluster_resource_whitelist" + ) + self.root.spec.sourceNamespaces = self.config.setdefault( + "source_namespaces", [f"argocd-project-{self.name}"] + ) + + +@kgenlib.register_generator( + path="generators.argocd.projects", + apply_patches=["generators.argocd.defaults.project"], +) +class GenArgoCDProject(kgenlib.BaseStore): + def body(self): + config = self.config + namespace = config.get("namespace", "argocd") + name = config.get("name", self.name) + + self.add(ArgoCDProject(name=name, namespace=namespace, config=config)) + self.add(Namespace(name=f"argocd-project-{name}", config=config)) + + +@kgenlib.register_generator( + path="clusters", global_generator=True, activation_path="argocd.clusters" +) +class GenArgoCDCluster(kgenlib.BaseStore): + def body(self): + config = self.config + target = self.target + namespace = 
self.global_inventory[target]["parameters"]["namespace"] + name = config.get("name") + cluster = ArgoCDCluster(name=name, namespace=namespace, config=config) + + self.add(cluster) + + +class ArgoCDCluster(KubernetesResource): + kind = "Secret" + api_version = "v1" + + def body(self): + super().body() + self.add_label("argocd.argoproj.io/secret-type", "cluster") + self.root.stringData.name = self.config.argocd.name + self.root.stringData.server = self.config.endpoint_url + self.root.stringData.config = json.dumps(self.config.argocd.config, indent=4) diff --git a/kubernetes/autoscaling.py b/kubernetes/autoscaling.py new file mode 100644 index 0000000..65e0182 --- /dev/null +++ b/kubernetes/autoscaling.py @@ -0,0 +1,69 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource + + +class KedaScaledObject(KubernetesResource): + kind = "ScaledObject" + api_version = "keda.sh/v1alpha1" + + def body(self): + super().body() + config = self.config + workload = self.workload + self.root.spec.scaleTargetRef.name = workload.root.metadata.name + self.root.spec.scaleTargetRef.kind = workload.root.kind + self.root.spec.scaleTargetRef.apiVersion = workload.root.apiVersion + self.root.spec.update(config.get("keda_scaled_object", {})) + + +class PodDisruptionBudget(KubernetesResource): + kind = "PodDisruptionBudget" + api_version = "policy/v1beta1" + + def body(self): + super().body() + config = self.config + workload = self.workload + if config.auto_pdb: + self.root.spec.maxUnavailable = 1 + else: + self.root.spec.minAvailable = config.pdb_min_available + self.root.spec.selector.matchLabels = ( + workload.root.spec.template.metadata.labels + ) + + +class VerticalPodAutoscaler(KubernetesResource): + kind = "VerticalPodAutoscaler" + api_version = "autoscaling.k8s.io/v1" + + def body(self): + super().body() + config = self.config + workload = self.workload + self.add_labels(workload.root.metadata.labels) + self.root.spec.targetRef.apiVersion = workload.api_version + self.root.spec.targetRef.kind = workload.kind + self.root.spec.targetRef.name = workload.name + self.root.spec.updatePolicy.updateMode = config.vpa.update_mode + self.root.spec.resourcePolicy = config.vpa.resource_policy + + +class HorizontalPodAutoscaler(KubernetesResource): + kind = "HorizontalPodAutoscaler" + api_version = "autoscaling.k8s.io/v2beta2" + + def body(self): + super().body() + config = self.config + workload = self.workload + self.add_labels(workload.root.metadata.labels) + self.root.spec.scaleTargetRef.apiVersion = workload.api_version + self.root.spec.scaleTargetRef.kind = workload.kind + self.root.spec.scaleTargetRef.name = workload.name + self.root.spec.minReplicas = config.hpa.min_replicas + self.root.spec.maxReplicas = config.hpa.max_replicas + self.root.spec.metrics = config.hpa.metrics diff --git a/kubernetes/base.py b/kubernetes/base.py new file mode 100644 index 0000000..63ca474 --- /dev/null +++ b/kubernetes/base.py @@ -0,0 +1,51 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource, kgenlib + + +class MutatingWebhookConfiguration(KubernetesResource): + kind = "MutatingWebhookConfiguration" + api_version = "admissionregistration.k8s.io/v1" + + def new(self): + super().new() + + def body(self): + super().body() + name = self.name + config = self.config + self.root.webhooks = config.webhooks + + +class PriorityClass(KubernetesResource): + kind = "PriorityClass" + api_version = "scheduling.k8s.io/v1" + priority: int + + def body(self): 
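+        # The required `priority` field (declared above) becomes the
+        # resource's `value`; globalDefault stays False, so a class must be
+        # requested explicitly by pods.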
+ super().body() + config = self.config + self.root.value = self.priority + self.root.globalDefault = False + + +class Namespace(KubernetesResource): + kind = "Namespace" + api_version = "v1" + + def body(self): + super().body() + config = self.config + labels = config.get("labels", {}) + annotations = config.get("annotations", {}) + self.add_labels(labels) + self.add_annotations(annotations) + + +@kgenlib.register_generator(path="generators.kubernetes.namespace") +class NamespaceGenerator(kgenlib.BaseStore): + def body(self): + name = self.config.get("name", self.name) + self.add(Namespace(name=name, config=self.config)) diff --git a/kubernetes/certmanager.py b/kubernetes/certmanager.py new file mode 100644 index 0000000..2a95a7d --- /dev/null +++ b/kubernetes/certmanager.py @@ -0,0 +1,38 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource, kgenlib + + +@kgenlib.register_generator(path="certmanager.issuer") +class CertManagerIssuer(KubernetesResource): + kind = "Issuer" + api_version = "cert-manager.io/v1" + + def body(self): + config = self.config + super().body() + self.root.spec = config.get("spec") + + +@kgenlib.register_generator(path="certmanager.cluster_issuer") +class CertManagerClusterIssuer(KubernetesResource): + kind = "ClusterIssuer" + api_version = "cert-manager.io/v1" + + def body(self): + config = self.config + super().body() + self.root.spec = config.get("spec") + + +@kgenlib.register_generator(path="certmanager.certificate") +class CertManagerCertificate(KubernetesResource): + kind = "Certificate" + api_version = "cert-manager.io/v1" + + def body(self): + config = self.config + super().body() + self.root.spec = config.get("spec") diff --git a/kubernetes/common.py b/kubernetes/common.py index 655b578..2813eba 100644 --- a/kubernetes/common.py +++ b/kubernetes/common.py @@ -1,27 +1,27 @@ import logging -from kapitan.inputs.kadet import BaseModel, BaseObj, load_from_search_paths - -kgenlib = load_from_search_paths("generators") - logger = logging.getLogger(__name__) +from kapitan.inputs.kadet import BaseObj, load_from_search_paths -class ResourceType(BaseModel): - kind: str - id: str - api_version: str +kgenlib = load_from_search_paths("kgenlib") class KubernetesResource(kgenlib.BaseContent): - resource_type: ResourceType name: str + api_version: str + kind: str namespace: str = None config: dict = None - api_version: str = None - kind: str = None rendered_name: str = None - id: str = None + + def __eq__(self, other): + return ( + self.root.metadata.name == other.root.metadata.name + and self.root.kind == other.root.kind + and self.root.apiVersion == other.root.apiVersion + and self.root.metadata.namespace == other.root.metadata.namespace + ) @classmethod def from_baseobj(cls, baseobj: BaseObj): @@ -29,10 +29,9 @@ def from_baseobj(cls, baseobj: BaseObj): kind = baseobj.root.kind api_version = baseobj.root.apiVersion - id = kind.lower() + name = baseobj.root.metadata.name - resource_type = ResourceType(kind=kind, api_version=api_version, id=id) - resource = cls(resource_type=resource_type, name=baseobj.root.metadata.name) + resource = cls(name=name, api_version=api_version, kind=kind) resource.root = baseobj.root return resource @@ -41,9 +40,6 @@ def component_name(self): return self.get_label("app.kapicorp.dev/component") or self.name def new(self): - self.kind = self.resource_type.kind - self.api_version = self.resource_type.api_version - self.id = self.resource_type.id if self.config: if not self.namespace: self.namespace = 
self.config.get("namespace", None) @@ -80,7 +76,7 @@ def add_annotations(self, annotations: dict): for key, value in annotations.items(): self.add_annotation(key, value) - def add_namespace(self, namespace: str): + def set_namespace(self, namespace: str): self.root.metadata.namespace = namespace def set_labels(self, labels: dict): diff --git a/kubernetes/gke.py b/kubernetes/gke.py new file mode 100644 index 0000000..f3fbc10 --- /dev/null +++ b/kubernetes/gke.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource + +class BackendConfig(KubernetesResource): + kind = "BackendConfig" + api_version = "cloud.google.com/v1" + + def body(self): + super().body() + self.root.spec = self.config.backend_config diff --git a/kubernetes/helm.py b/kubernetes/helm.py new file mode 100644 index 0000000..4c05c4e --- /dev/null +++ b/kubernetes/helm.py @@ -0,0 +1,36 @@ +import logging + +logger = logging.getLogger(__name__) + +from typing import Any + +from kapitan.inputs.helm import HelmChart +from kapitan.inputs.kadet import BaseObj + +from .common import KubernetesResource, kgenlib + + +class MyHelmChart(HelmChart): + def new(self): + for obj in self.load_chart(): + if obj: + self.root[ + f"{obj['metadata']['name'].lower()}-{obj['kind'].lower().replace(':','-')}" + ] = BaseObj.from_dict(obj) + + +@kgenlib.register_generator(path="charts") +class HelmChartGenerator(kgenlib.BaseStore): + name: str + config: Any + + def body(self): + helm_config = self.config.to_dict() + chart_name = self.config.helm_params.name + + rendered_chart = MyHelmChart(**helm_config) + + for helm_resource in rendered_chart.root.values(): + resource = KubernetesResource.from_baseobj(helm_resource) + resource.add_label("app.kapicorp.dev/component", chart_name) + self.add(resource) diff --git a/kubernetes/istio.py b/kubernetes/istio.py new file mode 100644 index 0000000..df60768 --- /dev/null +++ b/kubernetes/istio.py @@ -0,0 +1,18 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource + + +class IstioPolicy(KubernetesResource): + kind = "IstioPolicy" + api_version = "authentication.istio.io/v1alpha1" + + def body(self): + super().body() + config = self.config + name = self.name + self.root.spec.origins = config.istio_policy.policies.origins + self.root.spec.principalBinding = "USE_ORIGIN" + self.root.spec.targets = [{"name": name}] diff --git a/kubernetes/k8s.py b/kubernetes/k8s.py deleted file mode 100644 index 2516be6..0000000 --- a/kubernetes/k8s.py +++ /dev/null @@ -1,32 +0,0 @@ -from kapitan.inputs.kadet import BaseObj - - -class Base(BaseObj): - def new(self): - self.need("apiVersion") - self.need("kind") - self.need("name") - - def body(self): - self.root.apiVersion = self.kwargs.apiVersion - self.root.kind = self.kwargs.kind - self.name = self.kwargs.name - self.root.metadata.name = self.kwargs.get("rendered_name", self.name) - self.add_label("name", self.root.metadata.name) - - def add_labels(self, labels): - for key, value in labels.items(): - self.add_label(key, value) - - def add_label(self, key, value): - self.root.metadata.labels[key] = value - - def add_namespace(self, namespace): - self.root.metadata.namespace = namespace - - def add_annotations(self, annotations): - for key, value in annotations.items(): - self.add_annotation(key, value) - - def add_annotation(self, key, value): - self.root.metadata.annotations[key] = value diff --git a/kubernetes/networking.py b/kubernetes/networking.py index 9cc1618..37f5575 
100644
--- a/kubernetes/networking.py
+++ b/kubernetes/networking.py
@@ -1,18 +1,54 @@
-from kapitan.inputs.kadet import load_from_search_paths
+import logging
 
-from .common import KubernetesResource, ResourceType
+logger = logging.getLogger(__name__)
 
-kgenlib = load_from_search_paths("generators")
+from typing import Any
 
+from .common import KubernetesResource, kgenlib
 
-class NetworkPolicy(KubernetesResource):
-    resource_type = ResourceType(
-        kind="NetworkPolicy", api_version="networking.k8s.io/v1", id="network_policy"
-    )
+
+class Ingress(KubernetesResource):
+    kind = "Ingress"
+    api_version = "networking.k8s.io/v1"
 
     def new(self):
         super().new()
 
+    def body(self):
+        super().body()
+        config = self.config
+
+        self.add_annotations(config.get("annotations", {}))
+        self.add_labels(config.get("labels", {}))
+        if "default_backend" in config:
+            # networking.k8s.io/v1 renamed `backend` to `defaultBackend` and
+            # made the port a structured object.
+            self.root.spec.defaultBackend.service.name = config.default_backend.get("name")
+            self.root.spec.defaultBackend.service.port.number = config.default_backend.get("port", 80)
+        if "paths" in config:
+            host = config.host
+            paths = config.paths
+            self.root.spec.setdefault("rules", []).extend(
+                [{"host": host, "http": {"paths": paths}}]
+            )
+        if "rules" in config:
+            self.root.spec.setdefault("rules", []).extend(config.rules)
+        if config.tls:
+            self.root.spec.tls = config.tls
+
+
+class GoogleManagedCertificate(KubernetesResource):
+    kind = "ManagedCertificate"
+    api_version = "networking.gke.io/v1beta1"
+
+    def body(self):
+        super().body()
+        config = self.config
+        self.root.spec.domains = config.get("domains", [])
+
+
+class NetworkPolicy(KubernetesResource):
+    kind = "NetworkPolicy"
+    api_version = "networking.k8s.io/v1"
+
     def body(self):
         super().body()
         policy = self.config
@@ -25,3 +61,222 @@ def body(self):
 
         if self.root.spec.egress:
             self.root.spec.setdefault("policyTypes", []).append("Egress")
+
+
+class HealthCheckPolicy(KubernetesResource):
+    kind = "HealthCheckPolicy"
+    api_version = "networking.gke.io/v1"
+
+    def body(self):
+        super().body()
+        config = self.config
+
+        self.root.spec.default.logConfig.enabled = config.healthcheck.get("log", False)
+
+        config_spec = self.root.spec.default.config
+        container_port = config.healthcheck.get("container_port", self.name)
+        config_spec.type = config.healthcheck.get("type", "HTTP").upper()
+        if config_spec.type == "HTTP":
+            config_spec.httpHealthCheck.portSpecification = "USE_FIXED_PORT"
+            config_spec.httpHealthCheck.port = container_port
+            config_spec.httpHealthCheck.requestPath = config.healthcheck.get(
+                "path", config.get("path", "/")
+            )
+
+        self.root.spec.targetRef = {
+            "group": "",
+            "kind": "Service",
+            "name": config.get("service"),
+        }
+
+
+class Gateway(KubernetesResource):
+    kind = "Gateway"
+    api_version = "gateway.networking.k8s.io/v1beta1"
+
+    def body(self):
+        super().body()
+        self.root.spec.gatewayClassName = self.config.type
+        default_listener = {"name": "http", "protocol": "HTTP", "port": 80}
+
+        certificate = self.config.get("certificate", None)
+        if certificate:
+            default_listener = {
+                "name": "https",
+                "protocol": "HTTPS",
+                "port": 443,
+                "tls": {
+                    "mode": "Terminate",
+                    "certificateRefs": [{"name": certificate}],
+                },
+            }
+
+        self.root.spec.listeners = self.config.listeners or [default_listener]
+
+        if self.config.get("named_address"):
+            self.root.spec.setdefault("addresses", []).append(
+                {"type": "NamedAddress", "value": self.config.get("named_address")}
+            )
+
+
+class GCPGatewayPolicy(KubernetesResource):
+    kind = "GCPGatewayPolicy"
+    api_version = "networking.gke.io/v1"
+    gateway: Gateway = None
+
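+    # GKE-specific policy attached to the Gateway generated alongside it; the
+    # targetRef below is derived from the `gateway` field rather than from config.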
+    def body(self):
+        super().body()
+        self.root.spec.default.allowGlobalAccess = self.config.get(
+            "allow_global_access", False
+        )
+        self.root.spec.targetRef = {
+            "group": "gateway.networking.k8s.io",
+            "kind": "Gateway",
+            "name": self.gateway.name,
+        }
+
+
+class HTTPRoute(KubernetesResource):
+    kind = "HTTPRoute"
+    api_version = "gateway.networking.k8s.io/v1beta1"
+    gateway: Gateway = None
+
+    def body(self):
+        super().body()
+        self.root.spec.setdefault("parentRefs", []).append(
+            {
+                "kind": "Gateway",
+                "name": self.gateway.name,
+            }
+        )
+
+        self.root.spec.hostnames = self.config.get("hostnames", [])
+
+        for service_name, service_config in self.config.get("services", {}).items():
+            match = {"path": {"value": service_config.get("path", "/")}}
+            rule = {
+                "backendRefs": [
+                    {
+                        "name": service_config.get("service", service_name),
+                        "port": service_config.get("port", 80),
+                    }
+                ],
+                "matches": [match],
+            }
+            self.root.spec.setdefault("rules", []).append(rule)
+
+
+@kgenlib.register_generator(
+    path="generators.kubernetes.gateway",
+)
+class GatewayGenerator(kgenlib.BaseStore):
+    def body(self):
+        filename = f"{self.name}-gateway.yaml"
+        gateway = Gateway(name=self.name, config=self.config)
+        gateway.filename = filename
+        self.add(gateway)
+
+        policy = GCPGatewayPolicy(name=self.name, config=self.config, gateway=gateway)
+        policy.filename = filename
+        self.add(policy)
+
+        for route_id, route_config in self.config.get("routes", {}).items():
+            route_name = f"{self.name}-{route_id}"
+            route = HTTPRoute(name=route_name, config=route_config, gateway=gateway)
+            route.filename = filename
+            self.add(route)
+
+            for service_id, service_config in route_config.get("services", {}).items():
+                healthcheck = HealthCheckPolicy(
+                    name=f"{route_name}-{service_id}",
+                    config=service_config,
+                    gateway=gateway,
+                )
+                self.add(healthcheck)
+
+
+class Service(KubernetesResource):
+    kind = "Service"
+    api_version = "v1"
+
+    service_spec: dict
+
+    def new(self):
+        super().new()
+
+    def body(self):
+        config = self.config
+        workload = self.workload.root
+        service_spec = self.service_spec
+
+        self.name = service_spec.get("service_name", self.name)
+        super().body()
+
+        self.add_labels(config.get("labels", {}))
+        self.add_annotations(service_spec.annotations)
+        self.root.spec.setdefault("selector", {}).update(
+            workload.spec.template.metadata.labels
+        )
+        self.root.spec.setdefault("selector", {}).update(service_spec.selectors)
+        self.root.spec.type = service_spec.type
+        if service_spec.get("publish_not_ready_address", False):
+            self.root.spec.publishNotReadyAddresses = True
+        if service_spec.get("headless", False):
+            self.root.spec.clusterIP = "None"
+        self.root.spec.sessionAffinity = service_spec.get("session_affinity", "None")
+        all_ports = [config.ports] + [
+            container.ports
+            for container in config.additional_containers.values()
+            if "ports" in container
+        ]
+
+        self.exposed_ports = {}
+
+        for port in all_ports:
+            for port_name in port.keys():
+                if (
+                    not service_spec.expose_ports
+                    or port_name in service_spec.expose_ports
+                ):
+                    self.exposed_ports.update(port)
+
+        for port_name in sorted(self.exposed_ports):
+            self.root.spec.setdefault("ports", [])
+            port_spec = self.exposed_ports[port_name]
+            port_spec["name"] = port_name
+            service_port = port_spec.get("service_port", None)
+            if service_port:
+                self.root.spec.setdefault("ports", []).append(
+                    {
+                        "name": port_name,
+                        "port": service_port,
+                        "targetPort": port_name,
+                        "protocol": port_spec.get("protocol", "TCP"),
+                    }
+                )
+
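+# A minimal sketch of an inventory entry consumed by the "ingresses" generator
+# below. All names, hosts and annotation values here are made-up examples:
+#
+#   parameters:
+#     ingresses:
+#       frontend:
+#         managed_certificate: frontend.example.com
+#         additional_domains:
+#           - www.frontend.example.com
+#         annotations:
+#           kubernetes.io/ingress.class: "gce"
+#         host: frontend.example.com
+#         paths:
+#           - path: /
+#             pathType: Prefix
+#             backend:
+#               service:
+#                 name: frontend
+#                 port:
+#                   number: 80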
+@kgenlib.register_generator(path="ingresses") +class IngressComponent(kgenlib.BaseStore): + name: str + config: Any + + def body(self): + name = self.name + config = self.config + ingress = Ingress(name=name, config=config) + self.add(ingress) + + if "managed_certificate" in config: + certificate_name = config.managed_certificate + additional_domains = config.get("additional_domains", []) + domains = [certificate_name] + additional_domains + ingress.add_annotations( + {"networking.gke.io/managed-certificates": certificate_name} + ) + self.add( + GoogleManagedCertificate( + name=certificate_name, config={"domains": domains} + ) + ) diff --git a/kubernetes/prometheus.py b/kubernetes/prometheus.py new file mode 100644 index 0000000..f65d655 --- /dev/null +++ b/kubernetes/prometheus.py @@ -0,0 +1,54 @@ +import logging + +logger = logging.getLogger(__name__) + +from .common import KubernetesResource, kgenlib + + +@kgenlib.register_generator( + path="generators.prometheus.gen_pod_monitoring", + apply_patches=["generators.prometheus.defaults.gen_pod_monitoring"], +) +class PodMonitoring(KubernetesResource): + kind = "PodMonitoring" + api_version = "monitoring.googleapis.com/v1" + + def body(self): + super().body() + self.root.spec = self.config + + +class PrometheusRule(KubernetesResource): + kind = "PrometheusRule" + api_version = "monitoring.coreos.com/v1" + + def body(self): + super().body() + name = self.name + config = self.config + self.root.spec.setdefault("groups", []).append( + {"name": name, "rules": config.prometheus_rules.rules} + ) + + +class ServiceMonitor(KubernetesResource): + kind = "ServiceMonitor" + api_version = "monitoring.coreos.com/v1" + + def new(self): + super().new() + + def body(self): + name = self.name + workload = self.workload + self.name = "{}-metrics".format(name) + + super().body() + name = self.name + config = self.config + self.root.spec.endpoints = config.service_monitors.endpoints + self.root.spec.jobLabel = name + self.root.spec.namespaceSelector.matchNames = [self.namespace] + self.root.spec.selector.matchLabels = ( + workload.root.spec.template.metadata.labels + ) diff --git a/kubernetes/rbac.py b/kubernetes/rbac.py index 62654a0..de19162 100644 --- a/kubernetes/rbac.py +++ b/kubernetes/rbac.py @@ -1,42 +1,33 @@ -from kapitan.inputs.kadet import load_from_search_paths +import logging -from .common import KubernetesResource, ResourceType +logger = logging.getLogger(__name__) -kgenlib = load_from_search_paths("generators") +from .common import KubernetesResource, kgenlib class Role(KubernetesResource): - resource_type = ResourceType( - kind="Role", api_version="rbac.authorization.k8s.io/v1", id="role" - ) - - def new(self): - super().new() + kind = "Role" + api_version = "rbac.authorization.k8s.io/v1" def body(self): super().body() config = self.config - self.root.rules = config.role.rules + self.root.rules = config["role"]["rules"] class RoleBinding(KubernetesResource): - resource_type = ResourceType( - kind="RoleBinding", - api_version="rbac.authorization.k8s.io/v1", - id="role_binding", - ) - - def new(self): - super().new() + kind = "RoleBinding" + api_version = "rbac.authorization.k8s.io/v1" def body(self): super().body() config = self.config sa = self.sa + name = config.get("name", self.name) default_role_ref = { "apiGroup": "rbac.authorization.k8s.io", "kind": "Role", - "name": config.name, + "name": name, } default_subject = [ { @@ -50,11 +41,8 @@ def body(self): class ClusterRole(KubernetesResource): - resource_type = ResourceType( - 
kind="ClusterRole", - api_version="rbac.authorization.k8s.io/v1", - id="cluster_role", - ) + kind = "ClusterRole" + api_version = "rbac.authorization.k8s.io/v1" def new(self): super().new() @@ -66,14 +54,8 @@ def body(self): class ClusterRoleBinding(KubernetesResource): - resource_type = ResourceType( - kind="ClusterRoleBinding", - api_version="rbac.authorization.k8s.io/v1", - id="cluster_role_binding", - ) - - def new(self): - super().new() + kind = "ClusterRoleBinding" + api_version = "rbac.authorization.k8s.io/v1" def body(self): super().body() @@ -93,3 +75,41 @@ def body(self): ] self.root.roleRef = config.get("roleRef", default_role_ref) self.root.subjects = config.get("subject", default_subject) + + +@kgenlib.register_generator(path="generators.kubernetes.service_accounts") +class ServiceAccountGenerator(kgenlib.BaseStore): + def body(self): + config = self.config + name = config.get("name", self.name) + namespace = config["namespace"] + sa = ServiceAccount(name=name, config=config) + sa.add_annotations(config.annotations) + sa.add_labels(config.labels) + + roles = config.get("roles") + objs = [sa] + if roles is not None: + role_cfg = {"role": {"rules": roles}} + r = Role(name=f"{name}-role", namespace=namespace, config=role_cfg) + rb_cfg = {"name": r.name} + rb = RoleBinding( + name=f"{name}-role-binding", namespace=namespace, config=rb_cfg, sa=sa + ) + + objs += [r, rb] + + self.add_list(objs) + + +class ServiceAccount(KubernetesResource): + kind = "ServiceAccount" + api_version = "v1" + + def new(self): + super().new() + + def body(self): + super().body() + config = self.config + self.add_annotations(config.service_account.annotations) diff --git a/kubernetes/storage.py b/kubernetes/storage.py index 2eec677..c02b9c7 100644 --- a/kubernetes/storage.py +++ b/kubernetes/storage.py @@ -1,14 +1,15 @@ +import logging + +logger = logging.getLogger(__name__) + import base64 import hashlib import logging import os -from kapitan.inputs.kadet import Dict, load_from_search_paths +from kapitan.inputs.kadet import Dict -from .common import KubernetesResource, ResourceType - -logger = logging.getLogger(__name__) -kgenlib = load_from_search_paths("generators") +from .common import KubernetesResource, kgenlib class SharedConfig(KubernetesResource): @@ -25,20 +26,18 @@ class SharedConfig(KubernetesResource): def encode_string(unencoded_string): return base64.b64encode(unencoded_string.encode("ascii")).decode("ascii") - def setup_metadata(self, inventory): - namespace = inventory.parameters.get("namespace", None) - + def setup_metadata(self): + namespace = None if self.component: namespace = self.component.get("namespace", namespace) namespace = self.config.get("namespace", namespace) if namespace: - self.add_namespace(namespace) + self.set_namespace(namespace) self.add_annotations(self.config.get("annotations", {}).copy()) self.add_labels(self.config.get("labels", {}).copy()) - self.setup_global_defaults(inventory=inventory) self.items = self.config["items"] @@ -77,9 +76,7 @@ def add_item(self, key, value, request_encode=False, stringdata=False): self.root[field][key] = self.encode_string(value) if encode else value def add_string_data(self, string_data, encode=False, stringdata=True): - for key, spec in string_data.items(): - if "value" in spec: value = spec.get("value") if "template" in spec: @@ -106,8 +103,65 @@ def versioning(self, enabled=False): class ConfigMap(SharedConfig): - resource_type = ResourceType(kind="ConfigMap", api_version="v1", id="config_map") + kind = "ConfigMap" + api_version 
= "v1" class Secret(SharedConfig): - resource_type = ResourceType(kind="Secret", api_version="v1", id="secret") + kind = "Secret" + api_version = "v1" + + +class ComponentConfig(ConfigMap): + config: Dict + + def body(self): + super().body() + self.setup_metadata() + self.versioning_enabled = self.config.get("versioned", False) + if getattr(self, "workload", None) and self.workload.root.metadata.name: + self.add_label("name", self.workload.root.metadata.name) + self.add_data(self.config.data) + self.add_directory(self.config.directory, encode=False) + if getattr(self, "workload", None): + self.workload.add_volumes_for_object(self) + + +class ComponentSecret(Secret): + config: Dict + + def new(self): + super().new() + + def body(self): + super().body() + self.root.type = self.config.get("type", "Opaque") + self.versioning_enabled = self.config.get("versioned", False) + if getattr(self, "workload", None) and self.workload.root.metadata.name: + self.add_label("name", self.workload.root.metadata.name) + self.setup_metadata() + if self.config.data: + self.add_data(self.config.data) + if self.config.string_data: + self.add_string_data(self.config.string_data) + self.add_directory(self.config.directory, encode=True) + if getattr(self, "workload", None): + self.workload.add_volumes_for_object(self) + + +@kgenlib.register_generator( + path="generators.kubernetes.secrets", + apply_patches=["generators.manifest.default_resource"], +) +class SecretGenerator(kgenlib.BaseStore): + def body(self): + self.add(ComponentSecret(name=self.name, config=self.config)) + + +@kgenlib.register_generator( + path="generators.kubernetes.config_maps", + apply_patches=["generators.manifest.default_resource"], +) +class ConfigGenerator(kgenlib.BaseStore): + def body(self): + self.add(ComponentConfig(name=self.name, config=self.config)) diff --git a/kubernetes/workloads.py b/kubernetes/workloads.py new file mode 100644 index 0000000..46d56ad --- /dev/null +++ b/kubernetes/workloads.py @@ -0,0 +1,702 @@ +import logging + +logger = logging.getLogger(__name__) + +from typing import Any + +from kapitan.inputs.kadet import BaseModel, BaseObj, CompileError + +from .autoscaling import ( + HorizontalPodAutoscaler, + KedaScaledObject, + PodDisruptionBudget, + VerticalPodAutoscaler, +) +from .common import KubernetesResource, kgenlib +from .networking import HealthCheckPolicy, Service +from .rbac import ServiceAccount +from .storage import ComponentConfig, ComponentSecret + + +class GenerateMultipleObjectsForClass(kgenlib.BaseStore): + """Helper to generate multiple classes + + As a convention for generators we have that if you define only one policy/config/secret configuration + for your component, then the name of that resource will be the component {name} itself. + + However if there are multiple objects being defined, then we call them: {name}-{object_name} + + This class helps achieve that for policies/config/secrets to avoid duplication. + """ + + component_config: dict + generating_class: Any + workload: Any + + def body(self): + component_config = self.component_config + name = self.name + objects_configs = self.config + generating_class = self.generating_class + workload = self.workload + + for object_name, object_config in objects_configs.items(): + if object_config == None: + raise CompileError( + f"error with '{object_name}' for component {name}: configuration cannot be empty!" 
+                )
+
+            if len(objects_configs.items()) == 1:
+                name = f"{self.name}"
+            else:
+                name = f"{self.name}-{object_name}"
+
+            generated_object = generating_class(
+                name=name,
+                object_name=object_name,
+                config=object_config,
+                component=component_config,
+                workload=workload,
+            )
+
+            self.add(generated_object)
+
+
+class Workload(KubernetesResource):
+    @classmethod
+    def create_workflow(cls, name, config):
+        if config.type == "deployment":
+            workload = Deployment(name=name, config=config)
+        elif config.type == "statefulset":
+            workload = StatefulSet(name=name, config=config)
+        elif config.type == "daemonset":
+            workload = DaemonSet(name=name, config=config)
+        elif config.type == "job":
+            workload = Job(name=name, config=config)
+        else:
+            raise CompileError(f"Unknown workload type: {config.type}")
+
+        workload.add_annotations(config.setdefault("annotations", {}))
+        workload.root.spec.template.metadata.annotations = config.get(
+            "pod_annotations", {}
+        )
+        workload.add_labels(config.setdefault("labels", {}))
+        workload.add_volumes(config.setdefault("volumes", {}))
+        workload.add_volume_claims(config.setdefault("volume_claims", {}))
+        workload.root.spec.template.spec.securityContext = (
+            config.workload_security_context
+        )
+        workload.root.spec.minReadySeconds = config.min_ready_seconds
+        if config.service_account.enabled:
+            workload.root.spec.template.spec.serviceAccountName = (
+                config.service_account.get("name", name)
+            )
+
+        container = Container(name=name, config=config)
+        additional_containers = [
+            Container(name=name, config=config)
+            for name, config in config.additional_containers.items()
+        ]
+        workload.add_containers([container])
+        workload.add_containers(additional_containers)
+        init_containers = [
+            Container(name=name, config=config)
+            for name, config in config.init_containers.items()
+        ]
+
+        workload.add_init_containers(init_containers)
+        if config.image_pull_secrets:
+            workload.root.spec.template.spec.imagePullSecrets = config.get(
+                "image_pull_secrets"
+            )
+        workload.root.spec.template.spec.dnsPolicy = config.dns_policy
+        workload.root.spec.template.spec.terminationGracePeriodSeconds = config.get(
+            "grace_period", 30
+        )
+
+        if config.node_selector:
+            workload.root.spec.template.spec.nodeSelector = config.node_selector
+
+        if config.tolerations:
+            workload.root.spec.template.spec.tolerations = config.tolerations
+
+        affinity = workload.root.spec.template.spec.affinity
+        if config.prefer_pods_in_node_with_expression and not config.node_selector:
+            affinity.nodeAffinity.setdefault(
+                "preferredDuringSchedulingIgnoredDuringExecution", []
+            )
+            affinity.nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution.append(
+                {
+                    "preference": {
+                        "matchExpressions": [config.prefer_pods_in_node_with_expression]
+                    },
+                    "weight": 1,
+                }
+            )
+
+        if config.prefer_pods_in_different_nodes:
+            affinity.podAntiAffinity.setdefault(
+                "preferredDuringSchedulingIgnoredDuringExecution", []
+            )
+            affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution.append(
+                {
+                    "podAffinityTerm": {
+                        "labelSelector": {
+                            "matchExpressions": [
+                                {"key": "app", "operator": "In", "values": [name]}
+                            ]
+                        },
+                        "topologyKey": "kubernetes.io/hostname",
+                    },
+                    "weight": 1,
+                }
+            )
+
+        if config.prefer_pods_in_different_zones:
+            affinity.podAntiAffinity.setdefault(
+                "preferredDuringSchedulingIgnoredDuringExecution", []
+            )
+            affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution.append(
+                {
+                    "podAffinityTerm": {
+                        "labelSelector": {
+                            "matchExpressions": [
+                                {"key": "app", "operator": "In", "values": 
[name]} + ] + }, + "topologyKey": "failure-domain.beta.kubernetes.io/zone", + }, + "weight": 1, + } + ) + + return workload + + def set_replicas(self, replicas): + self.root.spec.replicas = replicas + + def add_containers(self, containers): + self.root.spec.template.spec.setdefault("containers", []).extend( + [container.root for container in containers] + ) + + def add_init_containers(self, containers): + self.root.spec.template.spec.setdefault("initContainers", []).extend( + container.root for container in containers + ) + + def add_volumes(self, volumes): + for key, value in volumes.items(): + kgenlib.merge({"name": key}, value) + self.root.spec.template.spec.setdefault("volumes", []).append(value) + + def add_volume_claims(self, volume_claims): + self.root.spec.setdefault("volumeClaimTemplates", []) + for key, value in volume_claims.items(): + kgenlib.merge({"metadata": {"name": key, "labels": {"name": key}}}, value) + self.root.spec.volumeClaimTemplates += [value] + + def add_volumes_for_object(self, object): + object_name = object.object_name + rendered_name = object.rendered_name + + if type(object) == ComponentConfig: + key = "configMap" + name_key = "name" + else: + key = "secret" + name_key = "secretName" + + template = self.root.spec.template + if isinstance(self, CronJob): + template = self.root.spec.jobTemplate.spec.template + + template.spec.setdefault("volumes", []).append( + { + "name": object_name, + key: { + "defaultMode": object.config.get("default_mode", 420), + name_key: rendered_name, + "items": [ + {"key": value, "path": value} + for value in object.config.get("items", []) + ], + }, + } + ) + + +class Deployment(Workload): + kind = "Deployment" + api_version = "apps/v1" + + def body(self): + default_strategy = { + "type": "RollingUpdate", + "rollingUpdate": {"maxSurge": "25%", "maxUnavailable": "25%"}, + } + super().body() + config = self.config + self.root.spec.template.metadata.setdefault("labels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.selector.setdefault("matchLabels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.template.spec.restartPolicy = config.get( + "restart_policy", "Always" + ) + if "host_network" in config: + self.root.spec.template.spec.hostNetwork = config.host_network + if "host_pid" in config: + self.root.spec.template.spec.hostPID = config.host_pid + self.root.spec.strategy = config.get("update_strategy", default_strategy) + self.root.spec.revisionHistoryLimit = config.revision_history_limit + self.root.spec.progressDeadlineSeconds = ( + config.deployment_progress_deadline_seconds + ) + self.set_replicas(config.get("replicas", 1)) + + +class StatefulSet(Workload): + kind = "StatefulSet" + api_version = "apps/v1" + + def body(self): + default_strategy = {} + update_strategy = {"rollingUpdate": {"partition": 0}, "type": "RollingUpdate"} + + super().body() + name = self.name + config = self.config + self.root.spec.template.metadata.setdefault("labels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.selector.setdefault("matchLabels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.template.spec.restartPolicy = config.get( + "restart_policy", "Always" + ) + if "host_network" in config: + self.root.spec.template.spec.hostNetwork = config.host_network + if "host_pid" in config: + self.root.spec.template.spec.hostPID = config.host_pid + self.root.spec.revisionHistoryLimit = config.revision_history_limit + self.root.spec.strategy = 
config.get("strategy", default_strategy) + self.root.spec.updateStrategy = config.get("update_strategy", update_strategy) + self.root.spec.serviceName = config.service.get("service_name", name) + self.set_replicas(config.get("replicas", 1)) + + +class DaemonSet(Workload): + kind = "DaemonSet" + api_version = "apps/v1" + + def body(self): + super().body() + config = self.config + self.root.spec.template.metadata.setdefault("labels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.selector.setdefault("matchLabels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.template.spec.restartPolicy = config.get( + "restart_policy", "Always" + ) + if "host_network" in config: + self.root.spec.template.spec.hostNetwork = config.host_network + if "host_pid" in config: + self.root.spec.template.spec.hostPID = config.host_pid + self.root.spec.revisionHistoryLimit = config.revision_history_limit + self.root.spec.progressDeadlineSeconds = ( + config.deployment_progress_deadline_seconds + ) + + +class Job(Workload): + kind = "Job" + api_version = "batch/v1" + + def body(self): + super().body() + config = self.config + self.root.spec.template.metadata.setdefault("labels", {}).update( + config.labels + self.root.metadata.labels + ) + self.root.spec.template.spec.restartPolicy = config.get( + "restart_policy", "Never" + ) + if "host_network" in config: + self.root.spec.template.spec.hostNetwork = config.host_network + if "host_pid" in config: + self.root.spec.template.spec.hostPID = config.host_pid + self.root.spec.backoffLimit = config.get("backoff_limit", 1) + self.root.spec.completions = config.get("completions", 1) + self.root.spec.parallelism = config.get("parallelism", 1) + + +class CronJob(Workload): + kind = "CronJob" + api_version = "batch/v1" + job: Job + + def body(self): + super().body() + config = self.config + job = self.job + self.root.metadata = job.root.metadata + self.root.spec.jobTemplate.spec = job.root.spec + self.root.spec.schedule = config.schedule + + +class Container(BaseModel): + def new(self): + name: str + config: dict + + @staticmethod + def find_key_in_config(key, configs): + for name, config in configs.items(): + if key in config.data.keys(): + return name + raise ( + BaseException( + "Unable to find key {} in your configs definitions".format(key) + ) + ) + + def process_envs(self, config): + name = self.name + + for env_name, value in sorted(config.env.items()): + if isinstance(value, dict): + if "fieldRef" in value: + self.root.setdefault("env", []).append( + {"name": env_name, "valueFrom": value} + ) + elif "secretKeyRef" in value: + if "name" not in value["secretKeyRef"]: + config_name = self.find_key_in_config( + value["secretKeyRef"]["key"], config.secrets + ) + # TODO(ademaria) I keep repeating this logic. Refactor. + if len(config.secrets.keys()) == 1: + value["secretKeyRef"]["name"] = name + else: + value["secretKeyRef"]["name"] = "{}-{}".format( + name, config_name + ) + + self.root.setdefault("env", []).append( + {"name": env_name, "valueFrom": value} + ) + if "configMapKeyRef" in value: + if "name" not in value["configMapKeyRef"]: + config_name = self.find_key_in_config( + value["configMapKeyRef"]["key"], config.config_maps + ) + # TODO(ademaria) I keep repeating this logic. Refactor. 
+ if len(config.config_maps.keys()) == 1: + value["configMapKeyRef"]["name"] = name + else: + value["configMapKeyRef"]["name"] = "{}-{}".format( + name, config_name + ) + + self.root.setdefault("env", []).append( + {"name": env_name, "valueFrom": value} + ) + else: + self.root.setdefault("env", []).append( + {"name": env_name, "value": str(value)} + ) + + def add_volume_mounts_from_configs(self): + name = self.name + config = self.config + configs = config.config_maps.items() + secrets = config.secrets.items() + for object_name, spec in configs: + if spec is None: + raise CompileError( + f"error with '{object_name}' for component {name}: configuration cannot be empty!" + ) + + if "mount" in spec: + self.root.setdefault("volumeMounts", []) + self.root.volumeMounts += [ + { + "mountPath": spec.mount, + "readOnly": spec.get("readOnly", None), + "name": object_name, + "subPath": spec.subPath, + } + ] + for object_name, spec in secrets: + if spec is None: + raise CompileError( + f"error with '{object_name}' for component {name}: configuration cannot be empty!" + ) + + if "mount" in spec: + self.root.setdefault("volumeMounts", []).append( + { + "mountPath": spec.mount, + "readOnly": spec.get("readOnly", None), + "name": object_name, + "subPath": spec.subPath, + } + ) + + def add_volume_mounts(self, volume_mounts): + for key, value in volume_mounts.items(): + kgenlib.merge({"name": key}, value) + self.root.setdefault("volumeMounts", []).append(value) + + @staticmethod + def create_probe(probe_definition): + probe = BaseObj() + if "type" in probe_definition: + probe.root.initialDelaySeconds = probe_definition.get( + "initial_delay_seconds", 0 + ) + probe.root.periodSeconds = probe_definition.get("period_seconds", 10) + probe.root.timeoutSeconds = probe_definition.get("timeout_seconds", 5) + probe.root.successThreshold = probe_definition.get("success_threshold", 1) + probe.root.failureThreshold = probe_definition.get("failure_threshold", 3) + + if probe_definition.type == "http": + probe.root.httpGet.scheme = probe_definition.get("scheme", "HTTP") + probe.root.httpGet.port = probe_definition.get("port", 80) + probe.root.httpGet.path = probe_definition.path + probe.root.httpGet.httpHeaders = probe_definition.httpHeaders + if probe_definition.type == "tcp": + probe.root.tcpSocket.port = probe_definition.port + if probe_definition.type == "command": + probe.root.exec.command = probe_definition.command + return probe.root + + def body(self): + name = self.name + config = self.config + + self.root.name = name + self.root.image = config.image + self.root.imagePullPolicy = config.get("pull_policy", "IfNotPresent") + if config.lifecycle: + self.root.lifecycle = config.lifecycle + self.root.resources = config.resources + self.root.args = config.args + self.root.command = config.command + # legacy container.security + if config.security: + self.root.securityContext.allowPrivilegeEscalation = ( + config.security.allow_privilege_escalation + ) + self.root.securityContext.runAsUser = config.security.user_id + else: + self.root.securityContext = config.security_context + self.add_volume_mounts_from_configs() + self.add_volume_mounts(config.volume_mounts) + + for name, port in sorted(config.ports.items()): + self.root.setdefault("ports", []) + self.root.ports.append( + { + "containerPort": port.get("container_port", port.service_port), + "name": name, + "protocol": port.get("protocol", "TCP"), + } + ) + + self.root.startupProbe = self.create_probe(config.healthcheck.startup) + self.root.livenessProbe = 
self.create_probe(config.healthcheck.liveness)
+        self.root.readinessProbe = self.create_probe(config.healthcheck.readiness)
+        self.process_envs(config)
+
+
+class PodSecurityPolicy(KubernetesResource):
+    kind = "PodSecurityPolicy"
+    api_version = "policy/v1beta1"
+    workload: Workload
+
+    def new(self):
+        super().new()
+
+    def body(self):
+        super().body()
+        config = self.config
+        self.root.spec = config.pod_security_policy.spec
+        # Merge Dicts into PSP Annotations
+        self.root.metadata.annotations = {
+            **config.get("annotations", {}),
+            **config.pod_security_policy.get("annotations", {}),
+        }
+        # Merge Dicts into PSP Labels
+        self.root.metadata.labels = {
+            **config.get("labels", {}),
+            **config.pod_security_policy.get("labels", {}),
+        }
+
+
+@kgenlib.register_generator(
+    path="components",
+    apply_patches=[
+        "generators.manifest.default_config",
+        "applications.{application}.component_defaults",
+    ],
+)
+class Components(kgenlib.BaseStore):
+    def body(self):
+        name = self.name
+        config = self.config
+        workload = Workload.create_workflow(name=name, config=config)
+
+        logger.debug(f"Generating component {name} from {config}")
+        if config.schedule:
+            workload = CronJob(name=name, config=config, job=workload)
+
+        workload.add_label("app.kapicorp.dev/component", name)
+
+        configs = GenerateMultipleObjectsForClass(
+            name=name,
+            component_config=config,
+            generating_class=ComponentConfig,
+            config=config.config_maps,
+            workload=workload,
+        )
+
+        # map() is lazy and its result was never consumed, so the labels were
+        # never applied; iterate over the generated objects instead.
+        for generated in configs.get_content_list():
+            generated.add_label("app.kapicorp.dev/component", name)
+
+        secrets = GenerateMultipleObjectsForClass(
+            name=name,
+            component_config=config,
+            generating_class=ComponentSecret,
+            config=config.secrets,
+            workload=workload,
+        )
+
+        for generated in secrets.get_content_list():
+            generated.add_label("app.kapicorp.dev/component", name)
+
+        self.add(workload)
+        self.add(configs)
+        self.add(secrets)
+
+        if config.pdb_min_available:
+            pdb = PodDisruptionBudget(name=name, config=config, workload=workload)
+            pdb.add_label("app.kapicorp.dev/component", name)
+            self.add(pdb)
+
+        if config.hpa:
+            hpa = HorizontalPodAutoscaler(name=name, config=config, workload=workload)
+            hpa.add_label("app.kapicorp.dev/component", name)
+            self.add(hpa)
+
+        if config.get("vpa", False):
+            vpa = VerticalPodAutoscaler(name=name, config=config, workload=workload)
+            vpa.add_label("app.kapicorp.dev/component", name)
+            self.add(vpa)
+
+        if config.keda_scaled_object:
+            scaled_object = KedaScaledObject(
+                name=name, config=config, workload=workload
+            )
+            scaled_object.add_label("app.kapicorp.dev/component", name)
+            self.add(scaled_object)
+
+        if config.type != "job":
+            if config.pdb_min_available or config.auto_pdb:
+                pdb = PodDisruptionBudget(name=name, config=config, workload=workload)
+                pdb.add_label("app.kapicorp.dev/component", name)
+                self.add(pdb)
+
+        if config.istio_policy:
+            istio_policy = IstioPolicy(name=name, config=config, workload=workload)
+            istio_policy.add_label("app.kapicorp.dev/component", name)
+            self.add(istio_policy)
+
+        if config.pod_security_policy:
+            psp = PodSecurityPolicy(name=name, config=config, workload=workload)
+            psp.add_label("app.kapicorp.dev/component", name)
+            self.add(psp)
+
+        if config.service:
+            service = Service(
+                name=name,
+                config=config,
+                workload=workload,
+                service_spec=config.service,
+            )
+            service.add_label("app.kapicorp.dev/component", name)
+
+            self.add(service)
+
+        if config.additional_services:
+            for service_name, service_spec in config.additional_services.items():
+                service = Service(
+                    name=service_name,
+                    config=config,
+                    workload=workload,
+                    service_spec=service_spec,
+                )
+                service.add_label("app.kapicorp.dev/component", name)
+                self.add(service)
+
+        if config.network_policies:
+            policies = GenerateMultipleObjectsForClass(
+                name=name,
+                component_config=config,
+                generating_class=NetworkPolicy,
+                config=config.network_policies,
+                workload=workload,
+            )
+            for generated in policies.get_content_list():
+                generated.add_label("app.kapicorp.dev/component", name)
+            self.add(policies)
+
+        if config.webhooks:
+            webhooks = MutatingWebhookConfiguration(name=name, config=config)
+            webhooks.add_label("app.kapicorp.dev/component", name)
+            self.add(webhooks)
+
+        if config.service_monitors:
+            service_monitor = ServiceMonitor(
+                name=name, config=config, workload=workload
+            )
+            service_monitor.add_label("app.kapicorp.dev/component", name)
+            self.add(service_monitor)
+
+        if config.prometheus_rules:
+            prometheus_rule = PrometheusRule(name=name, config=config)
+            prometheus_rule.add_label("app.kapicorp.dev/component", name)
+            self.add(prometheus_rule)
+
+        if config.service_account.get("create", False):
+            sa_name = config.service_account.get("name", name)
+            sa = ServiceAccount(name=sa_name, config=config)
+            sa.add_label("app.kapicorp.dev/component", name)
+            self.add(sa)
+
+        if config.role:
+            role = Role(name=name, config=config)
+            role.add_label("app.kapicorp.dev/component", name)
+            self.add(role)
+            role_binding = RoleBinding(name=name, config=config, sa=sa)
+            role_binding.add_label("app.kapicorp.dev/component", name)
+            self.add(role_binding)
+
+        if config.cluster_role:
+            cluster_role = ClusterRole(name=name, config=config)
+            self.add(cluster_role)
+            cluster_role.add_label("app.kapicorp.dev/component", name)
+            cluster_role_binding = ClusterRoleBinding(name=name, config=config, sa=sa)
+            cluster_role_binding.add_label("app.kapicorp.dev/component", name)
+            self.add(cluster_role_binding)
+
+        if config.backend_config:
+            backend_config = BackendConfig(name=name, config=config)
+            backend_config.add_label("app.kapicorp.dev/component", name)
+            self.add(backend_config)
diff --git a/lib/generators/__init__.py b/lib/kgenlib/__init__.py
similarity index 63%
rename from lib/generators/__init__.py
rename to lib/kgenlib/__init__.py
index ec8915c..a5e2020 100644
--- a/lib/generators/__init__.py
+++ b/lib/kgenlib/__init__.py
@@ -1,43 +1,72 @@
 import contextvars
-import functools
 import logging
 from enum import Enum
-from types import FunctionType
 from typing import List
 
 import yaml
 from box.exceptions import BoxValueError
 from kapitan.cached import args
 from kapitan.inputs.helm import HelmChart
-from kapitan.inputs.kadet import BaseModel, BaseObj, CompileError, Dict, current_target
+from kapitan.inputs.kadet import (
+    BaseModel,
+    BaseObj,
+    CompileError,
+    Dict,
+    current_target,
+    inventory_global,
+)
 from kapitan.utils import render_jinja2_file
 
 logger = logging.getLogger(__name__)
 
 search_paths = args.get("search_paths")
 
 registered_generators = contextvars.ContextVar(
-    "current registered_generators in thread"
+    "current registered_generators in thread", default={}
 )
 
 target = current_target.get()
-registered_generators.set({})
+
+
+class DeleteContent(Exception):
+    # Raised when content should be deleted
+    pass
+
+
+def patch_config(config: Dict, inventory: Dict, inventory_path: str) -> None:
+    """Apply patch to config"""
+    patch = findpath(inventory, inventory_path, {})
+    logger.debug(f"Applying patch {inventory_path} : {patch}")
+    merge(patch, config)
 
 
 def register_function(func, params):
+    target = current_target.get()
     logging.debug(
{params} for target {target}" ) + my_dict = registered_generators.get() - my_dict.setdefault(target, []).append((func, params)) + generator_list = my_dict.get(target, []) + generator_list.append((func, params)) + + logging.debug( + f"Currently registered {len(generator_list)} generators for target {target}" + ) + + my_dict[target] = generator_list + registered_generators.set(my_dict) def merge(source, destination): for key, value in source.items(): if isinstance(value, dict): - node = destination.setdefault(key, value) + node = destination.get(key, None) if node is None: destination[key] = value + elif len(node) == 0: + # node is set to an empty dict on purpose as a way to override the value + pass else: merge(value, node) else: @@ -50,6 +79,27 @@ def render_jinja(filename, ctx): return render_jinja2_file(filename, ctx, search_paths=search_paths) +def findpaths_by_property(obj: dict, property: str) -> dict: + """ + Traverses the whole dictionary looking of objects containing a given property. + + Args: + obj: the dictionary to scan for a given property + property: the key to look for in a dictionary + + Returns: + A dictionary with found objects. Keys in the dictionary are the "name" properties of these objects. + """ + res = {} + for k, v in obj.items(): + if k == property: + res[obj["name"]] = obj + if isinstance(v, dict): + sub_results = findpaths_by_property(v, property) + res = {**res, **sub_results} + return res + + def findpath(obj, path, default={}): value = default if path: @@ -63,6 +113,10 @@ def findpath(obj, path, default={}): if value is not None: return value logging.info(f"Key {e} not found in {obj}: ignoring") + except AttributeError as e: + if value is not None: + return value + logging.info(f"Attribute {e} not found in {obj}: ignoring") if len(path_parts) == 1: return value @@ -92,6 +146,9 @@ class BaseContent(BaseModel): content_type: ContentType = ContentType.YAML filename: str = "output" + def body(self): + pass + @classmethod def from_baseobj(cls, baseobj: BaseObj): """Return a BaseContent initialised with baseobj.""" @@ -145,7 +202,7 @@ def mutate(self, mutations: List): if action == "delete": for condition in conditions: if self.match(condition["conditions"]): - self = None + raise DeleteContent(f"Deleting {self} because of {condition}") if action == "bundle": for condition in conditions: if self.match(condition["conditions"]): @@ -165,7 +222,7 @@ def match(self, match_conditions): return True def patch(self, patch): - self.root.merge_update(Dict(patch)) + self.root.merge_update(Dict(patch), box_merge_lists="extend") class BaseStore(BaseModel): @@ -221,6 +278,9 @@ def process_mutations(self, mutations: Dict): for content in self.get_content_list(): try: content.mutate(mutations) + except DeleteContent as e: + logging.debug(e) + self.content_list.remove(content) except: raise CompileError(f"Error when processing mutations on {content}") @@ -236,8 +296,14 @@ def dump(self, output_filename=None, already_processed=False): output_format = output_filename else: output_format = getattr(content, "filename", "output") - filename = output_format.format(content=content) + file_content_list = self.root.get(filename, []) + if content in file_content_list: + logger.debug( + f"Skipping duplicated content content for reason 'Duplicate name {content.name} for {filename}'" + ) + continue + self.root.setdefault(filename, []).append(content) return super().dump() @@ -245,9 +311,13 @@ def dump(self, output_filename=None, already_processed=False): class BaseGenerator: def __init__( - 
self, inventory: Dict, store: BaseStore = None, defaults_path: str = None + self, + inventory: Dict, + store: BaseStore = None, + defaults_path: str = None, ) -> None: self.inventory = inventory + self.global_inventory = inventory_global() self.generator_defaults = findpath(self.inventory, defaults_path) logging.debug(f"Setting {self.generator_defaults} as generator defaults") @@ -256,17 +326,27 @@ def __init__( else: self.store = store() - def expand_and_run(self, func, params): - inventory = self.inventory + def expand_and_run(self, func, params, inventory=None): + if inventory == None: + inventory = self.inventory + path = params.get("path") + activation_property = params.get("activation_property") patches = params.get("apply_patches", []) - configs = findpath(inventory.parameters, path) + if path is not None: + configs = findpath(inventory.parameters, path) + elif activation_property is not None: + configs = findpaths_by_property(inventory.parameters, activation_property) + else: + raise CompileError( + f"generator need to provide either 'path' or 'activation_property'" + ) + if configs: logging.debug( f"Found {len(configs)} configs to generate at {path} for target {target}" ) - - for name, config in configs.items(): + for config_id, config in configs.items(): patched_config = Dict(config) patch_paths_to_apply = patches patches_applied = [] @@ -282,14 +362,18 @@ def expand_and_run(self, func, params): patched_config = merge(patch, patched_config) local_params = { - "name": name, + "id": config_id, + "name": patched_config.get("name", config_id), "config": patched_config, "patches_applied": patches_applied, "original_config": config, "defaults": self.generator_defaults, + "inventory": inventory, + "global_inventory": self.global_inventory, + "target": current_target.get(), } logging.debug( - f"Running class {func.__name__} with params {local_params.keys()} and name {name}" + f"Running class {func.__name__} for {config_id} with params {local_params.keys()}" ) self.store.add(func(**local_params)) @@ -299,7 +383,31 @@ def generate(self): f"{len(generators)} classes registered as generators for target {target}" ) for func, params in generators: - - logging.debug(f"Expanding {func.__name__} with params {params}") - self.expand_and_run(func=func, params=params) + activation_path = params.get("activation_path", False) + global_generator = params.get("global_generator", False) + if activation_path and global_generator: + logger.debug( + f"Running global generator {func.__name__} with activation path {activation_path}" + ) + if not findpath(self.inventory.parameters, activation_path): + logger.debug( + f"Skipping global generator {func.__name__} with params {params}" + ) + continue + else: + logger.debug( + f"Running global generator {func.__name__} with params {params}" + ) + + for _, inventory in self.global_inventory.items(): + self.expand_and_run( + func=func, params=params, inventory=inventory + ) + elif not global_generator: + logger.debug(f"Expanding {func.__name__} with params {params}") + self.expand_and_run(func=func, params=params) + else: + logger.debug( + f"Skipping generator {func.__name__} with params {params} because not global and no activation path" + ) return self.store diff --git a/rabbitmq/__init__.py b/rabbitmq/__init__.py deleted file mode 100644 index 1c6bc3d..0000000 --- a/rabbitmq/__init__.py +++ /dev/null @@ -1,866 +0,0 @@ -import base64 -import hashlib -import os - -from kapitan.cached import args -from kapitan.inputs.kadet import BaseObj, inventory -from 
kapitan.utils import render_jinja2_file - -search_paths = args.get("search_paths") - -from . import k8s - - -def j2(filename, ctx): - return render_jinja2_file(filename, ctx, search_paths=search_paths) - - -inv = inventory(lazy=True) - - -def merge(source, destination): - for key, value in source.items(): - if isinstance(value, dict): - node = destination.setdefault(key, value) - if node is None: - destination[key] = value - else: - merge(value, node) - else: - destination[key] = destination.setdefault(key, value) - - return destination - - -class RabbitmqCluster(k8s.Base): - def new(self): - self.kwargs.apiVersion = "rabbitmq.com/v1beta1" - self.kwargs.kind = "RabbitmqCluster" - - self.kwargs.finalizers = list( - "deletion.finalizers.rabbitmqclusters.rabbitmq.com" - ) - super().new() - self.need("name") - - def body(self): - super().body() - - self.add_namespace(inv.parameters.rabbitmq_namespace) - - rabbitmqcluster = self.kwargs.rabbitmqcluster - self.add_annotations(rabbitmqcluster.get("annotations", {})) - self.add_labels(rabbitmqcluster.get("labels", {})) - - if rabbitmqcluster.replicas: - self.root.spec.replicas = rabbitmqcluster.replicas - - if rabbitmqcluster.image: - self.root.spec.image = rabbitmqcluster.image - - if rabbitmqcluster.imagePullSecrets: - self.root.spec.imagePullSecrets = rabbitmqcluster.imagePullSecrets - - if rabbitmqcluster.service: - self.root.spec.service = rabbitmqcluster.service - - if rabbitmqcluster.persistence: - self.root.spec.persistence = rabbitmqcluster.persistence - - if rabbitmqcluster.resources: - self.root.spec.resources = rabbitmqcluster.resources - - if rabbitmqcluster.affinity: - self.root.spec.resources = rabbitmqcluster.affinity - - if rabbitmqcluster.tolerations: - self.root.spec.tolerations = rabbitmqcluster.tolerations - - if rabbitmqcluster.rabbitmq: - self.root.spec.rabbitmq = rabbitmqcluster.rabbitmq - - if rabbitmqcluster.tls: - self.root.spec.tls = rabbitmqcluster.tls - - if rabbitmqcluster.skipPostDeploySteps: - self.root.spec.skipPostDeploySteps = rabbitmqcluster.skipPostDeploySteps - - if rabbitmqcluster.terminationGracePeriodSeconds: - self.root.spec.terminationGracePeriodSeconds = ( - rabbitmqcluster.terminationGracePeriodSeconds - ) - - if rabbitmqcluster.secretBackend: - self.root.spec.secretBackend = rabbitmqcluster.secretBackend - - if rabbitmqcluster.override: - self.root.spec.override = rabbitmqcluster.override - - -class RabbitmqQueue(k8s.Base): - def new(self): - self.kwargs.apiVersion = "rabbitmq.com/v1beta1" - self.kwargs.kind = "Queue" - - self.kwargs.finalizers = list( - "deletion.finalizers.rabbitmqclusters.rabbitmq.com" - ) - super().new() - self.need("name") - - def body(self): - super().body() - - self.add_namespace(inv.parameters.rabbitmq_namespace) - - rabbitmq_queue = self.kwargs.rabbitmq_queue - self.add_annotations(rabbitmq_queue.get("annotations", {})) - self.add_labels(rabbitmq_queue.get("labels", {})) - - if rabbitmq_queue.name: - self.root.spec.name = rabbitmq_queue.name - - if type(rabbitmq_queue.autoDelete) is bool: - self.root.spec.autoDelete = rabbitmq_queue.autoDelete - - if type(rabbitmq_queue.durable) is bool: - self.root.spec.durable = rabbitmq_queue.durable - - if rabbitmq_queue.rabbitmqClusterReference: - self.root.spec.rabbitmqClusterReference = ( - rabbitmq_queue.rabbitmqClusterReference - ) - - if rabbitmq_queue.arguments: - self.root.spec.arguments = rabbitmq_queue.arguments - - -class RabbitmqPolicy(k8s.Base): - def new(self): - self.kwargs.apiVersion = "rabbitmq.com/v1beta1" - 
self.kwargs.kind = "Policy" - - self.kwargs.finalizers = list( - "deletion.finalizers.rabbitmqclusters.rabbitmq.com" - ) - super().new() - self.need("name") - - def body(self): - super().body() - - self.add_namespace(inv.parameters.rabbitmq_namespace) - - rabbitmq_policy = self.kwargs.rabbitmq_policy - self.add_annotations(rabbitmq_policy.get("annotations", {})) - self.add_labels(rabbitmq_policy.get("labels", {})) - - if rabbitmq_policy.name: - self.root.spec.name = rabbitmq_policy.name - - if rabbitmq_policy.pattern: - self.root.spec.pattern = rabbitmq_policy.pattern - - if rabbitmq_policy.applyTo: - self.root.spec.applyTo = rabbitmq_policy.applyTo - - if rabbitmq_policy.definition: - self.root.spec.definition = rabbitmq_policy.definition - - if rabbitmq_policy.rabbitmqClusterReference: - self.root.spec.rabbitmqClusterReference = ( - rabbitmq_policy.rabbitmqClusterReference - ) - - if rabbitmq_policy.priority: - self.root.spec.priority = rabbitmq_policy.priority - - if rabbitmq_policy.vhost: - self.root.spec.vhost = rabbitmq_policy.vhost - - -class RabbitmqExchange(k8s.Base): - def new(self): - self.kwargs.apiVersion = "rabbitmq.com/v1beta1" - self.kwargs.kind = "Exchange" - - self.kwargs.finalizers = list( - "deletion.finalizers.rabbitmqclusters.rabbitmq.com" - ) - super().new() - self.need("name") - - def body(self): - super().body() - - self.add_namespace(inv.parameters.rabbitmq_namespace) - - rabbitmq_exchange = self.kwargs.rabbitmq_exchange - self.add_annotations(rabbitmq_exchange.get("annotations", {})) - self.add_labels(rabbitmq_exchange.get("labels", {})) - - if rabbitmq_exchange.name: - self.root.spec.name = rabbitmq_exchange.name - - if rabbitmq_exchange.type: - self.root.spec.type = rabbitmq_exchange.type - - if type(rabbitmq_exchange.autoDelete) is bool: - self.root.spec.autoDelete = rabbitmq_exchange.autoDelete - - if type(rabbitmq_exchange.durable) is bool: - self.root.spec.durable = rabbitmq_exchange.durable - - if rabbitmq_exchange.rabbitmqClusterReference: - self.root.spec.rabbitmqClusterReference = ( - rabbitmq_exchange.rabbitmqClusterReference - ) - - if rabbitmq_exchange.arguments: - self.root.spec.arguments = rabbitmq_exchange.arguments - - if rabbitmq_exchange.vhost: - self.root.spec.vhost = rabbitmq_exchange.vhost - - -class RabbitmqBinding(k8s.Base): - def new(self): - self.kwargs.apiVersion = "rabbitmq.com/v1beta1" - self.kwargs.kind = "Binding" - - self.kwargs.finalizers = list( - "deletion.finalizers.rabbitmqclusters.rabbitmq.com" - ) - super().new() - self.need("name") - - def body(self): - super().body() - - self.add_namespace(inv.parameters.rabbitmq_namespace) - - rabbitmq_binding = self.kwargs.rabbitmq_binding - self.add_annotations(rabbitmq_binding.get("annotations", {})) - self.add_labels(rabbitmq_binding.get("labels", {})) - - if rabbitmq_binding.source: - self.root.spec.source = rabbitmq_binding.source - - if rabbitmq_binding.destination: - self.root.spec.destination = rabbitmq_binding.destination - - if rabbitmq_binding.destinationType: - self.root.spec.destinationType = rabbitmq_binding.destinationType - - if rabbitmq_binding.rabbitmqClusterReference: - self.root.spec.rabbitmqClusterReference = ( - rabbitmq_binding.rabbitmqClusterReference - ) - - if rabbitmq_binding.routingKey: - self.root.spec.routingKey = rabbitmq_binding.routingKey - - if rabbitmq_binding.arguments: - self.root.spec.arguments = rabbitmq_binding.arguments - - if rabbitmq_binding.vhost: - self.root.spec.vhost = rabbitmq_binding.vhost - - -class RabbitmqUser(k8s.Base): - def 
-class RabbitmqPolicy(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Policy"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_policy = self.kwargs.rabbitmq_policy
-        self.add_annotations(rabbitmq_policy.get("annotations", {}))
-        self.add_labels(rabbitmq_policy.get("labels", {}))
-
-        if rabbitmq_policy.name:
-            self.root.spec.name = rabbitmq_policy.name
-
-        if rabbitmq_policy.pattern:
-            self.root.spec.pattern = rabbitmq_policy.pattern
-
-        if rabbitmq_policy.applyTo:
-            self.root.spec.applyTo = rabbitmq_policy.applyTo
-
-        if rabbitmq_policy.definition:
-            self.root.spec.definition = rabbitmq_policy.definition
-
-        if rabbitmq_policy.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_policy.rabbitmqClusterReference
-            )
-
-        if rabbitmq_policy.priority:
-            self.root.spec.priority = rabbitmq_policy.priority
-
-        if rabbitmq_policy.vhost:
-            self.root.spec.vhost = rabbitmq_policy.vhost
-
-
-class RabbitmqExchange(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Exchange"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_exchange = self.kwargs.rabbitmq_exchange
-        self.add_annotations(rabbitmq_exchange.get("annotations", {}))
-        self.add_labels(rabbitmq_exchange.get("labels", {}))
-
-        if rabbitmq_exchange.name:
-            self.root.spec.name = rabbitmq_exchange.name
-
-        if rabbitmq_exchange.type:
-            self.root.spec.type = rabbitmq_exchange.type
-
-        if isinstance(rabbitmq_exchange.autoDelete, bool):
-            self.root.spec.autoDelete = rabbitmq_exchange.autoDelete
-
-        if isinstance(rabbitmq_exchange.durable, bool):
-            self.root.spec.durable = rabbitmq_exchange.durable
-
-        if rabbitmq_exchange.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_exchange.rabbitmqClusterReference
-            )
-
-        if rabbitmq_exchange.arguments:
-            self.root.spec.arguments = rabbitmq_exchange.arguments
-
-        if rabbitmq_exchange.vhost:
-            self.root.spec.vhost = rabbitmq_exchange.vhost
-
-
-class RabbitmqBinding(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Binding"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_binding = self.kwargs.rabbitmq_binding
-        self.add_annotations(rabbitmq_binding.get("annotations", {}))
-        self.add_labels(rabbitmq_binding.get("labels", {}))
-
-        if rabbitmq_binding.source:
-            self.root.spec.source = rabbitmq_binding.source
-
-        if rabbitmq_binding.destination:
-            self.root.spec.destination = rabbitmq_binding.destination
-
-        if rabbitmq_binding.destinationType:
-            self.root.spec.destinationType = rabbitmq_binding.destinationType
-
-        if rabbitmq_binding.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_binding.rabbitmqClusterReference
-            )
-
-        if rabbitmq_binding.routingKey:
-            self.root.spec.routingKey = rabbitmq_binding.routingKey
-
-        if rabbitmq_binding.arguments:
-            self.root.spec.arguments = rabbitmq_binding.arguments
-
-        if rabbitmq_binding.vhost:
-            self.root.spec.vhost = rabbitmq_binding.vhost
-
-
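The isinstance checks on autoDelete/durable are deliberate: a plain truthiness test would silently drop an explicit false, which is a meaningful setting for these fields. A short illustration:

    autoDelete = False
    if autoDelete:                     # never fires: an explicit False is lost
        pass
    if isinstance(autoDelete, bool):   # fires for True and False alike
        pass                           # so the field is always emitted
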
-class RabbitmqUser(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "User"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_user = self.kwargs.rabbitmq_user
-        self.add_annotations(rabbitmq_user.get("annotations", {}))
-        self.add_labels(rabbitmq_user.get("labels", {}))
-
-        if rabbitmq_user.tags:
-            self.root.spec.tags = rabbitmq_user.tags
-
-        if rabbitmq_user.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_user.rabbitmqClusterReference
-            )
-
-        if rabbitmq_user.importCredentialsSecret:
-            self.root.spec.importCredentialsSecret = (
-                rabbitmq_user.importCredentialsSecret
-            )
-
-
-class RabbitmqPermission(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Permission"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_permission = self.kwargs.rabbitmq_permission
-        self.add_annotations(rabbitmq_permission.get("annotations", {}))
-        self.add_labels(rabbitmq_permission.get("labels", {}))
-
-        if rabbitmq_permission.vhost:
-            self.root.spec.vhost = rabbitmq_permission.vhost
-
-        if rabbitmq_permission.user:
-            self.root.spec.user = rabbitmq_permission.user
-
-        if rabbitmq_permission.permissions:
-            self.root.spec.permissions = rabbitmq_permission.permissions
-
-        if rabbitmq_permission.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_permission.rabbitmqClusterReference
-            )
-
-        if rabbitmq_permission.userReference:
-            self.root.spec.userReference = rabbitmq_permission.userReference
-
-
-class RabbitmqVhost(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Vhost"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_vhost = self.kwargs.rabbitmq_vhost
-        self.add_annotations(rabbitmq_vhost.get("annotations", {}))
-        self.add_labels(rabbitmq_vhost.get("labels", {}))
-
-        if rabbitmq_vhost.name:
-            self.root.spec.name = rabbitmq_vhost.name
-
-        if rabbitmq_vhost.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_vhost.rabbitmqClusterReference
-            )
-
-        if rabbitmq_vhost.tags:
-            self.root.spec.tags = rabbitmq_vhost.tags
-
-        if rabbitmq_vhost.tracing:
-            self.root.spec.tracing = rabbitmq_vhost.tracing
-
-
-class RabbitmqFederation(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Federation"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_federation = self.kwargs.rabbitmq_federation
-        self.add_annotations(rabbitmq_federation.get("annotations", {}))
-        self.add_labels(rabbitmq_federation.get("labels", {}))
-
-        if rabbitmq_federation.name:
-            self.root.spec.name = rabbitmq_federation.name
-
-        if rabbitmq_federation.uriSecret:
-            self.root.spec.uriSecret = rabbitmq_federation.uriSecret
-
-        if rabbitmq_federation.ackMode:
-            self.root.spec.ackMode = rabbitmq_federation.ackMode
-
-        if rabbitmq_federation.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_federation.rabbitmqClusterReference
-            )
-
-        if rabbitmq_federation.exchange:
-            self.root.spec.exchange = rabbitmq_federation.exchange
-
-        if rabbitmq_federation.expires:
-            self.root.spec.expires = rabbitmq_federation.expires
-
-        if rabbitmq_federation.maxHops:
-            self.root.spec.maxHops = rabbitmq_federation.maxHops
-
-        if rabbitmq_federation.messageTTL:
-            self.root.spec.messageTTL = rabbitmq_federation.messageTTL
-
-        if rabbitmq_federation.prefetch_count:
-            self.root.spec.prefetch_count = rabbitmq_federation.prefetch_count
-
-        if rabbitmq_federation.queue:
-            self.root.spec.queue = rabbitmq_federation.queue
-
-        if rabbitmq_federation.reconnectDelay:
-            self.root.spec.reconnectDelay = rabbitmq_federation.reconnectDelay
-
-        if rabbitmq_federation.trustUserId:
-            self.root.spec.trustUserId = rabbitmq_federation.trustUserId
-
-        if rabbitmq_federation.vhost:
-            self.root.spec.vhost = rabbitmq_federation.vhost
-
-
-class RabbitmqShovel(k8s.Base):
-    def new(self):
-        self.kwargs.apiVersion = "rabbitmq.com/v1beta1"
-        self.kwargs.kind = "Shovel"
-
-        self.kwargs.finalizers = [
-            "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
-        ]
-        super().new()
-        self.need("name")
-
-    def body(self):
-        super().body()
-
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-
-        rabbitmq_shovel = self.kwargs.rabbitmq_shovel
-        self.add_annotations(rabbitmq_shovel.get("annotations", {}))
-        self.add_labels(rabbitmq_shovel.get("labels", {}))
-
-        if rabbitmq_shovel.name:
-            self.root.spec.name = rabbitmq_shovel.name
-
-        if rabbitmq_shovel.uriSecret:
-            self.root.spec.uriSecret = rabbitmq_shovel.uriSecret
-
-        if rabbitmq_shovel.srcQueue:
-            self.root.spec.srcQueue = rabbitmq_shovel.srcQueue
-
-        if rabbitmq_shovel.destQueue:
-            self.root.spec.destQueue = rabbitmq_shovel.destQueue
-
-        if rabbitmq_shovel.rabbitmqClusterReference:
-            self.root.spec.rabbitmqClusterReference = (
-                rabbitmq_shovel.rabbitmqClusterReference
-            )
-
-        if rabbitmq_shovel.ackMode:
-            self.root.spec.ackMode = rabbitmq_shovel.ackMode
-
-        if rabbitmq_shovel.addForwardHeaders:
-            self.root.spec.addForwardHeaders = rabbitmq_shovel.addForwardHeaders
-
-        if rabbitmq_shovel.deleteAfter:
-            self.root.spec.deleteAfter = rabbitmq_shovel.deleteAfter
-
-        if rabbitmq_shovel.destAddForwardHeaders:
-            self.root.spec.destAddForwardHeaders = rabbitmq_shovel.destAddForwardHeaders
-
-        if rabbitmq_shovel.destAddTimestampHeader:
-            self.root.spec.destAddTimestampHeader = (
-                rabbitmq_shovel.destAddTimestampHeader
-            )
-
-        if rabbitmq_shovel.destAddress:
-            self.root.spec.destAddress = rabbitmq_shovel.destAddress
-
-        if rabbitmq_shovel.destApplicationProperties:
-            self.root.spec.destApplicationProperties = (
-                rabbitmq_shovel.destApplicationProperties
-            )
-
-        if rabbitmq_shovel.destExchange:
-            self.root.spec.destExchange = rabbitmq_shovel.destExchange
-
-        if rabbitmq_shovel.destExchangeKey:
-            self.root.spec.destExchangeKey = rabbitmq_shovel.destExchangeKey
-
-        if rabbitmq_shovel.destProperties:
-            self.root.spec.destProperties = rabbitmq_shovel.destProperties
-
-        if rabbitmq_shovel.destProtocol:
-            self.root.spec.destProtocol = rabbitmq_shovel.destProtocol
-
-        if rabbitmq_shovel.destPublishProperties:
-            self.root.spec.destPublishProperties = rabbitmq_shovel.destPublishProperties
-
-        if rabbitmq_shovel.prefetchCount:
-            self.root.spec.prefetchCount = rabbitmq_shovel.prefetchCount
-
-        if rabbitmq_shovel.reconnectDelay:
-            self.root.spec.reconnectDelay = rabbitmq_shovel.reconnectDelay
-
-        if rabbitmq_shovel.srcAddress:
-            self.root.spec.srcAddress = rabbitmq_shovel.srcAddress
-
-        if rabbitmq_shovel.srcDeleteAfter:
-            self.root.spec.srcDeleteAfter = rabbitmq_shovel.srcDeleteAfter
-
-        if rabbitmq_shovel.srcExchange:
-            self.root.spec.srcExchange = rabbitmq_shovel.srcExchange
-
-        if rabbitmq_shovel.srcExchangeKey:
-            self.root.spec.srcExchangeKey = rabbitmq_shovel.srcExchangeKey
-
-        if rabbitmq_shovel.srcPrefetchCount:
-            self.root.spec.srcPrefetchCount = rabbitmq_shovel.srcPrefetchCount
-
-        if rabbitmq_shovel.srcProtocol:
-            self.root.spec.srcProtocol = rabbitmq_shovel.srcProtocol
-
-        if rabbitmq_shovel.vhost:
-            self.root.spec.vhost = rabbitmq_shovel.vhost
-
-
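Each class reads one config blob from its own kwarg, and the generate_* functions below take those blobs from the inventory. A hedged sketch of the parameters shape (shown here as a Python dict; in practice this lives in Kapitan's YAML inventory, and all values are hypothetical) that would drive RabbitmqShovel:

    parameters = {
        "rabbitmq_shovel": {
            "move-orders": {  # hypothetical shovel name
                "uriSecret": {"name": "shovel-uri"},
                "srcQueue": "orders",
                "destQueue": "orders-archive",
                "rabbitmqClusterReference": {"name": "rabbit"},
            }
        }
    }
    # generate_rabbitmq_shovel() iterates this mapping and emits one
    # "<name>-rabbitmq" manifest per entry.
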
-# The following classes are required to generate Secrets + ConfigMaps
-class SharedConfig:
-    """Shared class to use for both Secrets and ConfigMaps classes.
-
-    Contains anything needed by both classes, so that their behaviour is basically the same.
-    Each subclass will then implement its own way of adding the data depending on its implementation.
-    """
-
-    @staticmethod
-    def encode_string(unencoded_string):
-        return base64.b64encode(unencoded_string.encode("ascii")).decode("ascii")
-
-    def setup_metadata(self):
-        self.add_namespace(inv.parameters.rabbitmq_namespace)
-        self.add_annotations(self.config.annotations)
-        self.add_labels(self.config.labels)
-
-        self.items = self.config["items"]
-        try:
-            if isinstance(self, ConfigMap):
-                globals = (
-                    inv.parameters.generators.manifest.default_config.globals.config_maps
-                )
-            else:
-                globals = (
-                    inv.parameters.generators.manifest.default_config.globals.secrets
-                )
-            self.add_annotations(globals.get("annotations", {}))
-            self.add_labels(globals.get("labels", {}))
-        except AttributeError:
-            pass
-
-        self.versioning(self.config.get("versioned", False))
-
-    def add_directory(self, directory, encode=False):
-        stringdata = inv.parameters.get("use_tesoro", False)
-        if directory and os.path.isdir(directory):
-            for filename in os.listdir(directory):
-                with open(f"{directory}/{filename}", "r") as f:
-                    file_content = f.read()
-                    self.add_item(
-                        filename,
-                        file_content,
-                        request_encode=encode,
-                        stringdata=stringdata,
-                    )
-
-    def add_data(self, data):
-        stringdata = inv.parameters.get("use_tesoro", False)
-
-        for key, spec in data.items():
-            encode = spec.get("b64_encode", False)
-
-            if "value" in spec:
-                value = spec.get("value")
-            if "template" in spec:
-                value = j2(spec.template, spec.get("values", {}))
-            if "file" in spec:
-                with open(spec.file, "r") as f:
-                    value = f.read()
-
-            self.add_item(key, value, request_encode=encode, stringdata=stringdata)
-
-    def add_string_data(self, string_data, encode=False):
-        stringdata = True
-
-        for key, spec in string_data.items():
-            if "value" in spec:
-                value = spec.get("value")
-            if "template" in spec:
-                value = j2(spec.template, spec.get("values", {}))
-            if "file" in spec:
-                with open(spec.file, "r") as f:
-                    value = f.read()
-
-            self.add_item(key, value, request_encode=encode, stringdata=stringdata)
-
-    def versioning(self, enabled=False):
-        if enabled:
-            self.hash = hashlib.sha256(str(self.root.to_dict()).encode()).hexdigest()[
-                :8
-            ]
-            self.root.metadata.name += f"-{self.hash}"
-
-
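Two SharedConfig behaviours are worth spelling out. encode_string is a straight base64 helper, and versioning() appends a truncated content hash to the resource name, so changed content produces a new name (and therefore a rollout in anything that references it). A minimal sketch:

    SharedConfig.encode_string("guest")
    # -> 'Z3Vlc3Q='

    # With versioned: true, a name such as "my-config" becomes
    # "my-config-1a2b3c4d", where the suffix is the first 8 hex chars of
    # sha256(str(rendered object)); the hash value shown here is hypothetical.
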
-class ConfigMap(k8s.Base, SharedConfig):
-    def new(self):
-        self.kwargs.apiVersion = "v1"
-        self.kwargs.kind = "ConfigMap"
-        super().new()
-
-    def body(self):
-        super().body()
-
-    def add_item(self, key, value, request_encode=False, stringdata=False):
-        encode = request_encode
-
-        self.root["data"][key] = self.encode_string(value) if encode else value
-
-
-class ComponentConfig(ConfigMap, SharedConfig):
-    def new(self):
-        super().new()
-        self.need("config")
-
-    def body(self):
-        super().body()
-        self.config = self.kwargs.config
-
-        self.setup_metadata()
-        self.add_data(self.config.data)
-        self.add_directory(self.config.directory, encode=False)
-
-
-class Secret(k8s.Base, SharedConfig):
-    def new(self):
-        self.kwargs.apiVersion = "v1"
-        self.kwargs.kind = "Secret"
-        super().new()
-
-    def body(self):
-        super().body()
-
-    def add_item(self, key, value, request_encode=False, stringdata=False):
-        encode = not stringdata and request_encode
-        field = "stringData" if stringdata else "data"
-        self.root[field][key] = self.encode_string(value) if encode else value
-
-
-class ComponentSecret(Secret, SharedConfig):
-    def new(self):
-        super().new()
-        self.need("config")
-
-    def body(self):
-        super().body()
-        self.config = self.kwargs.config
-        self.root.type = self.config.get("type", "Opaque")
-
-        self.setup_metadata()
-        if self.config.data:
-            self.add_data(self.config.data)
-        if self.config.string_data:
-            self.add_string_data(self.config.string_data)
-        self.add_directory(self.config.directory, encode=True)
-
-
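Note the asymmetry between the two add_item implementations: ConfigMap always writes to data, while Secret writes to stringData verbatim when stringdata is set (the use_tesoro path, leaving encoding to the consumer) and base64-encodes into data otherwise. A short sketch with hypothetical values:

    s = Secret(name="creds")
    s.add_item("password", "hunter2", request_encode=True, stringdata=False)
    # -> root.data.password == "aHVudGVyMg=="  (base64-encoded)
    s.add_item("username", "admin", stringdata=True)
    # -> root.stringData.username == "admin"   (left as-is)
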
-def generate_rabbitmqcluster(input_params):
-    obj = BaseObj()
-    rabbitmqcluster_list = inv.parameters.rabbitmqcluster
-    for name in rabbitmqcluster_list.keys():
-        rabbitmqcluster = RabbitmqCluster(
-            name=name, rabbitmqcluster=rabbitmqcluster_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmqcluster
-
-    return obj
-
-
-def generate_rabbitmq_queue(input_params):
-    obj = BaseObj()
-    rabbitmq_queue_list = inv.parameters.rabbitmq_queue
-    for name in rabbitmq_queue_list.keys():
-        rabbitmq_queue = RabbitmqQueue(
-            name=name, rabbitmq_queue=rabbitmq_queue_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_queue
-    return obj
-
-
-def generate_rabbitmq_policy(input_params):
-    obj = BaseObj()
-    rabbitmq_policy_list = inv.parameters.rabbitmq_policy
-    for name in rabbitmq_policy_list.keys():
-        rabbitmq_policy = RabbitmqPolicy(
-            name=name, rabbitmq_policy=rabbitmq_policy_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_policy
-    return obj
-
-
-def generate_rabbitmq_exchange(input_params):
-    obj = BaseObj()
-    rabbitmq_exchange_list = inv.parameters.rabbitmq_exchange
-    for name in rabbitmq_exchange_list.keys():
-        rabbitmq_exchange = RabbitmqExchange(
-            name=name, rabbitmq_exchange=rabbitmq_exchange_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_exchange
-    return obj
-
-
-def generate_rabbitmq_binding(input_params):
-    obj = BaseObj()
-    rabbitmq_binding_list = inv.parameters.rabbitmq_binding
-    for name in rabbitmq_binding_list.keys():
-        rabbitmq_binding = RabbitmqBinding(
-            name=name, rabbitmq_binding=rabbitmq_binding_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_binding
-    return obj
-
-
-def generate_rabbitmq_user(input_params):
-    obj = BaseObj()
-    rabbitmq_user_list = inv.parameters.rabbitmq_user
-    for name in rabbitmq_user_list.keys():
-        rabbitmq_user = RabbitmqUser(name=name, rabbitmq_user=rabbitmq_user_list[name])
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_user
-    return obj
-
-
-def generate_rabbitmq_permission(input_params):
-    obj = BaseObj()
-    rabbitmq_permission_list = inv.parameters.rabbitmq_permission
-    for name in rabbitmq_permission_list.keys():
-        rabbitmq_permission = RabbitmqPermission(
-            name=name, rabbitmq_permission=rabbitmq_permission_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_permission
-    return obj
-
-
-def generate_rabbitmq_vhost(input_params):
-    obj = BaseObj()
-    rabbitmq_vhost_list = inv.parameters.rabbitmq_vhost
-    for name in rabbitmq_vhost_list.keys():
-        rabbitmq_vhost = RabbitmqVhost(
-            name=name, rabbitmq_vhost=rabbitmq_vhost_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_vhost
-    return obj
-
-
-def generate_rabbitmq_federation(input_params):
-    obj = BaseObj()
-    rabbitmq_federation_list = inv.parameters.rabbitmq_federation
-    for name in rabbitmq_federation_list.keys():
-        rabbitmq_federation = RabbitmqFederation(
-            name=name, rabbitmq_federation=rabbitmq_federation_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_federation
-    return obj
-
-
-def generate_rabbitmq_shovel(input_params):
-    obj = BaseObj()
-    rabbitmq_shovel_list = inv.parameters.rabbitmq_shovel
-    for name in rabbitmq_shovel_list.keys():
-        rabbitmq_shovel = RabbitmqShovel(
-            name=name, rabbitmq_shovel=rabbitmq_shovel_list[name]
-        )
-
-        obj.root["{}-rabbitmq".format(name)] = rabbitmq_shovel
-    return obj
-
-
-# This function renders the shared ConfigMaps + Secrets
-def generate_resource_manifests(input_params):
-    obj = BaseObj()
-
-    for secret_name, secret_spec in inv.parameters.generators.rabbitmq.secrets.items():
-        name = secret_spec.get("name", secret_name)
-        secret = ComponentSecret(name=name, config=secret_spec)
-        obj.root[f"{name}"] = secret
-
-    for config_name, config_spec in inv.parameters.generators.rabbitmq.configs.items():
-        name = config_spec.get("name", config_name)
-        config = ComponentConfig(name=name, config=config_spec)
-        obj.root[f"{name}"] = config
-    return obj
-
-
-# This function runs all previously defined generators and returns the combined manifests
-
-
-def generate_manifests(input_params):
-    all_manifests = BaseObj()
-
-    rabbitmq_manifests = generate_rabbitmqcluster(input_params)
-    rabbitmq_queue_manifests = generate_rabbitmq_queue(input_params)
-    rabbitmq_policy_manifests = generate_rabbitmq_policy(input_params)
-    rabbitmq_exchange_manifests = generate_rabbitmq_exchange(input_params)
-    rabbitmq_binding_manifests = generate_rabbitmq_binding(input_params)
-    rabbitmq_user_manifests = generate_rabbitmq_user(input_params)
-    rabbitmq_permission_manifests = generate_rabbitmq_permission(input_params)
-    rabbitmq_vhost_manifests = generate_rabbitmq_vhost(input_params)
-    rabbitmq_federation_manifests = generate_rabbitmq_federation(input_params)
-    rabbitmq_shovel_manifests = generate_rabbitmq_shovel(input_params)
-
-    resource_manifests = generate_resource_manifests(input_params)
-
-    all_manifests.root.update(rabbitmq_manifests.root)
-    all_manifests.root.update(rabbitmq_queue_manifests.root)
-    all_manifests.root.update(rabbitmq_policy_manifests.root)
-    all_manifests.root.update(rabbitmq_exchange_manifests.root)
-    all_manifests.root.update(rabbitmq_binding_manifests.root)
-    all_manifests.root.update(rabbitmq_user_manifests.root)
-    all_manifests.root.update(rabbitmq_permission_manifests.root)
-    all_manifests.root.update(rabbitmq_vhost_manifests.root)
-    all_manifests.root.update(rabbitmq_federation_manifests.root)
-    all_manifests.root.update(rabbitmq_shovel_manifests.root)
-
-    all_manifests.root.update(resource_manifests.root)
-
-    return all_manifests
-
-
-def main(input_params):
-    whitelisted_functions = ["generate_manifests"]
-    function = input_params.get("function", "generate_manifests")
-    if function in whitelisted_functions:
-        return globals()[function](input_params)
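main() is the kadet entrypoint: Kapitan passes the compile input_params, and only whitelisted functions can be invoked, so a caller-supplied function name cannot reach arbitrary globals. The equivalent Python call, for illustration:

    main({"function": "generate_manifests"})
    # dispatches to generate_manifests(), whose returned BaseObj maps output
    # names ("<name>-rabbitmq", secret/config names) to rendered manifests;
    # any non-whitelisted function name returns None.
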
diff --git a/rabbitmq/k8s.py b/rabbitmq/k8s.py
deleted file mode 100644
index 2516be6..0000000
--- a/rabbitmq/k8s.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from kapitan.inputs.kadet import BaseObj
-
-
-class Base(BaseObj):
-    def new(self):
-        self.need("apiVersion")
-        self.need("kind")
-        self.need("name")
-
-    def body(self):
-        self.root.apiVersion = self.kwargs.apiVersion
-        self.root.kind = self.kwargs.kind
-        self.name = self.kwargs.name
-        self.root.metadata.name = self.kwargs.get("rendered_name", self.name)
-        self.add_label("name", self.root.metadata.name)
-
-    def add_labels(self, labels):
-        for key, value in labels.items():
-            self.add_label(key, value)
-
-    def add_label(self, key, value):
-        self.root.metadata.labels[key] = value
-
-    def add_namespace(self, namespace):
-        self.root.metadata.namespace = namespace
-
-    def add_annotations(self, annotations):
-        for key, value in annotations.items():
-            self.add_annotation(key, value)
-
-    def add_annotation(self, key, value):
-        self.root.metadata.annotations[key] = value
diff --git a/terraform/__init__.py b/terraform/__init__.py
deleted file mode 100644
index 83e2bb7..0000000
--- a/terraform/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from kapitan.inputs.kadet import BaseObj, inventory
-
-inv = inventory()
-
-
-def main(input_params):
-    obj = BaseObj()
-    generator_root_paths = input_params.get("generator_root", "sections.tf").split(".")
-    root = inv.parameters
-
-    for path in generator_root_paths:
-        root = root.get(path, {})
-
-    for section_name, content in root.items():
-        if section_name in ["resource", "data"]:
-            for resource_name, resource_body in content.items():
-                obj.root["{}.tf".format(resource_name)][section_name][
-                    resource_name
-                ] = resource_body
-        else:
-            obj.root["{}.tf".format(section_name)][section_name] = content
-    return obj
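For the removed Terraform generator, the transform is purely structural: every top-level section under the generator root becomes a <name>.tf output, except that resource and data blocks are fanned out into one file per resource name. A hedged sketch of input to output, with hypothetical content:

    # inventory parameters (sections.tf):
    #   provider: {google: {project: my-project}}
    #   resource: {google_storage_bucket: {assets: {name: assets-bucket}}}
    #
    # rendered outputs:
    #   provider.tf              -> {"provider": {"google": {"project": "my-project"}}}
    #   google_storage_bucket.tf -> {"resource": {"google_storage_bucket": {"assets": {...}}}}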