diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000000..290d674d37 --- /dev/null +++ b/.flake8 @@ -0,0 +1,84 @@ +[flake8] +max-line-length = 120 +exclude = + doc, + .tox, + .git, + .yml, + Pipfile.*, + ansible/*, + docs/*, + .cache/* + +fcfn_exclude_functions = + re, + split, + replace, + os, + parser, + int, + str, + urllib, + shutil, + LOGGER, + list, + getattr, + compile, + get_plugin, + ElementTree, + xml_doc, + findall, + record_testsuite_property, + get, + time, + tmpdir_factory, + sh, + next, + json, + yaml, + subprocess, + sorted, + jinja2, + xmltodict, + netaddr, + dict, + round, + request, + range, + min, + len, + sub, + extend, + setdefault, + float, + sum, + append, + join, + deepcopy, + print, + logging, + add_option, + pytest, + update, + expect, + sendline, + write, + open, + insert, + send, + namedtuple, + strip, + parametrize, + pop, + remove, + console_eof_sampler, + check_output, + sleep, + datetime, + intersection + +[flake8:local-plugins] +extension = + FCFN = flake8_plugins.FunctionCallForceNames:FunctionCallForceNames +paths = + . diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..4c8bb8895e --- /dev/null +++ b/.gitignore @@ -0,0 +1,117 @@ +# Temporary files +_out + +# Created by .ignore support plugin (hsz.mobi) +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ +pytest-tests.log +tests-collected-info + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Sphinx docs +docs/source/rst + +# IntelliJ project folder +.idea/ diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000000..721f74996f --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,7 @@ +[settings] +use_parentheses=true +multi_line_output=3 +include_trailing_comma=true +line_length=88 +lines_after_imports=2 +known_first_party=six diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000..e5df9052e0 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +default_language_version: + python: python3 + +repos: + - repo: https://github.com/pre-commit/mirrors-isort + rev: v4.3.21 + hooks: + - id: isort + + - repo: https://github.com/ambv/black + rev: 19.10b0 + hooks: + - id: black + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.1.0 + hooks: + - id: check-merge-conflict + - id: debug-statements + - 
id: trailing-whitespace
+      - id: end-of-file-fixer
+      - id: check-ast
+      - id: check-builtin-literals
+
+  - repo: https://github.com/PyCQA/flake8
+    rev: 3.8.3
+    hooks:
+      - id: flake8
diff --git a/automation/check-patch.mounts b/automation/check-patch.mounts
new file mode 100644
index 0000000000..9604c3719d
--- /dev/null
+++ b/automation/check-patch.mounts
@@ -0,0 +1 @@
+/var/run/docker.sock:/var/run/docker.sock
diff --git a/automation/check-patch.packages b/automation/check-patch.packages
new file mode 100644
index 0000000000..25751c91ac
--- /dev/null
+++ b/automation/check-patch.packages
@@ -0,0 +1,9 @@
+docker
+gcc
+git
+make
+openshift-clients
+python3-devel
+python3-pip
+python3-setuptools
+wget
diff --git a/automation/check-patch.repos b/automation/check-patch.repos
new file mode 100644
index 0000000000..bcae4917be
--- /dev/null
+++ b/automation/check-patch.repos
@@ -0,0 +1 @@
+http://download.eng.bos.redhat.com/rcm-guest/puddles/RHAOS/AtomicOpenShift/4.2/latest/x86_64/os/
diff --git a/automation/check-patch.sh b/automation/check-patch.sh
new file mode 100644
index 0000000000..8e20837a97
--- /dev/null
+++ b/automation/check-patch.sh
@@ -0,0 +1,31 @@
+#!/bin/bash -e
+
+python=python3
+
+main() {
+    TARGET="$0"
+    TARGET="${TARGET#./}"
+    TARGET="${TARGET%.*}"
+    TARGET="${TARGET#*.}"
+    echo "TARGET=$TARGET"
+
+    export PATH="$PATH:/usr/local/bin"
+
+    case "${TARGET}" in
+        "check" )
+            check
+            ;;
+        * )
+            echo "Unknown target"
+            exit 1
+            ;;
+    esac
+}
+
+check() {
+    $python -m pip install tox
+    make check
+}
+
+
+[[ "${BASH_SOURCE[0]}" == "$0" ]] && main "$@"
diff --git a/flake8_plugins/FunctionCallForceNames/README.md b/flake8_plugins/FunctionCallForceNames/README.md
new file mode 100644
index 0000000000..e652c4b89a
--- /dev/null
+++ b/flake8_plugins/FunctionCallForceNames/README.md
@@ -0,0 +1,45 @@
+# Function Call Force Names (a flake8 plugin)
+FunctionCallForceNames (FCFN001) is a flake8 plugin whose role is to enforce
+that functions are called with arg=value, rather than with value only.
+
+### Example:
+Valid call:
+```python
+    foo(num=1, txt="txt")
+```
+
+Invalid call:
+```python
+    foo(1, txt="txt")
+```
+
+In case of an invalid call, the plugin triggers a flake8 failure:
+```bash
+$ pre-commit run --all-files
+...
+flake8...................................................................Failed
+...
+<file>:<line>:<column>: FCFN001: [<function name>] function should be called with keyword arguments. value: <argument> (line:<line> column:<column>)
+```
+(pre-commit includes invocation of flake8).
+
+For example:
+```bash
+$ pre-commit run --all-files
+...
+flake8...................................................................Failed
+...
+tests/network/general/test_bridge_marker.py:135:5: FCFN001: [test_bridge_marker_no_device] function should be called with keyword arguments. value: pod (line:135 column:45)
+```
+
+### Note:
+1. It's not necessary to explicitly call pre-commit to enforce FCFN001,
+as flake8 is invoked automatically when running "git commit".
+2. To check flake8 validity of a specific file, you can run
+```bash
+$ pre-commit run --files <file>
+```
+For example:
+```bash
+$ pre-commit run --files tests/network/general/test_bridge_marker.py
+```
diff --git a/flake8_plugins/FunctionCallForceNames/__init__.py b/flake8_plugins/FunctionCallForceNames/__init__.py
new file mode 100644
index 0000000000..3146740bab
--- /dev/null
+++ b/flake8_plugins/FunctionCallForceNames/__init__.py
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+
+"""
+flake8 plugin which verifies that all functions are called with arg=value (and not with value only).
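+
+A minimal illustration (the function name here is arbitrary):
+
+    foo(num=1, txt="txt")   # accepted - all arguments are named
+    foo(1, txt="txt")       # rejected - reported as FCFN001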
+"""
+
+import ast
+import re
+
+
+FCFN001 = (
+    "FCFN001: [{f_name}] function should be called with keyword arguments. {values}"
+)
+
+
+class FunctionCallForceNames(object):
+    """
+    flake8 plugin which verifies that all functions are called with arg=value (and not with value only).
+    """
+
+    name = "FunctionCallForceNames"
+    version = "1.0.0"

+    def __init__(self, tree, lines):
+        self.tree = tree
+        self.lines = lines
+
+    @classmethod
+    def add_options(cls, option_manager):
+        option_manager.add_option(
+            long_option_name="--fcfn_exclude_functions",
+            default="",
+            parse_from_config=True,
+            comma_separated_list=True,
+            help="Functions to exclude from checking.",
+        )
+
+    @classmethod
+    def parse_options(cls, options):
+        cls.exclude_functions = options.fcfn_exclude_functions
+
+    def _get_values(self, args_):
+        values = ""
+        for arg in args_:
+            if isinstance(arg, ast.JoinedStr):
+                for val in arg.values:
+                    if isinstance(val, ast.FormattedValue):
+                        continue
+
+                    values += f"value: {self._get_func_name(elm=val)} (line:{arg.lineno} column:{arg.col_offset})"
+
+            elif isinstance(arg, ast.Dict):
+                values += (
+                    f"value: {ast.Dict} (line:{arg.lineno} column:{arg.col_offset})"
+                )
+
+            else:
+                values += (
+                    f"value: {self._get_func_name(elm=arg)} "
+                    f"(line:{arg.lineno} column:{arg.col_offset})"
+                )
+        return values
+
+    def _get_elm_func_id(self, elm, attr=False):
+        if attr:
+            elm_func_attr = getattr(elm, "attr", None)
+            if elm_func_attr:
+                yield elm_func_attr
+
+        if isinstance(elm, ast.With):
+
+            def _parse_name(lineno):
+                name = re.findall(r"[a-z].*\(|[A-Z].*\(", lineno.strip())
+                if not name:
+                    # Fall back to the following source line; the result was
+                    # previously discarded by a recursive call.
+                    name = re.findall(
+                        r"[a-z].*\(|[A-Z].*\(", self.lines[elm.lineno].strip()
+                    )
+                return name
+
+            name = _parse_name(lineno=self.lines[elm.lineno - 1])
+            if name:
+                yield name[0].strip("(").strip("with ")
+
+            for elm_body in elm.body:
+                if (
+                    isinstance(elm_body, ast.Call)
+                    or isinstance(elm_body, ast.Expr)
+                    or isinstance(elm_body, ast.Assign)
+                ):
+                    yield from self._get_elm_func_id(elm=elm_body, attr=attr)
+
+        elm_func_id = getattr(elm, "id", None)
+        if elm_func_id:
+            yield elm_func_id
+
+        elm_func_s = getattr(elm, "s", None)
+        if elm_func_s:
+            yield elm_func_s
+
+        elm_val_func = getattr(elm, "func", None)
+        if elm_val_func:
+            yield from self._get_elm_func_id(elm=elm_val_func, attr=attr)
+
+        elm_val = getattr(elm, "value", None)
+        if elm_val:
+            yield from self._get_elm_func_id(elm=elm_val, attr=attr)
+
+    def _get_func_name(self, elm, attr=True):
+        for name in self._get_elm_func_id(elm=elm, attr=attr):
+            if not name:
+                for name in self._get_elm_func_id(elm=elm, attr=not attr):
+                    if name:
+                        return name
+
+            return name
+
+    def _skip_function_from_check(self, elm):
+        name = self._get_func_name(elm=elm, attr=False)
+        if name not in self.exclude_functions:
+            if name and isinstance(name, str):
+                for _name in name.split("."):
+                    if _name in self.exclude_functions:
+                        return True
+            return self._get_func_name(elm=elm) in self.exclude_functions
+
+        return name in self.exclude_functions
+
+    @staticmethod
+    def _get_args(elm):
+        res = {}
+        if getattr(elm, "value", None):
+            _args = getattr(elm.value, "args", [])
+            _args = [
+                ar
+                for ar in _args
+                if not (isinstance(ar, ast.Starred) or isinstance(ar, ast.JoinedStr))
+            ]
+            if _args:
+                res[elm] = _args
+
+        if isinstance(elm, ast.With):
+            for item in elm.items:
+                if isinstance(item.context_expr, ast.Call):
+                    _args = getattr(item.context_expr, "args", [])
+                    _args = [
+                        ar
+                        for ar in _args
+                        if not (
+                            isinstance(ar, ast.Starred) or isinstance(ar, ast.JoinedStr)
+                        )
+                    ]
+                    if _args:
+                        res[item.context_expr] = _args
+
+        return res
+
+    def _missing_keywords(self, elm):
+        if self._skip_function_from_check(elm=elm):
+            return
+
+        elm_and_args = self._get_args(elm=elm)
+        if not elm_and_args:
+            return
+
+        name = self._get_func_name(elm=elm)
+        for elm_key, args_key in elm_and_args.items():
+            values = self._get_values(args_=args_key)
+            if values:
+                yield (
+                    getattr(elm_key, "lineno", None) or elm.value.lineno,
+                    getattr(elm_key, "col_offset", None) or elm.value.col_offset,
+                    FCFN001.format(f_name=name, values=values),
+                    self.name,
+                )
+
+    def _get_final_elm(self, elm):
+        if getattr(elm, "body", None):
+            for el in elm.body:
+                if getattr(el, "body", None):
+                    yield from self._get_final_elm(elm=el)
+
+                yield el
+        else:
+            yield elm
+
+    def _get_func_call(self):
+        for elm in self._get_final_elm(elm=self.tree):
+            if (
+                isinstance(elm, ast.Expr)
+                or isinstance(elm, ast.Call)
+                or isinstance(elm, ast.Assign)
+                or isinstance(elm, ast.With)
+            ):
+                yield elm
+
+    def run(self):
+        for elm in self._get_func_call():
+            yield from self._missing_keywords(elm=elm)
diff --git a/flake8_plugins/__init__.py b/flake8_plugins/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000000..8bb6ee5f51
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,2 @@
+[tool.black]
+line-length = 88
diff --git a/resources/__init__.py b/resources/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/resources/api_service.py b/resources/api_service.py
new file mode 100644
index 0000000000..0d06c21f71
--- /dev/null
+++ b/resources/api_service.py
@@ -0,0 +1,9 @@
+from .resource import Resource
+
+
+class APIService(Resource):
+    """
+    APIService object.
+    """
+
+    api_group = "apiregistration.k8s.io"
diff --git a/resources/catalog_source_config.py b/resources/catalog_source_config.py
new file mode 100644
index 0000000000..74c6e6e2fe
--- /dev/null
+++ b/resources/catalog_source_config.py
@@ -0,0 +1,78 @@
+import logging
+
+from resources.resource import NamespacedResource
+from resources.utils import TimeoutExpiredError, TimeoutSampler
+from urllib3.exceptions import ProtocolError
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class CatalogSourceConfig(NamespacedResource):
+    api_group = "operators.coreos.com"
+
+    def __init__(
+        self,
+        name,
+        namespace,
+        source,
+        target_namespace,
+        packages,
+        cs_display_name,
+        cs_publisher,
+        teardown=True,
+    ):
+        super().__init__(name=name, namespace=namespace, teardown=teardown)
+        self.source = source
+        self.target_namespace = target_namespace
+        self.packages = packages
+        self.cs_display_name = cs_display_name
+        self.cs_publisher = cs_publisher
+
+    def to_dict(self):
+        res = super()._base_body()
+        res.update(
+            {
+                "spec": {
+                    "source": self.source,
+                    "targetNamespace": self.target_namespace,
+                    "packages": self.packages,
+                    "csDisplayName": self.cs_display_name,
+                    "csPublisher": self.cs_publisher,
+                }
+            }
+        )
+
+        return res
+
+    def wait_for_csc_status(self, status, timeout=120):
+        """
+        Wait for CatalogSourceConfig to reach the requested status.
+        CatalogSourceConfig status is found under currentPhase.phase.
+        Example phase: {'message': 'The object has been successfully reconciled', 'name': 'Succeeded'}
+
+        Raises:
+            TimeoutExpiredError: If CatalogSourceConfig is not in the desired status.
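+
+        Usage sketch (the status value comes from the example phase above):
+
+            csc.wait_for_csc_status(status="Succeeded", timeout=300)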
+        """
+        samples = TimeoutSampler(
+            timeout=timeout,
+            sleep=1,
+            exceptions=ProtocolError,
+            func=self.api().get,
+            field_selector=f"metadata.name=={self.name}",
+            namespace=self.namespace,
+        )
+        current_status = None
+        try:
+            for sample in samples:
+                if sample.items:
+                    sample_status = sample.items[0].status
+                    if sample_status:
+                        current_status = sample_status.currentPhase.phase["name"]
+                        if current_status == status:
+                            return
+
+        except TimeoutExpiredError:
+            if current_status:
+                LOGGER.error(f"Status of {self.kind} {self.name} is {current_status}")
+            raise
diff --git a/resources/cdi.py b/resources/cdi.py
new file mode 100644
index 0000000000..7d6b022800
--- /dev/null
+++ b/resources/cdi.py
@@ -0,0 +1,13 @@
+from .resource import NamespacedResource
+
+
+class CDI(NamespacedResource):
+    """
+    CDI object.
+    """
+
+    api_group = "cdi.kubevirt.io"
+
+    class Status(NamespacedResource.Status):
+        DEPLOYING = "Deploying"
+        DEPLOYED = "Deployed"
diff --git a/resources/cdi_config.py b/resources/cdi_config.py
new file mode 100644
index 0000000000..6da10487a4
--- /dev/null
+++ b/resources/cdi_config.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+
+import logging
+
+from resources.utils import TimeoutSampler
+from urllib3.exceptions import ProtocolError
+
+from .resource import TIMEOUT, Resource
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class CDIConfig(Resource):
+    """
+    CDIConfig object.
+    """
+
+    api_group = "cdi.kubevirt.io"
+
+    @property
+    def scratch_space_storage_class_from_spec(self):
+        return self.instance.spec.scratchSpaceStorageClass
+
+    @property
+    def scratch_space_storage_class_from_status(self):
+        return self.instance.status.scratchSpaceStorageClass
+
+    @property
+    def upload_proxy_url(self):
+        return self.instance.status.uploadProxyURL
+
+    def wait_until_upload_url_changed(self, uploadproxy_url, timeout=TIMEOUT):
+        """
+        Wait until the upload proxy URL is changed.
+
+        Args:
+            uploadproxy_url (str): The expected uploadProxyURL value.
+            timeout (int): Time to wait for CDI Config.
+
+        Raises:
+            TimeoutExpiredError: If the current URL does not become equal to uploadproxy_url in time.
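+
+        Usage sketch (the URL here is illustrative):
+
+            config = CDIConfig(name="config")
+            config.wait_until_upload_url_changed(uploadproxy_url="https://my-proxy-url")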
+ """ + LOGGER.info( + f"Wait for {self.kind} {self.name} to ensure current URL == uploadProxyURL" + ) + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + ) + for sample in samples: + if sample.items: + status = sample.items[0].status + current_url = status.uploadProxyURL + if current_url == uploadproxy_url: + return diff --git a/resources/cluster_operator.py b/resources/cluster_operator.py new file mode 100644 index 0000000000..ad0c79143f --- /dev/null +++ b/resources/cluster_operator.py @@ -0,0 +1,5 @@ +from resources.resource import Resource + + +class ClusterOperator(Resource): + api_group = "config.openshift.io" diff --git a/resources/cluster_role.py b/resources/cluster_role.py new file mode 100644 index 0000000000..832cc8f3fe --- /dev/null +++ b/resources/cluster_role.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +from .resource import Resource + + +class ClusterRole(Resource): + """ + ClusterRole object + """ + + api_group = "rbac.authorization.k8s.io" + + def __init__( + self, + name, + api_groups=None, + permissions_to_resources=None, + verbs=None, + teardown=True, + ): + super().__init__(name=name, teardown=teardown) + self.api_groups = api_groups + self.permissions_to_resources = permissions_to_resources + self.verbs = verbs + + def to_dict(self): + res = super()._base_body() + rules = {} + if self.api_groups: + rules["apiGroups"] = self.api_groups + if self.permissions_to_resources: + rules["resources"] = self.permissions_to_resources + if self.verbs: + rules["verbs"] = self.verbs + if rules: + res["rules"] = [rules] + return res diff --git a/resources/cluster_role_binding.py b/resources/cluster_role_binding.py new file mode 100644 index 0000000000..54f18a5fa2 --- /dev/null +++ b/resources/cluster_role_binding.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .resource import Resource + + +class ClusterRoleBinding(Resource): + """ + ClusterRoleBinding object. 
+ """ + + api_group = "rbac.authorization.k8s.io" diff --git a/resources/cluster_service_version.py b/resources/cluster_service_version.py new file mode 100644 index 0000000000..4c14136428 --- /dev/null +++ b/resources/cluster_service_version.py @@ -0,0 +1,8 @@ +from resources.resource import NamespacedResource + + +class ClusterServiceVersion(NamespacedResource): + api_group = "operators.coreos.com" + + class Status(NamespacedResource.Status): + INSTALLING = "Installing" diff --git a/resources/cluster_version.py b/resources/cluster_version.py new file mode 100644 index 0000000000..fee6ca44b8 --- /dev/null +++ b/resources/cluster_version.py @@ -0,0 +1,5 @@ +from resources.resource import Resource + + +class ClusterVersion(Resource): + api_group = "config.openshift.io" diff --git a/resources/configmap.py b/resources/configmap.py new file mode 100644 index 0000000000..dc7caa24c6 --- /dev/null +++ b/resources/configmap.py @@ -0,0 +1,22 @@ +from .resource import NamespacedResource + + +class ConfigMap(NamespacedResource): + """ + Configmap object + """ + + api_version = "v1" + + def __init__(self, name, namespace, cert_name=None, data=None, teardown=True): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.cert_name = cert_name + self.data = data + + def to_dict(self): + res = super()._base_body() + if self.cert_name is None: + res.update({"data": {"tlsregistry.crt": self.data}}) + else: + res.update({"data": {self.cert_name: self.data}}) + return res diff --git a/resources/custom_resource_definition.py b/resources/custom_resource_definition.py new file mode 100644 index 0000000000..6bf958edd2 --- /dev/null +++ b/resources/custom_resource_definition.py @@ -0,0 +1,5 @@ +from .resource import Resource + + +class CustomResourceDefinition(Resource): + api_group = "apiextensions.k8s.io" diff --git a/resources/daemonset.py b/resources/daemonset.py new file mode 100644 index 0000000000..f47e821b0f --- /dev/null +++ b/resources/daemonset.py @@ -0,0 +1,73 @@ +import logging + +import kubernetes +from openshift.dynamic.exceptions import NotFoundError +from resources.utils import TimeoutSampler +from urllib3.exceptions import ProtocolError + +from .resource import TIMEOUT, NamespacedResource + + +LOGGER = logging.getLogger(__name__) + + +class DaemonSet(NamespacedResource): + """ + DaemonSet object. + """ + + api_group = "apps" + + def wait_until_deployed(self, timeout=TIMEOUT): + """ + Wait until all Pods are deployed and ready. + + Args: + timeout (int): Time to wait for the Daemonset. + + Raises: + TimeoutExpiredError: If not all the pods are deployed. + """ + LOGGER.info(f"Wait for {self.kind} {self.name} to deploy all desired pods") + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + for sample in samples: + if sample.items: + status = sample.items[0].status + desired_number_scheduled = status.desiredNumberScheduled + number_ready = status.numberReady + if ( + desired_number_scheduled > 0 + and desired_number_scheduled == number_ready + ): + return + + def delete(self, wait=False): + """ + Delete Daemonset + + Args: + wait (bool): True to wait for Daemonset to be deleted. + + Returns: + bool: True if delete succeeded, False otherwise. 
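+
+        Usage sketch (names are illustrative):
+
+            ds = DaemonSet(name="my-daemonset", namespace="my-namespace")
+            ds.delete(wait=True)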
+        """
+        try:
+            res = self.api().delete(
+                name=self.name,
+                namespace=self.namespace,
+                body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"),
+            )
+        except NotFoundError:
+            return False
+
+        LOGGER.info(f"Delete {self.name}")
+        if wait and res:
+            return self.wait_deleted()
+        return res
diff --git a/resources/datavolume.py b/resources/datavolume.py
new file mode 100644
index 0000000000..d99da6a12f
--- /dev/null
+++ b/resources/datavolume.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+
+import logging
+
+from openshift.dynamic.exceptions import ResourceNotFoundError
+from resources.pod import Pod
+
+from .persistent_volume_claim import PersistentVolumeClaim
+from .resource import TIMEOUT, NamespacedResource, Resource
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DataVolume(NamespacedResource):
+    """
+    DataVolume object.
+    """
+
+    api_group = "cdi.kubevirt.io"
+
+    class Status(NamespacedResource.Status):
+        BLANK = "Blank"
+        PENDING = "Pending"
+        PVC_BOUND = "PVCBound"
+        IMPORT_SCHEDULED = "ImportScheduled"
+        CLONE_SCHEDULED = "CloneScheduled"
+        UPLOAD_SCHEDULED = "UploadScheduled"
+        IMPORT_IN_PROGRESS = "ImportInProgress"
+        CLONE_IN_PROGRESS = "CloneInProgress"
+        UPLOAD_IN_PROGRESS = "UploadInProgress"
+        SNAPSHOT_FOR_SMART_CLONE_IN_PROGRESS = "SnapshotForSmartCloneInProgress"
+        SMART_CLONE_PVC_IN_PROGRESS = "SmartClonePVCInProgress"
+        UPLOAD_READY = "UploadReady"
+        UNKNOWN = "Unknown"
+
+    class AccessMode:
+        """
+        AccessMode object.
+        """
+
+        RWO = "ReadWriteOnce"
+        ROX = "ReadOnlyMany"
+        RWX = "ReadWriteMany"
+
+    class ContentType:
+        """
+        ContentType object
+        """
+
+        KUBEVIRT = "kubevirt"
+        ARCHIVE = "archive"
+
+    class VolumeMode:
+        """
+        VolumeMode object
+        """
+
+        BLOCK = "Block"
+        FILE = "Filesystem"
+
+    class Condition:
+        class Type:
+            READY = "Ready"
+            BOUND = "Bound"
+            RUNNING = "Running"
+
+        class Status(Resource.Condition.Status):
+            UNKNOWN = "Unknown"
+
+    def wait_deleted(self, timeout=TIMEOUT):
+        """
+        Wait until the DataVolume and the PVC created by it are deleted.
+
+        Args:
+            timeout (int): Time to wait for the DataVolume and PVC to be deleted.
+
+        Returns:
+            bool: True if the DataVolume and its PVC are gone, False if the timeout is reached.
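+
+        Usage sketch (names are illustrative):
+
+            dv = DataVolume(name="my-dv", namespace="my-namespace")
+            dv.delete()
+            dv.wait_deleted(timeout=300)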
+        """
+        super().wait_deleted(timeout=timeout)
+        return self.pvc.wait_deleted(timeout=timeout)
+
+    def wait(self, timeout=600):
+        self.wait_for_status(status=self.Status.SUCCEEDED, timeout=timeout)
+        assert self.pvc.bound()
+
+    @property
+    def pvc(self):
+        return PersistentVolumeClaim(name=self.name, namespace=self.namespace)
+
+    @property
+    def scratch_pvc(self):
+        return PersistentVolumeClaim(
+            name=f"{self.name}-scratch", namespace=self.namespace
+        )
+
+    def _get_pod_startswith(self, starts_with):
+        pods = list(Pod.get(dyn_client=self.client, namespace=self.namespace))
+        for pod in pods:
+            if pod.name.startswith(starts_with):
+                return pod
+        raise ResourceNotFoundError
+
+    @property
+    def importer_pod(self):
+        return self._get_pod_startswith("importer")
+
+    @property
+    def upload_target_pod(self):
+        return self._get_pod_startswith("cdi-upload")
+
+    def __init__(
+        self,
+        name,
+        namespace,
+        source=None,
+        size=None,
+        storage_class=None,
+        url=None,
+        content_type=ContentType.KUBEVIRT,
+        access_modes=AccessMode.RWO,
+        cert_configmap=None,
+        secret=None,
+        client=None,
+        volume_mode=VolumeMode.FILE,
+        hostpath_node=None,
+        source_pvc=None,
+        source_namespace=None,
+        teardown=True,
+    ):
+        super().__init__(
+            name=name, namespace=namespace, client=client, teardown=teardown
+        )
+        self.source = source
+        self.url = url
+        self.cert_configmap = cert_configmap
+        self.secret = secret
+        self.content_type = content_type
+        self.size = size
+        self.access_modes = access_modes
+        self.storage_class = storage_class
+        self.volume_mode = volume_mode
+        self.hostpath_node = hostpath_node
+        self.source_pvc = source_pvc
+        self.source_namespace = source_namespace
+
+    def to_dict(self):
+        res = super()._base_body()
+        res.update(
+            {
+                "spec": {
+                    "source": {self.source: {"url": self.url}},
+                    "pvc": {
+                        "accessModes": [self.access_modes],
+                        "resources": {"requests": {"storage": self.size}},
+                    },
+                }
+            }
+        )
+        if self.content_type:
+            res["spec"]["contentType"] = self.content_type
+        if self.storage_class:
+            res["spec"]["pvc"]["storageClassName"] = self.storage_class
+        if self.secret:
+            res["spec"]["source"][self.source]["secretRef"] = self.secret.name
+        if self.volume_mode:
+            res["spec"]["pvc"]["volumeMode"] = self.volume_mode
+        if self.source == "http" or self.source == "registry":
+            res["spec"]["source"][self.source]["url"] = self.url
+        if self.cert_configmap:
+            res["spec"]["source"][self.source]["certConfigMap"] = self.cert_configmap
+        if self.source == "upload" or self.source == "blank":
+            res["spec"]["source"][self.source] = {}
+        if self.hostpath_node:
+            res["metadata"]["annotations"] = {
+                "kubevirt.io/provisionOnNode": self.hostpath_node
+            }
+        if self.source == "pvc":
+            res["spec"]["source"]["pvc"] = {
+                "name": self.source_pvc or "dv-source",
+                "namespace": self.source_namespace or self.namespace,
+            }
+
+        return res
diff --git a/resources/deployment.py b/resources/deployment.py
new file mode 100644
index 0000000000..3e8337f7b0
--- /dev/null
+++ b/resources/deployment.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+import logging
+
+from resources.utils import TimeoutSampler
+from urllib3.exceptions import ProtocolError
+
+from .resource import TIMEOUT, NamespacedResource
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class Deployment(NamespacedResource):
+    """
+    OpenShift Deployment object.
+    """
+
+    api_group = "apps"
+
+    def scale_replicas(self, replica_count):
+        """
+        Update the number of replicas in the deployment.
+
+        Args:
+            replica_count (int): Number of replicas.
+
+        Returns:
+            The result of the update request.
+        """
+        body = super().to_dict()
+        body.update({"spec": {"replicas": replica_count}})
+
+        LOGGER.info(f"Set deployment replicas: {replica_count}")
+        return self.update(resource_dict=body)
+
+    def wait_until_no_replicas(self, timeout=TIMEOUT):
+        """
+        Wait until there are no available replicas.
+
+        Args:
+            timeout (int): Time to wait for the deployment.
+
+        Raises:
+            TimeoutExpiredError: If availableReplicas does not become empty in time.
+        """
+        LOGGER.info(f"Wait for {self.kind} {self.name} to update replicas")
+        samples = TimeoutSampler(
+            timeout=timeout,
+            sleep=1,
+            exceptions=ProtocolError,
+            func=self.api().get,
+            field_selector=f"metadata.name=={self.name}",
+        )
+        for sample in samples:
+            if sample.items:
+                status = sample.items[0].status
+                available_replicas = status.availableReplicas
+                if not available_replicas:
+                    return
+
+    def wait_until_avail_replicas(self, timeout=TIMEOUT):
+        """
+        Wait until all replicas are updated.
+
+        Args:
+            timeout (int): Time to wait for the deployment.
+
+        Raises:
+            TimeoutExpiredError: If availableReplicas is not equal to replicas in time.
+        """
+        LOGGER.info(
+            f"Wait for {self.kind} {self.name} to ensure availableReplicas == replicas"
+        )
+        samples = TimeoutSampler(
+            timeout=timeout,
+            sleep=1,
+            exceptions=ProtocolError,
+            func=self.api().get,
+            field_selector=f"metadata.name=={self.name}",
+        )
+        for sample in samples:
+            if sample.items:
+                status = sample.items[0].status
+                available_replicas = status.availableReplicas
+                replicas = status.replicas
+                if replicas == available_replicas:
+                    return
+
+
+class HttpDeployment(Deployment):
+    def to_dict(self):
+        res = super()._base_body()
+        res.update(
+            {
+                "spec": {
+                    "replicas": 1,
+                    "selector": {"matchLabels": {"name": "internal-http"}},
+                    "template": {
+                        "metadata": {
+                            "labels": {
+                                "name": "internal-http",
+                                "cdi.kubevirt.io/testing": "",
+                            }
+                        },
+                        "spec": {
+                            "terminationGracePeriodSeconds": 0,
+                            "containers": [
+                                {
+                                    "name": "http",
+                                    "image": "quay.io/openshift-cnv/qe-cnv-tests-internal-http",
+                                    "imagePullPolicy": "IfNotPresent",
+                                    "command": ["/usr/sbin/nginx"],
+                                    "readinessProbe": {
+                                        "httpGet": {"path": "/", "port": 80},
+                                        "initialDelaySeconds": 20,
+                                        "periodSeconds": 20,
+                                    },
+                                    "securityContext": {"privileged": True},
+                                    "livenessProbe": {
+                                        "httpGet": {"path": "/", "port": 80},
+                                        "initialDelaySeconds": 20,
+                                        "periodSeconds": 20,
+                                    },
+                                }
+                            ],
+                        },
+                    },
+                }
+            }
+        )
+        return res
diff --git a/resources/event.py b/resources/event.py
new file mode 100644
index 0000000000..86874d3bb0
--- /dev/null
+++ b/resources/event.py
@@ -0,0 +1,106 @@
+import logging
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class Event:
+    """
+    Allow read and remove K8s events.
+    """
+
+    api_version = "v1"
+
+    @classmethod
+    def get(
+        cls,
+        dyn_client,
+        namespace=None,
+        name=None,
+        label_selector=None,
+        field_selector=None,
+        resource_version=None,
+        timeout=None,
+    ):
+        """
+        get - retrieves K8s events.
+        :param dyn_client: K8s client; mandatory
+        :param namespace: event namespace; optional
+        :param name: event name; optional
+        :param label_selector: filter events by labels; comma separated string of key=value; optional
+        :param field_selector: filter events by event fields; comma separated string of key=value; optional
+        :param resource_version: filter events by their resource's version; optional
+        :param timeout: timeout; optional
+        :return: generator of event objects
+
+        example: reading all CSV Warning events in namespace "my-namespace", with reason of "AnEventReason"
+            for event in Event.get(
+                default_client,
+                namespace="my-namespace",
+                field_selector="involvedObject.kind==ClusterServiceVersion,type==Warning,reason=AnEventReason",
+                timeout=10,
+            ):
+                print(event.object)
+        """
+
+        LOGGER.info("Reading events")
+        LOGGER.debug(
+            f"get events parameters: namespace={namespace}, name={name}, label_selector={label_selector}, "
+            f"field_selector='{field_selector}', resource_version={resource_version}, timeout={timeout}"
+        )
+
+        event_listener = dyn_client.resources.get(
+            api_version=cls.api_version, kind=cls.__name__
+        )
+        for event in event_listener.watch(
+            namespace=namespace,
+            name=name,
+            label_selector=label_selector,
+            field_selector=field_selector,
+            resource_version=resource_version,
+            timeout=timeout,
+        ):
+            yield event
+
+    @classmethod
+    def delete_events(
+        cls,
+        dyn_client,
+        namespace=None,
+        name=None,
+        label_selector=None,
+        field_selector=None,
+        resource_version=None,
+        timeout=None,
+    ):
+        """
+        delete_events - deletes K8s events. For example, clean up events before a test
+        so that old events from previous runs do not leak into the test and cause false positives.
+        :param dyn_client: K8s client; mandatory
+        :param namespace: event namespace; optional
+        :param name: event name; optional
+        :param label_selector: filter events by labels; comma separated string of key=value; optional
+        :param field_selector: filter events by event fields; comma separated string of key=value; optional
+        :param resource_version: filter events by their resource's version; optional
+        :param timeout: timeout; optional
+
+        example: deleting all the events with a reason of "AnEventReason", from the "my-namespace" namespace
+
+            @pytest.fixture()
+            def delete_events_before_test(default_client):
+                Event.delete_events(
+                    default_client, namespace="my-namespace", field_selector="reason=AnEventReason"
+                )
+        """
+        LOGGER.info("Deleting events")
+        LOGGER.debug(
+            f"delete_events parameters: namespace={namespace}, name={name}, label_selector={label_selector}, "
+            f"field_selector='{field_selector}', resource_version={resource_version}, timeout={timeout}"
+        )
+        dyn_client.resources.get(api_version=cls.api_version, kind=cls.__name__).delete(
+            namespace=namespace,
+            name=name,
+            label_selector=label_selector,
+            field_selector=field_selector,
+            resource_version=resource_version,
+            timeout=timeout,
+        )
diff --git a/resources/hostpath_provisioner.py b/resources/hostpath_provisioner.py
new file mode 100644
index 0000000000..1859106b73
--- /dev/null
+++ b/resources/hostpath_provisioner.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+
+from resources.resource import Resource
+
+
+class HostPathProvisioner(Resource):
+    """
+    HostPathProvisioner Custom Resource object.
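+
+    Usage sketch:
+
+        hpp = HostPathProvisioner(name=HostPathProvisioner.Name.HOSTPATH_PROVISIONER)
+        path = hpp.volume_path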
+ """ + + api_group = "hostpathprovisioner.kubevirt.io" + + class Name: + HOSTPATH_PROVISIONER = "hostpath-provisioner" + + @property + def volume_path(self): + return self.instance.spec.pathConfig.path diff --git a/resources/hyperconverged.py b/resources/hyperconverged.py new file mode 100644 index 0000000000..4ee4a2ccbd --- /dev/null +++ b/resources/hyperconverged.py @@ -0,0 +1,5 @@ +from resources.resource import NamespacedResource + + +class HyperConverged(NamespacedResource): + api_group = "hco.kubevirt.io" diff --git a/resources/imagestreamtag.py b/resources/imagestreamtag.py new file mode 100644 index 0000000000..55237740b9 --- /dev/null +++ b/resources/imagestreamtag.py @@ -0,0 +1,9 @@ +from .resource import NamespacedResource + + +class ImageStreamTag(NamespacedResource): + """ + ImageStreamTag object. + """ + + api_group = "image.openshift.io" diff --git a/resources/installplan.py b/resources/installplan.py new file mode 100644 index 0000000000..00dd2ab041 --- /dev/null +++ b/resources/installplan.py @@ -0,0 +1,8 @@ +from resources.resource import NamespacedResource + + +class InstallPlan(NamespacedResource): + api_group = "operators.coreos.com" + + class Status(NamespacedResource.Status): + COMPLETE = "Complete" diff --git a/resources/kubevirt.py b/resources/kubevirt.py new file mode 100644 index 0000000000..0ef417a97d --- /dev/null +++ b/resources/kubevirt.py @@ -0,0 +1,5 @@ +from .resource import NamespacedResource + + +class KubeVirt(NamespacedResource): + api_group = "kubevirt.io" diff --git a/resources/mutating_webhook_config.py b/resources/mutating_webhook_config.py new file mode 100644 index 0000000000..b410d39531 --- /dev/null +++ b/resources/mutating_webhook_config.py @@ -0,0 +1,9 @@ +from .resource import Resource + + +class MutatingWebhookConfiguration(Resource): + """ + MutatingWebhookConfiguration object. + """ + + api_group = "admissionregistration.k8s.io" diff --git a/resources/namespace.py b/resources/namespace.py new file mode 100644 index 0000000000..7086cbd845 --- /dev/null +++ b/resources/namespace.py @@ -0,0 +1,41 @@ +import logging + +from resources.utils import NudgeTimers, nudge_delete + +from .resource import Resource + + +LOGGER = logging.getLogger(__name__) + +_DELETE_NUDGE_DELAY = 30 +_DELETE_NUDGE_INTERVAL = 5 + + +class Namespace(Resource): + """ + Namespace object, inherited from Resource. + """ + + api_version = "v1" + + class Status(Resource.Status): + ACTIVE = "Active" + + def __init__( + self, name, client=None, teardown=True, label=None, + ): + super().__init__(name=name, client=client, teardown=teardown) + self.label = label + + def to_dict(self): + res = super().to_dict() + if self.label: + res.setdefault("metadata", {}).setdefault("labels", {}).update(self.label) + return res + + # TODO: remove the nudge when the underlying issue with namespaces stuck in + # Terminating state is fixed. + # Upstream bug: https://github.com/kubernetes/kubernetes/issues/60807 + def nudge_delete(self): + timers = NudgeTimers() + nudge_delete(name=self.name, timers=timers) diff --git a/resources/network_addons_config.py b/resources/network_addons_config.py new file mode 100644 index 0000000000..60fd9fc6f3 --- /dev/null +++ b/resources/network_addons_config.py @@ -0,0 +1,9 @@ +from .resource import Resource + + +class NetworkAddonsConfig(Resource): + """ + NetworkAddonsConfig (a Custom Resource) object, inherited from Resource. 
+ """ + + api_group = "networkaddonsoperator.network.kubevirt.io" diff --git a/resources/network_attachment_definition.py b/resources/network_attachment_definition.py new file mode 100644 index 0000000000..dc685708e1 --- /dev/null +++ b/resources/network_attachment_definition.py @@ -0,0 +1,24 @@ +from .resource import NamespacedResource + + +class NetworkAttachmentDefinition(NamespacedResource): + """ + NetworkAttachmentDefinition object. + """ + + api_group = "k8s.cni.cncf.io" + resource_name = None + + def wait_for_status( + self, status, timeout=None, label_selector=None, resource_version=None + ): + raise NotImplementedError(f"{self.kind} does not have status") + + def to_dict(self): + res = super().to_dict() + if self.resource_name is not None: + res["metadata"]["annotations"] = { + "k8s.v1.cni.cncf.io/resourceName": self.resource_name + } + res["spec"] = {} + return res diff --git a/resources/network_policy.py b/resources/network_policy.py new file mode 100644 index 0000000000..269f12d1ba --- /dev/null +++ b/resources/network_policy.py @@ -0,0 +1,9 @@ +from .resource import NamespacedResource + + +class NetworkPolicy(NamespacedResource): + """ + NetworkPolicy object. + """ + + api_group = "networking.k8s.io" diff --git a/resources/node.py b/resources/node.py new file mode 100644 index 0000000000..ec81caaec7 --- /dev/null +++ b/resources/node.py @@ -0,0 +1,24 @@ +from .resource import NamespacedResource, Resource + + +class Node(Resource): + """ + Node object, inherited from Resource. + """ + + api_version = "v1" + + class Status(NamespacedResource.Status): + READY = "Ready" + SCHEDULING_DISABLED = "Ready,SchedulingDisabled" + + @property + def kubelet_ready(self): + return any( + [ + stat + for stat in self.instance.status.conditions + if stat["reason"] == "KubeletReady" + and stat["status"] == self.Condition.Status.TRUE + ] + ) diff --git a/resources/node_maintenance.py b/resources/node_maintenance.py new file mode 100644 index 0000000000..e7a852529f --- /dev/null +++ b/resources/node_maintenance.py @@ -0,0 +1,23 @@ +from .resource import Resource + + +class NodeMaintenance(Resource): + """ + Node Maintenance object, inherited from Resource. 
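+
+    Usage sketch (the node object and names are illustrative):
+
+        with NodeMaintenance(name="my-maintenance", node=node, reason="upgrade"):
+            ...  # the node is under maintenance while inside this block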
+    """
+
+    api_group = "nodemaintenance.kubevirt.io"
+
+    class Status(Resource.Status):
+        RUNNING = "Running"
+
+    def __init__(self, name, node=None, reason="TEST Reason", teardown=True):
+        super().__init__(name=name, teardown=teardown)
+        self.node = node
+        self.reason = reason
+
+    def to_dict(self):
+        assert self.node, "node is mandatory for create"
+        res = super().to_dict()
+        res["spec"] = {"nodeName": self.node.name, "reason": self.reason}
+        return res
diff --git a/resources/node_network_configuration_enactment.py b/resources/node_network_configuration_enactment.py
new file mode 100644
index 0000000000..b47aa3e00d
--- /dev/null
+++ b/resources/node_network_configuration_enactment.py
@@ -0,0 +1,12 @@
+from resources.resource import Resource
+
+
+class NodeNetworkConfigurationEnactment(Resource):
+
+    api_group = "nmstate.io"
+
+    class ConditionType:
+        FAILING = "Failing"
+        AVAILABLE = "Available"
+        PROGRESSING = "Progressing"
+        MATCHING = "Matching"
diff --git a/resources/node_network_configuration_policy.py b/resources/node_network_configuration_policy.py
new file mode 100644
index 0000000000..2885e331fb
--- /dev/null
+++ b/resources/node_network_configuration_policy.py
@@ -0,0 +1,302 @@
+import logging
+
+from openshift.dynamic.exceptions import ConflictError
+from resources.utils import TimeoutExpiredError, TimeoutSampler
+
+from .node_network_state import NodeNetworkState
+from .resource import Resource
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class NNCPConfigurationFailed(Exception):
+    pass
+
+
+class NodeNetworkConfigurationPolicy(Resource):
+
+    api_group = "nmstate.io"
+
+    class Interface:
+        class State:
+            UP = "up"
+            DOWN = "down"
+            ABSENT = "absent"
+
+    class Conditions:
+        class Type:
+            FAILING = "Failing"
+            AVAILABLE = "Available"
+            PROGRESSING = "Progressing"
+            MATCHING = "Matching"
+
+        class Reason:
+            SUCCESS = "SuccessfullyConfigured"
+            FAILED = "FailedToConfigure"
+
+    def __init__(
+        self,
+        name,
+        worker_pods=None,
+        node_selector=None,
+        teardown=True,
+        mtu=None,
+        ports=None,
+        ipv4_enable=False,
+        ipv4_dhcp=False,
+        ipv4_addresses=None,
+        ipv6_enable=False,
+        node_active_nics=None,
+    ):
+        """
+        ipv4_addresses should be sent in this format:
+            [{"ip": <IP address 1>, "prefix-length": <prefix length>},
+             {"ip": <IP address 2>, "prefix-length": <prefix length>}, ...]
+ For example: + [{"ip": "10.1.2.3", "prefix-length": 24}, + {"ip": "10.4.5.6", "prefix-length": 24}, + {"ip": "10.7.8.9", "prefix-length": 23}] + """ + super().__init__(name=name, teardown=teardown) + self.desired_state = {"interfaces": []} + self.worker_pods = worker_pods + self.mtu = mtu + self.mtu_dict = {} + self.ports = ports or [] + self.iface = None + self.ifaces = [] + self.node_active_nics = node_active_nics or [] + self.ipv4_enable = ipv4_enable + self._ipv4_dhcp = ipv4_dhcp + self.ipv4_addresses = ipv4_addresses or [] + self.ipv6_enable = ipv6_enable + self.ipv4_iface_state = {} + self.node_selector = node_selector + if self.node_selector: + for pod in self.worker_pods: + if pod.node.name == node_selector: + self.worker_pods = [pod] + self._node_selector = {"kubernetes.io/hostname": self.node_selector} + break + else: + self._node_selector = {"node-role.kubernetes.io/worker": ""} + + def set_interface(self, interface): + # First drop the interface if it's already in the list + interfaces = [ + i + for i in self.desired_state["interfaces"] + if not (i["name"] == interface["name"]) + ] + + # Add the interface + interfaces.append(interface) + self.desired_state["interfaces"] = interfaces + + def to_dict(self): + res = super()._base_body() + res.update({"spec": {"desiredState": self.desired_state}}) + if self._node_selector: + res["spec"]["nodeSelector"] = self._node_selector + + """ + It's the responsibility of the caller to verify the desired configuration they send. + For example: "ipv4.dhcp.enabled: false" without specifying any static IP address is a valid desired state and + therefore not blocked in the code, but nmstate would reject it. Such configuration might be used for negative + tests. + """ + self.iface["ipv4"] = {"enabled": self.ipv4_enable, "dhcp": self.ipv4_dhcp} + if self.ipv4_addresses: + self.iface["ipv4"]["address"] = self.ipv4_addresses + + self.iface["ipv6"] = {"enabled": self.ipv6_enable} + + self.set_interface(interface=self.iface) + if self.iface not in self.ifaces: + self.ifaces.append(self.iface) + + return res + + def apply(self): + resource = self.to_dict() + samples = TimeoutSampler( + timeout=3, + sleep=1, + exceptions=ConflictError, + func=self.update, + resource_dict=resource, + ) + for _sample in samples: + return + + def __enter__(self): + if self._ipv4_dhcp: + self._ipv4_state_backup() + + if self.mtu: + for pod in self.worker_pods: + for port in self.ports: + mtu = pod.execute( + command=["cat", f"/sys/class/net/{port}/mtu"] + ).strip() + LOGGER.info( + f"Backup MTU: {pod.node.name} interface {port} MTU is {mtu}" + ) + self.mtu_dict[port] = mtu + + super().__enter__() + + try: + self.wait_for_status_success() + self.validate_create() + return self + except Exception as e: + LOGGER.error(e) + self.clean_up() + raise + + def __exit__(self, exception_type, exception_value, traceback): + if not self.teardown: + return + self.clean_up() + + @property + def ipv4_dhcp(self): + return self._ipv4_dhcp + + @ipv4_dhcp.setter + def ipv4_dhcp(self, ipv4_dhcp): + if ipv4_dhcp != self._ipv4_dhcp: + self._ipv4_dhcp = ipv4_dhcp + + if self._ipv4_dhcp: + self._ipv4_state_backup() + self.iface["ipv4"] = {"dhcp": True, "enabled": True} + + self.set_interface(interface=self.iface) + self.apply() + + def clean_up(self): + if self.mtu: + for port in self.ports: + _port = { + "name": port, + "type": "ethernet", + "state": self.Interface.State.UP, + "mtu": int(self.mtu_dict[port]), + } + self.set_interface(interface=_port) + + for iface in self.ifaces: + """ + If any 
physical interfaces are part of the policy - we will skip them, + because we don't want to delete them (and we actually can't, and this attempt + would end with failure). + """ + if iface["name"] in self.node_active_nics: + continue + try: + self._absent_interface() + self.wait_for_interface_deleted() + except TimeoutExpiredError as e: + LOGGER.error(e) + + self.delete() + + def wait_for_interface_deleted(self): + for pod in self.worker_pods: + for iface in self.ifaces: + node_network_state = NodeNetworkState(name=pod.node.name) + node_network_state.wait_until_deleted(name=iface["name"]) + + def validate_create(self): + for pod in self.worker_pods: + for bridge in self.ifaces: + node_network_state = NodeNetworkState(name=pod.node.name) + node_network_state.wait_until_up(name=bridge["name"]) + + def _ipv4_state_backup(self): + # Backup current state of dhcp for the interfaces which arent veth or current bridge + for pod in self.worker_pods: + node_network_state = NodeNetworkState(name=pod.node.name) + self.ipv4_iface_state[pod.node.name] = {} + for interface in node_network_state.instance.status.currentState.interfaces: + if interface["name"] in self.ports: + self.ipv4_iface_state[pod.node.name].update( + { + interface["name"]: { + k: interface["ipv4"][k] for k in ("dhcp", "enabled") + } + } + ) + + def _absent_interface(self): + for bridge in self.ifaces: + bridge["state"] = self.Interface.State.ABSENT + self.set_interface(interface=bridge) + + if self._ipv4_dhcp: + temp_ipv4_iface_state = {} + for pod in self.worker_pods: + node_network_state = NodeNetworkState(name=pod.node.name) + temp_ipv4_iface_state[pod.node.name] = {} + # Find which interfaces got changed (of those that are connected to bridge) + for ( + interface + ) in node_network_state.instance.status.currentState.interfaces: + if interface["name"] in self.ports: + x = {k: interface["ipv4"][k] for k in ("dhcp", "enabled")} + if ( + self.ipv4_iface_state[pod.node.name][interface["name"]] + != x + ): + temp_ipv4_iface_state[pod.node.name].update( + { + interface["name"]: self.ipv4_iface_state[ + pod.node.name + ][interface["name"]] + } + ) + + previous_state = next(iter(temp_ipv4_iface_state.values())) + + # Restore DHCP state of the changed bridge connected ports + for iface_name, ipv4 in previous_state.items(): + interface = {"name": iface_name, "ipv4": ipv4} + self.set_interface(interface=interface) + + self.apply() + + def status(self): + for condition in self.instance.status.conditions: + if condition["type"] == self.Conditions.Type.AVAILABLE: + return condition["reason"] + + def wait_for_status_success(self): + # if we get here too fast there are no conditions, we need to wait. 
+ self.wait_for_conditions() + + samples = TimeoutSampler(timeout=30, sleep=1, func=self.status) + try: + for sample in samples: + if sample == self.Conditions.Reason.SUCCESS: + LOGGER.info("NNCP configured Successfully") + return sample + + if sample == self.Conditions.Reason.FAILED: + raise NNCPConfigurationFailed( + f"Reason: {self.Conditions.Reason.FAILED}" + ) + + except (TimeoutExpiredError, NNCPConfigurationFailed): + LOGGER.error("Unable to configure NNCP for node") + raise + + def wait_for_conditions(self): + samples = TimeoutSampler( + timeout=30, sleep=1, func=lambda: self.instance.status.conditions + ) + for sample in samples: + if sample: + return diff --git a/resources/node_network_state.py b/resources/node_network_state.py new file mode 100644 index 0000000000..aac3ff2b3a --- /dev/null +++ b/resources/node_network_state.py @@ -0,0 +1,109 @@ +import logging +import time + +from openshift.dynamic.exceptions import ConflictError +from resources.utils import TimeoutSampler + +from .resource import Resource + + +LOGGER = logging.getLogger(__name__) + +SLEEP = 1 +TIMEOUT = 120 + + +class NodeNetworkState(Resource): + + api_group = "nmstate.io" + + def __init__(self, name, teardown=True): + super().__init__(name=name, teardown=teardown) + status = self.instance.to_dict()["status"] + if "desiredState" in status: + self.desired_state = status["desiredState"] + else: + self.desired_state = {"interfaces": []} + + def set_interface(self, interface): + + # First drop the interface is's already in the list + interfaces = [ + i + for i in self.desired_state["interfaces"] + if not (i["name"] == interface["name"]) + ] + + # Add the interface + interfaces.append(interface) + self.desired_state["interfaces"] = interfaces + + def to_dict(self): + res = super()._base_body() + res.update( + { + "spec": { + "nodeName": self.name, + "managed": True, + "desiredState": self.desired_state, + } + } + ) + return res + + def apply(self): + resource = self.to_dict() + retries_on_conflict = 3 + while True: + try: + resource["metadata"] = self.instance.to_dict()["metadata"] + self.update(resource) + break + except ConflictError as e: + retries_on_conflict -= 1 + if retries_on_conflict == 0: + raise e + time.sleep(1) + + def wait_until_up(self, name): + def _find_up_interface(): + for interface in self.interfaces: + if interface["name"] == name and interface["state"] == "up": + return interface + return None + + LOGGER.info(f"Checking if interface {name} is up -- {self.name}") + samples = TimeoutSampler(timeout=TIMEOUT, sleep=SLEEP, func=_find_up_interface) + for sample in samples: + if sample: + return + + def wait_until_deleted(self, name): + def _find_deleted_interface(): + for interface in self.interfaces: + if interface["name"] == name: + return interface + return None + + LOGGER.info(f"Checking if interface {name} is deleted -- {self.name}") + samples = TimeoutSampler( + timeout=TIMEOUT, sleep=SLEEP, func=_find_deleted_interface + ) + for sample in samples: + if not sample: + return + + @property + def interfaces(self): + return self.instance.status.currentState.interfaces + + @property + def routes(self): + return self.instance.status.currentState.routes + + def ipv4(self, iface): + for interface in self.interfaces: + if interface["name"] == iface: + addresses = interface["ipv4"]["address"] + if addresses: + return interface["ipv4"]["address"][0]["ip"] diff --git a/resources/oauth.py b/resources/oauth.py new file mode 100644 index 0000000000..aaaa760ff0 --- /dev/null +++ b/resources/oauth.py @@ -0,0 
+1,9 @@ +from .resource import Resource + + +class OAuth(Resource): + """ + OAuth object. + """ + + api_version = "v1" diff --git a/resources/operator_source.py b/resources/operator_source.py new file mode 100644 index 0000000000..cbb212655d --- /dev/null +++ b/resources/operator_source.py @@ -0,0 +1,38 @@ +from .resource import NamespacedResource + + +class OperatorSource(NamespacedResource): + api_group = "operators.coreos.com" + + def __init__( + self, + name, + namespace, + registry_namespace, + display_name, + publisher, + secret, + teardown=True, + ): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.registry_namespace = registry_namespace + self.display_name = display_name + self.publisher = publisher + self.secret = secret + + def to_dict(self): + res = super()._base_body() + res.update( + { + "spec": { + "type": "appregistry", + "endpoint": "https://quay.io/cnr", + "registryNamespace": self.registry_namespace, + "displayName": self.display_name, + "publisher": self.publisher, + "authorizationToken": {"secretName": self.secret}, + } + } + ) + + return res diff --git a/resources/persistent_volume.py b/resources/persistent_volume.py new file mode 100644 index 0000000000..c83a6d6d85 --- /dev/null +++ b/resources/persistent_volume.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +from .resource import Resource + + +class PersistentVolume(Resource): + """ + PersistentVolume object + """ + + api_version = "v1" + + @property + def max_available_pvs(self): + """ + Returns the maximum number (int) of PV's which are in 'Available' state + """ + return len( + [pv for pv in self.api().get()["items"] if pv.status.phase == "Available"] + ) diff --git a/resources/persistent_volume_claim.py b/resources/persistent_volume_claim.py new file mode 100644 index 0000000000..05ff9bd51b --- /dev/null +++ b/resources/persistent_volume_claim.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- + +import logging + +from .resource import NamespacedResource + + +LOGGER = logging.getLogger(__name__) + + +class PersistentVolumeClaim(NamespacedResource): + """ + PersistentVolumeClaim object + """ + + api_version = "v1" + + class Status(NamespacedResource.Status): + BOUND = "Bound" + PENDING = "Pending" + TERMINATING = "Terminating" + + class AccessMode: + """ + AccessMode object. + """ + + RWO = "ReadWriteOnce" + ROX = "ReadOnlyMany" + RWX = "ReadWriteMany" + + class VolumeMode: + """ + VolumeMode object + """ + + BLOCK = "Block" + FILE = "Filesystem" + + def __init__( + self, + name, + namespace, + storage_class=None, + accessmodes=None, + volume_mode=VolumeMode.FILE, + size=None, + hostpath_node=None, + teardown=True, + ): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.accessmodes = accessmodes + self.volume_mode = volume_mode + self.size = size + self.hostpath_node = hostpath_node + self.storage_class = storage_class + + def to_dict(self): + res = super()._base_body() + res.update( + { + "spec": { + "volumeMode": self.volume_mode, + "accessModes": [self.accessmodes], + "resources": {"requests": {"storage": self.size}}, + } + } + ) + """ + Hostpath-provisioner is "node aware", when using it, + a node attribute on the claim must be introduced as follows. 
annotations:
+            kubevirt.io/provisionOnNode: <node name>
+        """
+        if self.hostpath_node:
+            res["metadata"]["annotations"] = {
+                "kubevirt.io/provisionOnNode": self.hostpath_node
+            }
+        if self.storage_class:
+            res["spec"]["storageClassName"] = self.storage_class
+
+        return res
+
+    def bound(self):
+        """
+        Check if PVC is bound
+
+        Returns:
+            bool: True if bound else False
+        """
+        LOGGER.info(f"Check if {self.kind} {self.name} is bound")
+        return self.status == "Bound"
+
+    @property
+    def selected_node(self):
+        return self.instance.metadata.annotations.get(
+            "volume.kubernetes.io/selected-node"
+        )
diff --git a/resources/pod.py b/resources/pod.py
new file mode 100644
index 0000000000..25d4f6f153
--- /dev/null
+++ b/resources/pod.py
@@ -0,0 +1,143 @@
+import json
+import logging
+
+import kubernetes
+
+from . import utils
+from .node import Node
+from .resource import NamespacedResource
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ExecOnPodError(Exception):
+    def __init__(self, command, rc, out, err):
+        self.cmd = command
+        self.rc = rc
+        self.out = out
+        self.err = err
+
+    def __str__(self):
+        return (
+            f"Command execution failure: "
+            f"{self.cmd}, "
+            f"RC: {self.rc}, "
+            f"OUT: {self.out}, "
+            f"ERR: {self.err}"
+        )
+
+
+class Pod(NamespacedResource):
+    """
+    Pod object, inherited from Resource.
+    """
+
+    api_version = "v1"
+
+    class Status(NamespacedResource.Status):
+        RUNNING = "Running"
+        CRASH_LOOPBACK_OFF = "CrashLoopBackOff"
+
+    def __init__(self, name, namespace, client=None, teardown=True):
+        super().__init__(
+            name=name, namespace=namespace, client=client, teardown=teardown
+        )
+        self._kube_api = kubernetes.client.CoreV1Api(api_client=self.client.client)
+
+    @property
+    def containers(self):
+        """
+        Get Pod containers
+
+        Returns:
+            list: List of Pod containers
+        """
+        return self.instance.spec.containers
+
+    def execute(self, command, timeout=60, container=None):
+        """
+        Run command on Pod
+
+        Args:
+            command (list): Command to run.
+            timeout (int): Time to wait for the command.
+            container (str): Container name where to exec the command.
+
+        Returns:
+            str: Command output.
+
+        Raises:
+            ExecOnPodError: If the command failed.
+        """
+        LOGGER.info(f"Execute {command} on {self.name} ({self.node.name})")
+        resp = kubernetes.stream.stream(
+            func=self._kube_api.connect_get_namespaced_pod_exec,
+            name=self.name,
+            namespace=self.namespace,
+            command=command,
+            container=container or self.containers[0].name,
+            stderr=True,
+            stdin=False,
+            stdout=True,
+            tty=False,
+            _preload_content=False,
+        )
+
+        timeout_watch = utils.TimeoutWatch(timeout=timeout)
+        # Initialized to None so that a stream which closes before the error
+        # channel is read is reported as a failure below.
+        error_channel = None
+        while resp.is_open():
+            resp.run_forever(timeout=2)
+            try:
+                error_channel = json.loads(
+                    resp.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL)
+                )
+                break
+            except json.decoder.JSONDecodeError:
+                # Check the remaining time, in order to throw an exception
+                # if the remaining time reached zero
+                _ = timeout_watch.remaining_time()
+
+        rcstring = error_channel.get("status") if error_channel else None
+        if rcstring is None:
+            raise ExecOnPodError(
+                command=command, rc=-1, out="", err="stream resp is closed"
+            )
+
+        stdout = resp.read_stdout(timeout=5)
+        stderr = resp.read_stderr(timeout=5)
+
+        if rcstring == "Success":
+            return stdout
+
+        returncode = [
+            int(cause["message"])
+            for cause in error_channel["details"]["causes"]
+            if cause["reason"] == "ExitCode"
+        ][0]
+
+        raise ExecOnPodError(command=command, rc=returncode, out=stdout, err=stderr)
+
+    def log(self, **kwargs):
+        """
+        Get Pod logs
+
+        Returns:
+            str: Pod logs.
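+
+        Usage sketch (kwargs are passed through to read_namespaced_pod_log;
+        the container name and tail_lines value are illustrative):
+
+            pod.log(container="compute", tail_lines=100)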
+ """ + return self._kube_api.read_namespaced_pod_log( + self.name, self.namespace, **kwargs + ) + + @property + def node(self): + """ + Get the node name where the Pod is running + + Returns: + Node: Node + """ + return Node(name=self.instance.spec.nodeName) + + @property + def ip(self): + return self.instance.status.podIP diff --git a/resources/project.py b/resources/project.py new file mode 100644 index 0000000000..828585a6d7 --- /dev/null +++ b/resources/project.py @@ -0,0 +1,40 @@ +from resources.utils import NudgeTimers, nudge_delete + +from .resource import Resource + + +API_GROUP = "project.openshift.io" + + +class Project(Resource): + """ + Project object. + This is openshift's object which represents Namespace + """ + + api_group = API_GROUP + + class Status(Resource.Status): + ACTIVE = "Active" + + def nudge_delete(self): + timers = NudgeTimers() + nudge_delete(name=self.name, timers=timers) + + +class ProjectRequest(Resource): + """ + RequestProject object. + Resource which adds Project and grand + full access to user who originated this request + """ + + api_group = API_GROUP + + def __exit__(self, exception_type, exception_value, traceback): + if not self.teardown: + return + self.clean_up() + + def clean_up(self): + Project(name=self.name).delete(wait=True) diff --git a/resources/replicaset.py b/resources/replicaset.py new file mode 100644 index 0000000000..5f2f9da4f4 --- /dev/null +++ b/resources/replicaset.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .resource import NamespacedResource + + +class ReplicaSet(NamespacedResource): + """ + OpenShift Service object. + """ + + api_version = "v1" diff --git a/resources/resource.py b/resources/resource.py new file mode 100644 index 0000000000..1db5426f3a --- /dev/null +++ b/resources/resource.py @@ -0,0 +1,889 @@ +import datetime +import logging +import os +import re +import shutil +from distutils.version import Version + +import kubernetes +import urllib3 +from openshift.dynamic import DynamicClient +from openshift.dynamic.exceptions import NotFoundError +from resources.utils import ( + NudgeTimers, + TimeoutExpiredError, + TimeoutSampler, + nudge_delete, +) +from urllib3.exceptions import ProtocolError + + +LOGGER = logging.getLogger(__name__) +TIMEOUT = 240 +MAX_SUPPORTED_API_VERSION = "v1" + + +def _prepare_collect_data_directory(resource_object): + dump_dir = "tests-collected-info" + if not os.path.isdir(dump_dir): + # pytest fixture create the directory, if it is not exists we probably not called from pytest. 
+ return + + directory = os.path.join( + dump_dir, + f"{'NamespaceResources/Namespaces' if resource_object.namespace else 'NotNamespaceResources'}", + f"{resource_object.namespace if resource_object.namespace else ''}", + resource_object.kind, + f"{datetime.datetime.now().strftime('%H:%M:%S')}-{resource_object.name}", + ) + if os.path.exists(directory): + shutil.rmtree(directory, ignore_errors=True) + + os.makedirs(directory) + return directory + + +def _collect_instance_data(directory, resource_object): + with open(os.path.join(directory, f"{resource_object.name}.yaml"), "w") as fd: + fd.write(resource_object.instance.to_str()) + + +def _collect_pod_logs(dyn_client, resource_item, **kwargs): + kube_v1_api = kubernetes.client.CoreV1Api(api_client=dyn_client.client) + return kube_v1_api.read_namespaced_pod_log( + name=resource_item.metadata.name, + namespace=resource_item.metadata.namespace, + **kwargs, + ) + + +def _collect_virt_launcher_data(dyn_client, directory, resource_object): + if resource_object.kind == "VirtualMachineInstance": + for pod in dyn_client.resources.get(kind="Pod").get().items: + pod_name = pod.metadata.name + pod_instance = dyn_client.resources.get( + api_version=pod.apiVersion, kind=pod.kind + ).get(name=pod_name, namespace=pod.metadata.namespace) + if pod_name.startswith("virt-launcher"): + with open(os.path.join(directory, f"{pod_name}.log"), "w") as fd: + fd.write( + _collect_pod_logs( + dyn_client=dyn_client, + resource_item=pod, + container="compute", + ) + ) + + with open(os.path.join(directory, f"{pod_name}.yaml"), "w") as fd: + fd.write(pod_instance.to_str()) + + +def _collect_data_volume_data(dyn_client, directory, resource_object): + if resource_object.kind == "DataVolume": + for pod in dyn_client.resources.get(kind="DataVolume").get().items: + pod_name = pod.metadata.name + pod_instance = dyn_client.resources.get( + api_version=pod.apiVersion, kind=pod.kind + ).get(name=pod_name, namespace=pod.metadata.namespace) + if pod_name.startswith("cdi-importer"): + with open(os.path.join(directory, f"{pod_name}.log"), "w") as fd: + fd.write( + _collect_pod_logs(dyn_client=dyn_client, resource_item=pod) + ) + + with open(os.path.join(directory, f"{pod_name}.yaml"), "w") as fd: + fd.write(pod_instance.to_str()) + + +def _collect_data(resource_object, dyn_client=None): + dyn_client = ( + dyn_client + if dyn_client + else DynamicClient(kubernetes.config.new_client_from_config()) + ) + directory = _prepare_collect_data_directory(resource_object=resource_object) + _collect_instance_data(directory=directory, resource_object=resource_object) + _collect_virt_launcher_data( + dyn_client=dyn_client, directory=directory, resource_object=resource_object + ) + _collect_data_volume_data( + dyn_client=dyn_client, directory=directory, resource_object=resource_object + ) + + +def _find_supported_resource(dyn_client, api_group, kind): + results = dyn_client.resources.search(group=api_group, kind=kind) + sorted_results = sorted( + results, key=lambda result: KubeAPIVersion(result.api_version), reverse=True + ) + for result in sorted_results: + if KubeAPIVersion(result.api_version) <= KubeAPIVersion( + MAX_SUPPORTED_API_VERSION + ): + return result + + +def _get_api_version(dyn_client, api_group, kind): + res = _find_supported_resource( + dyn_client=dyn_client, api_group=api_group, kind=kind + ) + if not res: + log = f"Couldn't find {kind} in {api_group} api group" + LOGGER.warning(log) + raise NotImplementedError(log) + return res.group_version + + +def 
sub_resource_level(current_class, owner_class, parent_class): + # return the name of the last class in MRO list that is not one of base + # classes; otherwise return None + for class_iterator in reversed( + list( + class_iterator + for class_iterator in current_class.mro() + if class_iterator not in owner_class.mro() + and issubclass(class_iterator, parent_class) + ) + ): + return class_iterator.__name__ + + +class KubeAPIVersion(Version): + """ + Implement the Kubernetes API versioning scheme from + https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-versioning + """ + + component_re = re.compile(r"(\d+ | [a-z]+)", re.VERBOSE) + + def __init__(self, vstring=None): + self.vstring = vstring + self.version = None + super().__init__(vstring=vstring) + + def parse(self, vstring): + components = [x for x in self.component_re.split(vstring) if x] + for i, obj in enumerate(components): + try: + components[i] = int(obj) + except ValueError: + pass + + errmsg = f"version '{vstring}' does not conform to kubernetes api versioning guidelines" + + if ( + len(components) not in (2, 4) + or components[0] != "v" + or not isinstance(components[1], int) + ): + raise ValueError(errmsg) + if len(components) == 4 and ( + components[2] not in ("alpha", "beta") or not isinstance(components[3], int) + ): + raise ValueError(errmsg) + + self.version = components + + def __str__(self): + return self.vstring + + def __repr__(self): + return "KubeAPIVersion ('{0}')".format(str(self)) + + def _cmp(self, other): + if isinstance(other, str): + other = KubeAPIVersion(vstring=other) + + myver = self.version + otherver = other.version + + for ver in myver, otherver: + if len(ver) == 2: + ver.extend(["zeta", 9999]) + + if myver == otherver: + return 0 + if myver < otherver: + return -1 + if myver > otherver: + return 1 + + +class classproperty(object): # noqa: N801 + def __init__(self, func): + self.func = func + + def __get__(self, obj, owner): + return self.func(owner) + + +class ValueMismatch(Exception): + """ + Raises when value doesn't match the class value + """ + + pass + + +class Resource(object): + """ + Base class for API resources + """ + + api_group = None + api_version = None + singular_name = None + + class Status: + SUCCEEDED = "Succeeded" + FAILED = "Failed" + DELETING = "Deleting" + DEPLOYED = "Deployed" + + class Condition: + UPGRADEABLE = "Upgradeable" + AVAILABLE = "Available" + DEGRADED = "Degraded" + PROGRESSING = "Progressing" + RECONCILE_COMPLETE = "ReconcileComplete" + + class Status: + TRUE = "True" + FALSE = "False" + + def __init__(self, name, client=None, teardown=True): + """ + Create a API resource + + Args: + name (str): Resource name + """ + if not self.api_group and not self.api_version: + raise NotImplementedError( + "Subclasses of Resource require self.api_group or self.api_version to be defined" + ) + self.namespace = None + self.name = name + self.client = client + if not self.client: + try: + self.client = DynamicClient( + client=kubernetes.config.new_client_from_config() + ) + except ( + kubernetes.config.ConfigException, + urllib3.exceptions.MaxRetryError, + ): + LOGGER.error( + "You need to be logged into a cluster or have $KUBECONFIG env configured" + ) + raise + if not self.api_version: + self._get_api_version() + + self.teardown = teardown + + def _get_api_version(self): + res = _find_supported_resource( + dyn_client=self.client, api_group=self.api_group, kind=self.kind + ) + if not res: + LOGGER.error(f"Couldn't find {self.kind} in {self.api_group} api group") + 
raise NotImplementedError( + f"Couldn't find {self.kind} in {self.api_group} api group" + ) + self.api_version = _get_api_version( + dyn_client=self.client, api_group=self.api_group, kind=self.kind + ) + + @classproperty + def kind(cls): # noqa: N805 + return sub_resource_level(cls, NamespacedResource, Resource) + + def _base_body(self): + return { + "apiVersion": self.api_version, + "kind": self.kind, + "metadata": {"name": self.name}, + } + + def to_dict(self): + """ + Generate intended dict representation of the resource. + """ + return self._base_body() + + def __enter__(self): + data = self.to_dict() + LOGGER.info(f"Posting {data}") + self.create_from_dict( + dyn_client=self.client, data=data, namespace=self.namespace + ) + return self + + def __exit__(self, exception_type, exception_value, traceback): + if not self.teardown: + return + self.clean_up() + + def clean_up(self): + if os.environ.get("CNV_TEST_COLLECT_LOGS", "0") == "1": + try: + _collect_data(resource_object=self) + except Exception as exception_: + LOGGER.warning(exception_) + + data = self.to_dict() + LOGGER.info(f"Deleting {data}") + self.delete(wait=True) + + def api(self, **kwargs): + """ + Get resource API + + Keyword Args: + pretty + _continue + include_uninitialized + field_selector + label_selector + limit + resource_version + timeout_seconds + watch + async_req + + Returns: + Resource: Resource object. + """ + if self.singular_name: + kwargs["singular_name"] = self.singular_name + return self.client.resources.get( + api_version=self.api_version, kind=self.kind, **kwargs + ) + + def wait(self, timeout=TIMEOUT): + """ + Wait for resource + + Args: + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If the resource does not exist. + """ + LOGGER.info(f"Wait until {self.kind} {self.name} is created") + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=(ProtocolError, NotFoundError), + func=lambda: self.exists, + ) + for sample in samples: + if sample: + return + + def wait_deleted(self, timeout=TIMEOUT): + """ + Wait until resource is deleted + + Args: + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If the resource still exists. + """ + LOGGER.info(f"Wait until {self.kind} {self.name} is deleted") + return self._client_wait_deleted(timeout) + + def nudge_delete(self): + """ + Resource specific "nudge delete" action that may help the resource to + complete its cleanup. Needed by some resources. + """ + + @property + def exists(self): + """ + Whether self exists on the server + """ + try: + return self.instance + except NotFoundError: + return None + + def _client_wait_deleted(self, timeout): + """ + Client-side wait until the resource is deleted + + Args: + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If the resource still exists. + """ + samples = TimeoutSampler(timeout=timeout, sleep=1, func=lambda: self.exists) + for sample in samples: + self.nudge_delete() + if not sample: + return + + def wait_for_status(self, status, timeout=TIMEOUT, stop_status=None): + """ + Wait for resource to be in status + + Args: + status (str): Expected status. + timeout (int): Time to wait for the resource. + stop_status (str): Status that should stop the wait and fail it. + + Raises: + TimeoutExpiredError: If the resource is not in the desired status.
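+ + Example (illustrative; any resource instance works the same way): + pvc.wait_for_status(status=PersistentVolumeClaim.Status.BOUND, timeout=120)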
+ """ + stop_status = stop_status if stop_status else self.Status.FAILED + LOGGER.info(f"Wait for {self.kind} {self.name} status to be {status}") + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + current_status = None + try: + for sample in samples: + if sample.items: + sample_status = sample.items[0].status + if sample_status: + current_status = sample_status.phase + if current_status == status: + return + + if current_status == stop_status: + raise TimeoutExpiredError( + f"Status of {self.kind} {self.name} is {current_status}" + ) + + except TimeoutExpiredError: + if current_status: + LOGGER.error(f"Status of {self.kind} {self.name} is {current_status}") + raise + + @classmethod + def create_from_dict(cls, dyn_client, data, namespace=None): + """ + Create resource from given yaml file. + + Args: + dyn_client (DynamicClient): Open connection to remote cluster. + data (dict): Dict representing the resource. + namespace (str): Namespace of the resource unless specified in the supplied yaml. + """ + client = dyn_client.resources.get( + api_version=data["apiVersion"], kind=data["kind"] + ) + LOGGER.info(f"Create {data['kind']} {data['metadata']['name']}") + return client.create( + body=data, namespace=data["metadata"].get("namespace", namespace) + ) + + def create(self, body=None, wait=False): + """ + Create resource. + + Args: + body (dict): Resource data to create. + wait (bool) : True to wait for resource status. + + Returns: + bool: True if create succeeded, False otherwise. + + Raises: + ValueMismatch: When body value doesn't match class value + """ + data = self.to_dict() + if body: + kind = body["kind"] + name = body.get("name") + api_version = body["apiVersion"] + if kind != self.kind: + ValueMismatch(f"{kind} != {self.kind}") + if name and name != self.name: + ValueMismatch(f"{name} != {self.name}") + if api_version != self.api_version: + ValueMismatch(f"{api_version} != {self.api_version}") + + data.update(body) + res = self.api().create(body=data, namespace=self.namespace) + + LOGGER.info(f"Create {self.kind} {self.name}") + if wait and res: + return self.wait() + return res + + @classmethod + def delete_from_dict(cls, dyn_client, data, namespace=None, wait=False): + """ + Delete resource represented by the passed data + + Args: + dyn_client (DynamicClient): Open connection to remote cluster. + data (dict): Dict representation of resource payload. + namespace (str): Namespace of the resource unless specified in the supplied yaml. + wait (bool) : True to wait for resource till deleted. + + Returns: + True if delete succeeded, False otherwise. 
+ """ + + def _exists(name, namespace): + try: + return client.get(name=name, namespace=namespace) + except NotFoundError: + return + + def _sampler(name, namespace, force=False): + samples = TimeoutSampler( + timeout=TIMEOUT, sleep=1, func=_exists, name=name, namespace=namespace + ) + timers = NudgeTimers() + for sample in samples: + if force: + nudge_delete(name=name, timers=timers) + if not sample: + return + + kind = data["kind"] + name = data["metadata"]["name"] + namespace = data["metadata"].get("namespace", namespace) + client = dyn_client.resources.get(api_version=data["apiVersion"], kind=kind) + LOGGER.info(f"Delete {data['kind']} {name}") + res = client.delete(name=name, namespace=namespace) + if wait and res: + return _sampler(name, namespace, force=kind == "Namespace") + return res + + def delete(self, wait=False): + resource_list = self.api() + try: + res = resource_list.delete(name=self.name, namespace=self.namespace) + except NotFoundError: + return False + + LOGGER.info(f"Delete {self.kind} {self.name}") + if wait and res: + return self.wait_deleted() + return res + + @property + def status(self): + """ + Get resource status + + Status: Running, Scheduling, Pending, Unknown, CrashLoopBackOff + + Returns: + str: Status + """ + LOGGER.info(f"Get {self.kind} {self.name} status") + return self.instance.status.phase + + def update(self, resource_dict): + """ + Update resource with resource dict + + Args: + resource_dict: Resource dictionary + """ + LOGGER.info(f"Update {self.kind} {self.name}: {resource_dict}") + self.api().patch( + body=resource_dict, + namespace=self.namespace, + content_type="application/merge-patch+json", + ) + + @classmethod + def get(cls, dyn_client, singular_name=None, *args, **kwargs): + """ + Get resources + + Args: + dyn_client (DynamicClient): Open connection to remote cluster + singular_name (str): Resource kind (in lowercase), in use where we have multiple matches for resource + + Returns: + generator: Generator of Resources of cls.kind + """ + if not cls.api_version: + cls.api_version = _get_api_version( + dyn_client=dyn_client, api_group=cls.api_group, kind=cls.kind + ) + + get_kwargs = {"singular_name": singular_name} if singular_name else {} + for resource_field in ( + dyn_client.resources.get( + kind=cls.kind, api_version=cls.api_version, **get_kwargs + ) + .get(*args, **kwargs) + .items + ): + yield cls(name=resource_field.metadata.name) + + @property + def instance(self): + """ + Get resource instance + + Returns: + openshift.dynamic.client.ResourceInstance + """ + return self.api().get(name=self.name) + + @property + def labels(self): + """ + Method to get dict of labels for this resource + + Returns: + labels(dict): dict labels + """ + return self.instance["metadata"]["labels"] + + def wait_for_condition(self, condition, status, timeout=300): + """ + Wait for Pod condition to be in desire status. + + Args: + condition (str): Condition to query. + status (str): Expected condition status. + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If Pod condition in not in desire status. 
+ """ + LOGGER.info( + f"Wait for {self.kind}/{self.name}'s '{condition}' condition to be '{status}'" + ) + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + for sample in samples: + if ( + sample.items + and sample.items[0].get("status") + and sample.items[0].status.get("conditions") + ): + sample_conditions = sample.items[0].status.conditions + if sample_conditions: + for cond in sample_conditions: + if cond.type == condition and cond.status == status: + return + + +class NamespacedResource(Resource): + """ + Namespaced object, inherited from Resource. + """ + + def __init__(self, name, namespace, client=None, teardown=True): + super().__init__(name=name, client=client, teardown=teardown) + self.namespace = namespace + + @classmethod + def get(cls, dyn_client, singular_name=None, *args, **kwargs): + """ + Get resources + + Args: + dyn_client (DynamicClient): Open connection to remote cluster + singular_name (str): Resource kind (in lowercase), in use where we have multiple matches for resource + + + Returns: + generator: Generator of Resources of cls.kind + """ + if not cls.api_version: + cls.api_version = _get_api_version( + dyn_client=dyn_client, api_group=cls.api_group, kind=cls.kind + ) + + get_kwargs = {"singular_name": singular_name} if singular_name else {} + for resource_field in ( + dyn_client.resources.get( + kind=cls.kind, api_version=cls.api_version, **get_kwargs + ) + .get(*args, **kwargs) + .items + ): + yield cls( + name=resource_field.metadata.name, + namespace=resource_field.metadata.namespace, + ) + + @property + def instance(self): + """ + Get resource instance + + Returns: + openshift.dynamic.client.ResourceInstance + """ + return self.api().get(name=self.name, namespace=self.namespace) + + +class ResourceEditor(object): + def __init__(self, patches): + """ + Args: + patches (dict): {: } + e.g. {: + {'metadata': {'labels': {'label1': 'true'}}} + + Allows for temporary edits to cluster resources for tests. During + __enter__ user-specified patches (see args) are applied and old values + are backed up, and during __exit__ these backups are used to reverse + all changes made. + + Flow: + 1) apply patches + 2) automation runs + 3) edits made to resources are reversed + + May also be used without being treated as a context manager by + calling the methods update() and restore() after instantiation. 
+ + *** the DynamicClient object used to get the resources must not be + using an unprivileged_user; use default_client or similar instead.*** + """ + + self._patches = patches + self._backups = {} + + @property + def backups(self): + """Returns a dict {<resource object>: <backup dict>} + The backup dict kept for each resource edited """ + return self._backups + + @property + def patches(self): + """Returns the patches dict provided in the constructor""" + return self._patches + + def update(self): + """Prepares backup dicts (where necessary) and applies patches""" + # prepare update dicts and backups + LOGGER.info("ResourceEdit: Backing up old data") + + resource_to_patch = [] + + for resource, update in self._patches.items(): + # prepare backup + backup = self._create_backup( + original=resource.instance.to_dict(), patch=update + ) + + # no need to back up if no changes have been made + if backup: + resource_to_patch.append(resource) + self._backups[resource] = backup + else: + LOGGER.info( + f"ResourceEdit: no diff found in patch for " + f"{resource.name} -- skipping" + ) + + patches_to_apply = { + resource: self._patches[resource] for resource in resource_to_patch + } + + # apply changes + self._apply_patches(patches=patches_to_apply, action_text="Updating") + + def restore(self): + self._apply_patches(patches=self._backups, action_text="Restoring") + + def __enter__(self): + self.update() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # restore backups + self.restore() + + @staticmethod + def _create_backup(original, patch): + """ + Args: + original (dict*): source of values to back up if necessary + patch (dict*): 'new' values; keys needn't necessarily all be + contained in original + + Returns a dict containing the fields in original that differ + from patch. + + Places None for fields in patch that don't appear in + original (because that's how the API knows to remove those fields from + the yaml).
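+ + Example (illustrative): + original={"a": 1, "b": 2}, patch={"b": 3, "c": 4} + returns {"b": 2, "c": None}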
+ + * the first call will be with both of these arguments as dicts but + this will not necessarily be the case during recursion""" + + # when both are dicts, get the diff (recursively if need be) + if isinstance(original, dict) and isinstance(patch, dict): + diff_dict = {} + for key, value in patch.items(): + if key not in original: + diff_dict[key] = None + continue + + # recursive call + key_diff = ResourceEditor._create_backup( + original=original[key], patch=value + ) + + if key_diff: + diff_dict[key] = key_diff + + return diff_dict + + # for one or more non-dict values, just compare them + if patch != original: + return original + else: + # this return value will be received by key_diff above + return None + + @staticmethod + def _apply_patches(patches, action_text): + """ + Updates provided Resource objects with provided yaml patches + + Args: + patches (dict): {<resource object>: <patch dict>} + action_text (str): action verb for logging; + "ResourceEdits: <action_text> data for resource <resource>" + will be printed for each resource; see below + """ + for resource, patch in patches.items(): + LOGGER.info( + f"ResourceEdits: {action_text} data for " + f"resource {resource.kind} {resource.name}" + ) + + # add name to patch + if "metadata" not in patch: + patch["metadata"] = {} + + # the api requires this field to be present in a yaml patch for + # some resource kinds even if it is not changed + if "name" not in patch["metadata"]: + patch["metadata"]["name"] = resource.name + + resource.update(patch) # update the resource diff --git a/resources/role.py b/resources/role.py new file mode 100644 index 0000000000..4203d385f1 --- /dev/null +++ b/resources/role.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .resource import NamespacedResource + + +class Role(NamespacedResource): + """ + Role object. + """ + + api_group = "rbac.authorization.k8s.io" diff --git a/resources/role_binding.py b/resources/role_binding.py new file mode 100644 index 0000000000..000a6fab84 --- /dev/null +++ b/resources/role_binding.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +from .resource import NamespacedResource + + +API_GROUP = "rbac.authorization.k8s.io" + + +class RoleBinding(NamespacedResource): + """ + RoleBinding object + """ + + api_group = API_GROUP + + def __init__( + self, + name, + namespace, + subjects_kind=None, + subjects_name=None, + subjects_namespace=None, + subjects_api_group=None, + role_ref_kind=None, + role_ref_name=None, + teardown=True, + ): + + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.subjects_kind = subjects_kind + self.subjects_name = subjects_name + self.subjects_namespace = subjects_namespace + self.subjects_api_group = subjects_api_group + self.role_ref_kind = role_ref_kind + self.role_ref_name = role_ref_name + + def to_dict(self): + res = super().to_dict() + + subjects = {} + if self.subjects_kind: + subjects["kind"] = self.subjects_kind + if self.subjects_name: + subjects["name"] = self.subjects_name + if self.subjects_namespace: + subjects["namespace"] = self.subjects_namespace + if self.subjects_api_group: + subjects["apiGroup"] = self.subjects_api_group + if subjects: + res["subjects"] = [subjects] + + roleref = {} + if self.role_ref_kind: + roleref["kind"] = self.role_ref_kind + if self.role_ref_name: + roleref["name"] = self.role_ref_name + if roleref: + roleref["apiGroup"] = self.api_group + res["roleRef"] = roleref + return res diff --git a/resources/route.py b/resources/route.py new file mode 100644 index 0000000000..2afdd59390 --- /dev/null +++ b/resources/route.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*-
+import logging + +from .resource import NamespacedResource + + +LOGGER = logging.getLogger(__name__) + + +class Route(NamespacedResource): + """ + OpenShift Route object. + """ + + api_group = "route.openshift.io" + + def __init__( + self, name, namespace, service=None, destination_ca_cert=None, teardown=True + ): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.service = service + self.destination_ca_cert = destination_ca_cert + + def to_dict(self): + body = super()._base_body() + if self.service: + body.update({"spec": {"to": {"kind": "Service", "name": self.service}}}) + if self.destination_ca_cert: + body["spec"]["tls"] = { + "destinationCACertificate": self.destination_ca_cert, + "termination": "reencrypt", + } + return body + + @property + def exposed_service(self): + """ + returns the service the route is exposing + """ + return self.instance.spec.to.name + + @property + def host(self): + """ + returns hostname that is exposing the service + """ + return self.instance.spec.host + + @property + def ca_cert(self): + """ + returns destinationCACertificate + """ + return self.instance.spec.tls.destinationCACertificate + + @property + def termination(self): + """ + returns a secured route using re-encrypt termination + """ + return self.instance.spec.tls.termination diff --git a/resources/secret.py b/resources/secret.py new file mode 100644 index 0000000000..a1081a7d0d --- /dev/null +++ b/resources/secret.py @@ -0,0 +1,54 @@ +from .resource import NamespacedResource + + +class Secret(NamespacedResource): + """ + Secret object. + """ + + api_version = "v1" + + def __init__( + self, + name, + namespace, + accesskeyid=None, + secretkey=None, + htpasswd=None, + teardown=True, + data_dict=None, + string_data=None, + ): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.accesskeyid = accesskeyid + self.secretkey = secretkey + self.htpasswd = htpasswd + self.data_dict = data_dict + self.string_data = string_data + + def to_dict(self): + res = super()._base_body() + if self.accesskeyid: + res.update( + {"data": {"accessKeyId": self.accesskeyid, "secretKey": self.secretkey}} + ) + if self.htpasswd: + res.update({"data": {"htpasswd": self.htpasswd}}) + if self.data_dict: + res.update({"data": self.data_dict}) + if self.string_data: + res.update({"stringData": self.string_data}) + + return res + + @property + def certificate_not_after(self): + return self.instance.metadata.annotations[ + "auth.openshift.io/certificate-not-after" + ] + + @property + def certificate_not_before(self): + return self.instance.metadata.annotations[ + "auth.openshift.io/certificate-not-before" + ] diff --git a/resources/security_context_constraints.py b/resources/security_context_constraints.py new file mode 100644 index 0000000000..7b8cc5a23c --- /dev/null +++ b/resources/security_context_constraints.py @@ -0,0 +1,9 @@ +from .resource import Resource + + +class SecurityContextConstraints(Resource): + """ + Security Context Constraints object. + """ + + api_group = "security.openshift.io" diff --git a/resources/service.py b/resources/service.py new file mode 100644 index 0000000000..c561f0cf8f --- /dev/null +++ b/resources/service.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .resource import NamespacedResource + + +class Service(NamespacedResource): + """ + OpenShift Service object. 
+ """ + + api_version = "v1" diff --git a/resources/service_account.py b/resources/service_account.py new file mode 100644 index 0000000000..8af5df94c0 --- /dev/null +++ b/resources/service_account.py @@ -0,0 +1,9 @@ +from .resource import NamespacedResource + + +class ServiceAccount(NamespacedResource): + """ + Service Account object + """ + + api_version = "v1" diff --git a/resources/sriov_network.py b/resources/sriov_network.py new file mode 100644 index 0000000000..87044f6227 --- /dev/null +++ b/resources/sriov_network.py @@ -0,0 +1,37 @@ +from .resource import NamespacedResource + + +class SriovNetwork(NamespacedResource): + """ + SriovNetwork object. + """ + + api_group = "sriovnetwork.openshift.io" + + def __init__( + self, + name, + policy_namespace, + network_namespace, + resource_name=None, + vlan=None, + ipam=None, + teardown=True, + ): + self.policy_namespace = policy_namespace + super().__init__(name=name, namespace=policy_namespace, teardown=teardown) + self.network_namespace = network_namespace + self.resource_name = resource_name + self.vlan = vlan + self.ipam = ipam + + def to_dict(self): + res = super().to_dict() + res["spec"] = { + "ipam": self.ipam or "{}\n", + "networkNamespace": self.network_namespace, + "resourceName": self.resource_name, + } + if self.vlan: + res["spec"]["vlan"] = self.vlan + return res diff --git a/resources/sriov_network_node_policy.py b/resources/sriov_network_node_policy.py new file mode 100644 index 0000000000..d5f000c199 --- /dev/null +++ b/resources/sriov_network_node_policy.py @@ -0,0 +1,55 @@ +from .resource import NamespacedResource + + +class SriovNetworkNodePolicy(NamespacedResource): + """ + SriovNetworkNodePolicy object. + """ + + api_group = "sriovnetwork.openshift.io" + + def __init__( + self, + name, + policy_namespace, + pf_names, + root_devices, + num_vfs, + resource_name, + priority=None, + mtu=None, + node_selector=None, + teardown=True, + ): + self.policy_namespace = policy_namespace + super().__init__(name=name, namespace=policy_namespace, teardown=teardown) + self.pf_names = pf_names + self.root_devices = root_devices + self.num_vfs = num_vfs + self.priority = priority + self.resource_name = resource_name + self.mtu = mtu + self.node_selector = node_selector + + def to_dict(self): + res = super().to_dict() + res["spec"] = { + "deviceType": "vfio-pci", + "nicSelector": { + "pfNames": [self.pf_names], + "rootDevices": [self.root_devices], + }, + "numVfs": self.num_vfs, + "resourceName": self.resource_name, + } + if self.mtu: + res["spec"]["mtu"] = self.mtu + if self.priority: + res["spec"]["priority"] = self.priority + if self.node_selector: + res["spec"]["nodeSelector"] = self.node_selector + else: + res["spec"]["nodeSelector"] = { + "feature.node.kubernetes.io/network-sriov.capable": "true" + } + return res diff --git a/resources/sriov_network_node_state.py b/resources/sriov_network_node_state.py new file mode 100644 index 0000000000..2a7702d3b0 --- /dev/null +++ b/resources/sriov_network_node_state.py @@ -0,0 +1,28 @@ +from .resource import NamespacedResource + + +class SriovNetworkNodeState(NamespacedResource): + """ + SriovNetworkNodeState object. 
+ """ + + api_group = "sriovnetwork.openshift.io" + + def __init__(self, name, policy_namespace): + super().__init__(name=name, namespace=policy_namespace) + + @property + def interfaces(self): + return self.instance.status.interfaces + + @staticmethod + def iface_name(iface): + return iface.name + + @staticmethod + def pciaddress(iface): + return iface.pciAddress + + @staticmethod + def totalvfs(iface): + return iface.totalvfs diff --git a/resources/storage_class.py b/resources/storage_class.py new file mode 100644 index 0000000000..e439521063 --- /dev/null +++ b/resources/storage_class.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +from .resource import Resource + + +class StorageClass(Resource): + """ + StorageClass object. + """ + + api_group = "storage.k8s.io" + + class Types: + """ + These are names of StorageClass instances when you run `oc get sc` + """ + + LOCAL_BLOCK = "local-block" + HOSTPATH = "hostpath-provisioner" + CEPH_RBD = "ocs-storagecluster-ceph-rbd" + + class Provisioner: + HOSTPATH = "kubevirt.io/hostpath-provisioner" + LOCAL_BLOCK = "kubernetes.io/no-provisioner" + CEPH_RBD = "openshift-storage.rbd.csi.ceph.com" + + class VolumeBindingMode: + """ + VolumeBindingMode indicates how PersistentVolumeClaims should be provisioned and bound. + When unset, Immediate is used. + When "Immediate", if you want to use the "node aware" hostpath-provisioner, + ProvisionOnNode annotations should be introduced to PVC. + Or in order to be able to use the hpp without specifying the node on the PVC, + since CNV-2.2, hpp supports for "WaitForFirstConsumer". + """ + + Immediate = "Immediate" + WaitForFirstConsumer = "WaitForFirstConsumer" diff --git a/resources/subscription.py b/resources/subscription.py new file mode 100644 index 0000000000..645f8ec79e --- /dev/null +++ b/resources/subscription.py @@ -0,0 +1,5 @@ +from .resource import NamespacedResource + + +class Subscription(NamespacedResource): + api_group = "operators.coreos.com" diff --git a/resources/template.py b/resources/template.py new file mode 100644 index 0000000000..e67472e1a4 --- /dev/null +++ b/resources/template.py @@ -0,0 +1,61 @@ +import json + +from .resource import NamespacedResource + + +class Template(NamespacedResource): + api_group = "template.openshift.io" + singular_name = "template" + + class Labels: + FLAVOR = "flavor.template.kubevirt.io" + OS = "os.template.kubevirt.io" + WORKLOAD = "workload.template.kubevirt.io" + + class Workload: + DESKTOP = "desktop" + HIGH_PERFORMANCE = "highperformance" + SERVER = "server" + + class Flavor: + LARGE = "large" + MEDIUM = "medium" + SMALL = "small" + TINY = "tiny" + + def process(self, **kwargs): + instance_dict = self.instance.to_dict() + params = instance_dict["parameters"] + # filling the template parameters with given kwargs + for param in params: + try: + param["value"] = kwargs[param["name"]] + except KeyError: + continue + instance_dict["parameters"] = params + # TODO: remove after fix - https://issues.redhat.com/browse/KNIP-1055 (bug 1753554) + # For template validator to be used - template namespace needs to be updated + instance_dict["objects"][0]["metadata"]["labels"][ + "vm.kubevirt.io/template.namespace" + ] = instance_dict["metadata"]["namespace"] + # TODO: remove after fix - https://bugzilla.redhat.com/show_bug.cgi?id=1816518 - template + # name needs to be updated + instance_dict["objects"][0]["metadata"]["labels"][ + "vm.kubevirt.io/template" + ] = instance_dict["metadata"]["name"] + r = json.dumps(instance_dict) + body = json.loads(r) + response = 
self.client.request( + method="Post", + path="/apis/template.openshift.io/v1/namespaces/openshift/processedtemplates", + body=body, + ) + return response.to_dict()["objects"] + + @staticmethod + def generate_template_labels(os, workload, flavor): + return [ + f"{Template.Labels.OS}/{os}", + f"{Template.Labels.WORKLOAD}/{getattr(Template.Workload, workload.upper())}", + f"{Template.Labels.FLAVOR}/{getattr(Template.Flavor, flavor.upper())}", + ] diff --git a/resources/upload_token_request.py b/resources/upload_token_request.py new file mode 100644 index 0000000000..2a46a1ea72 --- /dev/null +++ b/resources/upload_token_request.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +import logging + +from .resource import NamespacedResource + + +LOGGER = logging.getLogger(__name__) + + +class UploadTokenRequest(NamespacedResource): + """ + OpenShift UploadTokenRequest object. + """ + + api_group = "upload.cdi.kubevirt.io" + + def __init__(self, name, namespace, pvc_name=None, teardown=True): + super().__init__(name=name, namespace=namespace, teardown=teardown) + self.pvc_name = pvc_name + + def to_dict(self): + res = super()._base_body() + res.update({"spec": {"pvcName": self.pvc_name}}) + return res diff --git a/resources/utils.py b/resources/utils.py new file mode 100644 index 0000000000..4675eb9a31 --- /dev/null +++ b/resources/utils.py @@ -0,0 +1,119 @@ +import logging +import subprocess +import time + + +_DELETE_NUDGE_DELAY = 30 +_DELETE_NUDGE_INTERVAL = 5 +LOGGER = logging.getLogger(__name__) + + +class TimeoutExpiredError(Exception): + def __init__(self, value): + super().__init__() + self.value = value + + def __str__(self): + return f"Timed Out: {self.value}" + + +class TimeoutSampler(object): + """ + Samples the function output. + + This is a generator object that at first yields the output of function + `func`. After the yield, it either raises instance of `TimeoutExpiredError` or + sleeps `sleep` seconds. + + Yielding the output allows you to handle every value as you wish. + + Feel free to set the instance variables. + """ + + def __init__( + self, timeout, sleep, func, exceptions=None, *func_args, **func_kwargs + ): + self.timeout = timeout + self.sleep = sleep + self.func = func + self.func_args = func_args + self.func_kwargs = func_kwargs + self.start_time = None + self.last_sample_time = None + self.exception = exceptions if exceptions else Exception + + def __iter__(self): + caught_exception = None + if self.start_time is None: + self.start_time = time.time() + while True: + self.last_sample_time = time.time() + try: + yield self.func(*self.func_args, **self.func_kwargs) + except self.exception as e: + caught_exception = e + pass + + if self.timeout < (time.time() - self.start_time): + raise TimeoutExpiredError( + f"{self.timeout} {caught_exception}" + if caught_exception + else self.timeout + ) + time.sleep(self.sleep) + + +class TimeoutWatch: + """ + A time counter allowing to determine the time remaining since the start + of a given interval + """ + + def __init__(self, timeout): + self.timeout = timeout + self.start_time = time.time() + + def remaining_time(self): + """ + Return the remaining part of timeout since the object was created. + """ + new_timeout = self.start_time + self.timeout - time.time() + if new_timeout > 0: + return new_timeout + raise TimeoutExpiredError(self.timeout) + + +class NudgeTimers: + """ + A holder for two values needed to time deletion of an object properly. 
+ """ + + def __init__(self): + self.nudge_start_time = time.time() + self.last_nudge = 0 + + +# TODO: remove the nudge when the underlying issue with namespaces stuck in +# Terminating state is fixed. +# Upstream bug: https://github.com/kubernetes/kubernetes/issues/60807 +def nudge_delete(name, timers): + # remember the time of the first delete attempt + if not timers.nudge_start_time: + timers.nudge_start_time = time.time() + # delay active nudging in hope regular delete procedure will succeed + current_time = time.time() + if current_time - _DELETE_NUDGE_DELAY < timers.nudge_start_time: + return + # don't nudge more often than once in 5 seconds + if timers.last_nudge + _DELETE_NUDGE_INTERVAL > current_time: + return + LOGGER.info(f"Nudging namespace {name} while waiting for it to delete") + try: + # kube client is deficient so we have to use curl to kill stuck + # finalizers + subprocess.check_output(["./scripts/clean-namespace.sh", name]) + timers.last_nudge = time.time() + except subprocess.CalledProcessError as e: + # deliberately ignore all errors since an intermittent nudge + # failure is not the end of the world + LOGGER.info(f"Error happened while nudging namespace {name}: {e}") diff --git a/resources/validating_webhook_config.py b/resources/validating_webhook_config.py new file mode 100644 index 0000000000..4b9a4fd49f --- /dev/null +++ b/resources/validating_webhook_config.py @@ -0,0 +1,9 @@ +from .resource import Resource + + +class ValidatingWebhookConfiguration(Resource): + """ + ValidatingWebhookConfiguration object. + """ + + api_group = "admissionregistration.k8s.io" diff --git a/resources/virtual_machine.py b/resources/virtual_machine.py new file mode 100644 index 0000000000..4d4efd623b --- /dev/null +++ b/resources/virtual_machine.py @@ -0,0 +1,424 @@ +# -*- coding: utf-8 -*- + +import json +import logging + +import xmltodict +from openshift.dynamic.exceptions import ResourceNotFoundError +from resources.utils import TimeoutExpiredError, TimeoutSampler +from urllib3.exceptions import ProtocolError + +from .node import Node +from .pod import Pod +from .resource import TIMEOUT, NamespacedResource + + +LOGGER = logging.getLogger(__name__) +API_GROUP = "kubevirt.io" + + +class AnsibleLoginAnnotationsMixin(object): + """A mixin class that enhances the object.metadata.annotations + with login credentials stored in Ansible variables. + + This allows seamless console connection in tests as both + Console and Ansible inventory/connection plugins know how + to extract this information. + """ + + def _store_login_information(self, username, password): + self._username = username + self._password = password + + def _add_login_annotation(self, vmi): + """Enhance VMI object with the proper metadata. Call this method + from to_dict with vmi set to the dict that represents VMI. + The provided vmi is modified in place! + + This method does nothing when no credentials were provided. 
+ """ + + login_annotation = {} + + if self._username: + login_annotation["ansible_user"] = self._username + + if self._password: + login_annotation["ansible_ssh_pass"] = self._password + + if login_annotation: + # cloud images defaults + login_annotation["ansible_become"] = True + login_annotation["ansible_become_method"] = "sudo" + + vmi.setdefault("metadata", {}) + vmi["metadata"].setdefault("annotations", {}) + vmi["metadata"]["annotations"]["ansible"] = json.dumps(login_annotation) + + +class VirtualMachine(NamespacedResource, AnsibleLoginAnnotationsMixin): + """ + Virtual Machine object, inherited from Resource. + Implements actions start / stop / status / wait for VM status / is running + """ + + api_group = API_GROUP + + def __init__( + self, name, namespace, client=None, username=None, password=None, teardown=True + ): + super().__init__( + name=name, namespace=namespace, client=client, teardown=teardown + ) + self._store_login_information(username=username, password=password) + + @property + def _subresource_api_url(self): + return ( + f"{self.client.configuration.host}/" + f"apis/subresources.kubevirt.io/{self.api().api_version}/" + f"namespaces/{self.namespace}/virtualmachines/{self.name}" + ) + + def to_dict(self): + res = super().to_dict() + res["spec"] = {"template": {"spec": {}}} + self._add_login_annotation(vmi=res["spec"]["template"]) + return res + + def start(self, timeout=TIMEOUT, wait=False): + self.client.client.request( + "PUT", + f"{self._subresource_api_url}/start", + headers=self.client.configuration.api_key, + ) + if wait: + return self.wait_for_status(timeout=timeout, status=True) + + def restart(self, timeout=TIMEOUT, wait=False): + self.client.client.request( + "PUT", + f"{self._subresource_api_url}/restart", + headers=self.client.configuration.api_key, + ) + if wait: + self.vmi.wait_for_status(status="Failed", stop_status="dummy") + # stop_status="dummy" used to ignore FAILED status of vmi during restart + return self.vmi.wait_until_running(timeout=timeout, stop_status="dummy") + + def stop(self, timeout=TIMEOUT, wait=False): + self.client.client.request( + "PUT", + f"{self._subresource_api_url}/stop", + headers=self.client.configuration.api_key, + ) + if wait: + self.wait_for_status(timeout=timeout, status=False) + return self.vmi.wait_deleted() + + def wait_for_status(self, status, timeout=TIMEOUT): + """ + Wait for resource to be in status + + Args: + status (bool): Expected status. + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If timeout reached. 
+ """ + LOGGER.info(f"Wait for {self.kind} {self.name} status to be {status}") + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + for sample in samples: + if sample.items: + if sample.items[0].spec.running == status: + return + + def get_interfaces(self): + return self.instance.spec.template.spec.domain.devices.interfaces + + @property + def vmi(self): + """ + Get VMI + + Returns: + VirtualMachineInstance: VMI + """ + return VirtualMachineInstance( + name=self.name, + namespace=self.namespace, + username=self._username, + password=self._password, + ) + + def ready(self): + """ + Get VM status + + Returns: + bool: True if Running else False + """ + LOGGER.info(f"Check if {self.kind} {self.name} is ready") + return self.instance.status["ready"] + + +class VirtualMachineInstance(NamespacedResource, AnsibleLoginAnnotationsMixin): + """ + Virtual Machine Instance object, inherited from Resource. + """ + + api_group = API_GROUP + + class Status(NamespacedResource.Status): + RUNNING = "Running" + SCHEDULING = "Scheduling" + + def __init__(self, name, namespace, client=None, username=None, password=None): + super().__init__(name=name, namespace=namespace, client=client) + self._store_login_information(username=username, password=password) + + @property + def _subresource_api_url(self): + return ( + f"{self.client.configuration.host}/" + f"apis/subresources.kubevirt.io/{self.api().api_version}/" + f"namespaces/{self.namespace}/virtualmachineinstances/{self.name}" + ) + + def to_dict(self): + res = super().to_dict() + self._add_login_annotation(vmi=res) + return res + + def pause(self, timeout=TIMEOUT, wait=False): + self.client.client.request( + "PUT", + f"{self._subresource_api_url}/pause", + headers=self.client.configuration.api_key, + ) + if wait: + return self.wait_for_pause_status(pause=True, timeout=timeout) + + def unpause(self, timeout=TIMEOUT, wait=False): + self.client.client.request( + "PUT", + f"{self._subresource_api_url}/unpause", + headers=self.client.configuration.api_key, + ) + if wait: + return self.wait_for_pause_status(pause=False, timeout=timeout) + + @property + def interfaces(self): + return self.instance.status.interfaces + + @property + def virt_launcher_pod(self): + pods = list( + Pod.get( + dyn_client=self.client, + namespace=self.namespace, + label_selector=f"kubevirt.io=virt-launcher,kubevirt.io/created-by={self.instance.metadata.uid}", + ) + ) + migration_state = self.instance.status.migrationState + if migration_state: + # After VM migration there are two pods, one in Completed status and one in Running status. + # We need to return the Pod that is not in Completed status. + for pod in pods: + if migration_state.targetPod == pod.name: + return pod + else: + return pods[0] + + raise ResourceNotFoundError + + def wait_until_running(self, timeout=TIMEOUT, logs=True, stop_status=None): + """ + Wait until VMI is running + + Args: + timeout (int): Time to wait for VMI. + logs (bool): True to extract logs from the VMI pod and from the VMI. + stop_status (str): Status which should stop the wait and failed. + + Raises: + TimeoutExpiredError: If VMI failed to run. 
+ """ + try: + self.wait_for_status( + status=self.Status.RUNNING, timeout=timeout, stop_status=stop_status + ) + except TimeoutExpiredError: + if not logs: + raise + + virt_pod = self.virt_launcher_pod + if virt_pod: + LOGGER.debug(f"{virt_pod.name} *****LOGS*****") + LOGGER.debug(virt_pod.log(container="compute")) + + raise + + def wait_for_pause_status(self, pause, timeout=TIMEOUT): + """ + Wait for Virtual Machine Instance to be paused / unpaused. + Paused status is checked in libvirt and in the VMI conditions. + + Args: + pause (bool): True for paused, False for unpause + timeout (int): Time to wait for the resource. + + Raises: + TimeoutExpiredError: If resource not exists. + """ + LOGGER.info( + f"Wait until {self.kind} {self.name} is " + f"{'Paused' if pause else 'Unpuased'}" + ) + self.wait_for_domstate_pause_status(pause=pause, timeout=timeout) + self.wait_for_vmi_condition_pause_status(pause=pause, timeout=timeout) + + def wait_for_domstate_pause_status(self, pause, timeout=TIMEOUT): + pause_status = "paused" if pause else "running" + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=(ProtocolError), + func=self.get_domstate, + ) + for sample in samples: + if pause_status in sample: + return + + def wait_for_vmi_condition_pause_status(self, pause, timeout=TIMEOUT): + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=(ProtocolError), + func=self.get_vmi_active_condition, + ) + for sample in samples: + # Paused VM + if pause and sample["reason"] == "PausedByUser": + return + # Unpaused VM + if not (pause and sample.get("reason")): + return + + @property + def node(self): + """ + Get the node name where the VM is running + + Returns: + Node: Node + """ + return Node(name=self.instance.status.nodeName) + + def get_xml(self): + """ + Get virtual machine instance XML + + Returns: + xml_output(string): VMI XML in the multi-line string + """ + return self.virt_launcher_pod.execute( + command=["virsh", "dumpxml", f"{self.namespace}_{self.name}"], + container="compute", + ) + + def get_domstate(self): + """ + Get virtual machine instance Status. + + Current workaround, as VM/VMI shows no status/phase == Paused yet. + Bug: https://bugzilla.redhat.com/show_bug.cgi?id=1805178 + + Returns: + String: VMI Status as string + """ + return self.virt_launcher_pod.execute( + command=["virsh", "domstate", f"{self.namespace}_{self.name}"], + container="compute", + ) + + def get_vmi_active_condition(self): + """ A VMI may have multiple conditions; the active one it the one with + 'lastTransitionTime' """ + return { + k: v + for condition in self.instance.status.conditions + for k, v in condition.items() + if condition["lastTransitionTime"] + } + + @property + def xml_dict(self): + """ Get virtual machine instance XML as dict """ + + return xmltodict.parse(self.get_xml(), process_namespaces=True) + + @property + def guest_os_info(self): + return self.instance.status.guestOSInfo + + @property + def os_version(self): + vmi_os_version = self.guest_os_info.get("version", {}) + if not vmi_os_version: + LOGGER.warning( + "Guest agent is not installed on the VM; OS version is not available." 
+ ) + return vmi_os_version + + def interface_ip(self, interface): + iface_ip = [ + iface["ipAddress"] + for iface in self.interfaces + if iface["interfaceName"] == interface + ] + return iface_ip[0] if iface_ip else None + + +class VirtualMachineInstanceMigration(NamespacedResource): + api_group = API_GROUP + + def __init__(self, name, namespace, vmi=None, client=None, teardown=True): + super().__init__( + name=name, namespace=namespace, client=client, teardown=teardown + ) + self._vmi = vmi + + def to_dict(self): + # When creating VirtualMachineInstanceMigration vmi is mandatory but when calling get() + # we cannot pass vmi. + assert self._vmi, "vmi is mandatory for create" + + res = super().to_dict() + res["spec"] = {"vmiName": self._vmi.name} + return res + + +class VirtualMachineInstancePreset(NamespacedResource): + """ + VirtualMachineInstancePreset object. + """ + + api_group = API_GROUP + + +class VirtualMachineInstanceReplicaSet(NamespacedResource): + """ + VirtualMachineInstanceReplicaSet object. + """ + + api_group = API_GROUP diff --git a/resources/virtual_machine_import.py b/resources/virtual_machine_import.py new file mode 100644 index 0000000000..a6ef509ce6 --- /dev/null +++ b/resources/virtual_machine_import.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- + + +import logging + +from urllib3.exceptions import ProtocolError + +from .resource import NamespacedResource +from .utils import TimeoutExpiredError, TimeoutSampler +from .virtual_machine import VirtualMachine + + +LOGGER = logging.getLogger(__name__) + + +def _map_mappings(mappings): + mappings_list = [] + for mapping in mappings: + mapping_dict = {"target": {"name": mapping.target_name}} + if mapping.target_namespace: + mapping_dict["target"]["namespace"] = mapping.target_namespace + if mapping.target_type: + mapping_dict["type"] = mapping.target_type + if mapping.source_id: + mapping_dict.setdefault("source", {})["id"] = mapping.source_id + if mapping.source_name: + mapping_dict.setdefault("source", {})["name"] = mapping.source_name + mappings_list.append(mapping_dict) + return mappings_list + + +class VirtualMachineImport(NamespacedResource): + """ + Virtual Machine Import object, inherited from NamespacedResource.
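+ + Example (a minimal sketch; all names are hypothetical): + with VirtualMachineImport( + name="my-import", + namespace="default", + provider_credentials_secret_name="ovirt-credentials", + vm_name="source-vm", + target_vm_name="imported-vm", + ) as vm_import: + vm_import.wait()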
+ """ + + api_group = "v2v.kubevirt.io" + + class Condition(NamespacedResource.Condition): + SUCCEEDED = "Succeeded" + VALID = "Valid" + MAPPING_RULES_VERIFIED = "MappingRulesVerified" + PROCESSING = "Processing" + + class ValidConditionReason: + """ + Valid condition reason object + """ + + VALIDATION_COMPLETED = "ValidationCompleted" + SECRET_NOT_FOUND = "SecretNotFound" + RESOURCE_MAPPING_NOT_FOUND = "ResourceMappingNotFound" + UNINITIALIZED_PROVIDER = "UninitializedProvider" + SOURCE_VM_NOT_FOUND = "SourceVMNotFound" + INCOMPLETE_MAPPING_RULES = "IncompleteMappingRules" + + class MappingRulesConditionReason: + """ + Mapping rules verified condition reason object + """ + + MAPPING_COMPLETED = "MappingRulesVerificationCompleted" + MAPPING_FAILED = "MappingRulesVerificationFailed" + MAPPING_REPORTED_WARNINGS = "MappingRulesVerificationReportedWarnings" + + class ProcessingConditionReason: + """ + Processing condition reason object + """ + + CREATING_TARGET_VM = "CreatingTargetVM" + COPYING_DISKS = "CopyingDisks" + COMPLETED = "ProcessingCompleted" + FAILED = "ProcessingFailed" + + class SucceededConditionReason: + """ + Succeeced cond reason object + """ + + VALIDATION_FAILED = "ValidationFailed" + VM_CREATION_FAILED = "VMCreationFailed" + DATAVOLUME_CREATION_FAILED = "DataVolumeCreationFailed" + VIRTUAL_MACHINE_READY = "VirtualMachineReady" + VIRTUAL_MACHINE_RUNNING = "VirtualMachineRunning" + VMTEMPLATE_MATCHING_FAILED = "VMTemplateMatchingFailed" + + def __init__( + self, + name, + namespace, + provider_credentials_secret_name, + provider_credentials_secret_namespace=None, + client=None, + teardown=True, + vm_id=None, + vm_name=None, + cluster_id=None, + cluster_name=None, + target_vm_name=None, + start_vm=False, + ovirt_mappings=None, + resource_mapping_name=None, + resource_mapping_namespace=None, + ): + super().__init__( + name=name, namespace=namespace, client=client, teardown=teardown + ) + self.vm_id = vm_id + self.vm_name = vm_name + self.cluster_id = cluster_id + self.cluster_name = cluster_name + self.target_vm_name = target_vm_name + self.start_vm = start_vm + self.provider_credentials_secret_name = provider_credentials_secret_name + self.provider_credentials_secret_namespace = ( + provider_credentials_secret_namespace + ) + self.ovirt_mappings = ovirt_mappings + self.resource_mapping_name = resource_mapping_name + self.resource_mapping_namespace = resource_mapping_namespace + + @property + def vm(self): + return VirtualMachine(name=self.target_vm_name, namespace=self.namespace) + + def to_dict(self): + res = super().to_dict() + spec = res.setdefault("spec", {}) + + secret = spec.setdefault("providerCredentialsSecret", {}) + secret["name"] = self.provider_credentials_secret_name + + if self.provider_credentials_secret_namespace: + secret["namespace"] = self.provider_credentials_secret_namespace + + if self.resource_mapping_name: + spec.setdefault("resourceMapping", {})["name"] = self.resource_mapping_name + if self.resource_mapping_namespace: + spec.setdefault("resourceMapping", {})[ + "namespace" + ] = self.resource_mapping_namespace + + if self.target_vm_name: + spec["targetVmName"] = self.target_vm_name + + if self.start_vm is not None: + spec["startVm"] = self.start_vm + + ovirt = spec.setdefault("source", {}).setdefault("ovirt", {}) + + vm = ovirt.setdefault("vm", {}) + if self.vm_id: + vm["id"] = self.vm_id + if self.vm_name: + vm["name"] = self.vm_name + + if self.cluster_id: + vm.setdefault("cluster", {})["id"] = self.cluster_id + if self.cluster_name: + 
vm.setdefault("cluster", {})["name"] = self.cluster_name + + if self.ovirt_mappings: + if self.ovirt_mappings.disk_mappings: + mappings = _map_mappings(mappings=self.ovirt_mappings.disk_mappings) + ovirt.setdefault("mappings", {}).setdefault("diskMappings", mappings) + + if self.ovirt_mappings.network_mappings: + mappings = _map_mappings(mappings=self.ovirt_mappings.network_mappings) + ovirt.setdefault("mappings", {}).setdefault("networkMappings", mappings) + + if self.ovirt_mappings.storage_mappings: + mappings = _map_mappings(mappings=self.ovirt_mappings.storage_mappings) + ovirt.setdefault("mappings", {}).setdefault("storageMappings", mappings) + + return res + + def wait( + self, + timeout=600, + cond_reason=SucceededConditionReason.VIRTUAL_MACHINE_READY, + cond_status=Condition.Status.TRUE, + cond_type=Condition.SUCCEEDED, + ): + LOGGER.info( + f"Wait for {self.kind} {self.name} {cond_reason} condition to be {cond_status}" + ) + samples = TimeoutSampler( + timeout=timeout, + sleep=1, + exceptions=ProtocolError, + func=self.api().get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + last_condition = None + try: + for sample in samples: + if sample.items: + sample_status = sample.items[0].status + if sample_status: + current_conditions = sample_status.conditions + for cond in current_conditions: + last_condition = cond + if ( + cond.type == cond_type + and cond.status == cond_status + and cond.reason == cond_reason + ): + msg = ( + f"Status of {self.kind} {self.name} {cond.type} is " + f"{cond.status} ({cond.reason}: {cond.message})" + ) + LOGGER.info(msg) + return + except TimeoutExpiredError: + msg = ( + f"Last condition of {self.kind} {self.name} {last_condition.type} was " + f"{last_condition.status} ({last_condition.reason}: {last_condition.message})" + ) + LOGGER.error(msg) + raise + + +class OvirtMappings: + def __init__( + self, disk_mappings=None, network_mappings=None, storage_mappings=None + ): + self.disk_mappings = disk_mappings + self.network_mappings = network_mappings + self.storage_mappings = storage_mappings + + +class ResourceMappingItem: + def __init__( + self, + target_name, + target_namespace=None, + target_type=None, + source_name=None, + source_id=None, + ): + self.target_name = target_name + self.target_namespace = target_namespace + self.source_name = source_name + self.source_id = source_id + self.target_type = target_type diff --git a/setup.py b/setup.py new file mode 100644 index 0000000000..4a91a3368b --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +#! /usr/bin/python +# -*- coding: utf-8 -*- + +from setuptools import find_packages, setup + + +setup( + name="ocp-python-wrapper", + version="1.0", + packages=find_packages(), + include_package_data=True, + install_requires=[ + "kubernetes", + "openshift", + "xmltodict", + "urllib3", + "netaddr", + "paramiko", + "pbr", + ], + python_requires=">=3.6", +) diff --git a/stdci.yaml b/stdci.yaml new file mode 100644 index 0000000000..3fed7e7303 --- /dev/null +++ b/stdci.yaml @@ -0,0 +1,7 @@ +--- +distro: el7 +sub-stages: + - check + +reporting: + style: stdci diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000000..1c6a150761 --- /dev/null +++ b/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist=code-check +skipsdist=True + +[flake8] +[testenv:code-check] +basepython = python3 +setenv = PYTHONPATH = {toxinidir} +deps= + pre-commit +commands = + pre-commit run --all-files