From 03ab273620cbb12db159d4b397958d303e3aaa01 Mon Sep 17 00:00:00 2001 From: Yang Chiu Date: Fri, 22 Sep 2023 14:12:14 +0800 Subject: [PATCH] test: add realtime logs to track test execution progress Signed-off-by: Yang Chiu --- e2e/libs/keywords/common_keywords.py | 5 +- e2e/libs/keywords/engine_keywords.py | 6 +- e2e/libs/keywords/node_keywords.py | 2 - e2e/libs/keywords/pod_keywords.py | 6 +- e2e/libs/keywords/recurring_job_keywords.py | 7 +- e2e/libs/keywords/replica_keywords.py | 19 ++-- e2e/libs/keywords/volume_keywords.py | 21 ++-- e2e/libs/keywords/workload_keywords.py | 8 +- e2e/libs/node/node.py | 30 +++--- e2e/libs/node_exec/node_exec.py | 5 +- e2e/libs/recurring_job/crd.py | 14 ++- e2e/libs/recurring_job/recurring_job.py | 2 - e2e/libs/recurring_job/rest.py | 26 +++-- e2e/libs/replica/crd.py | 11 +-- e2e/libs/replica/replica.py | 2 - e2e/libs/utility/utility.py | 27 +++--- e2e/libs/volume/crd.py | 33 ++++--- e2e/libs/volume/rest.py | 17 ++-- e2e/libs/workload/workload.py | 102 +++++++++----------- 19 files changed, 156 insertions(+), 187 deletions(-) diff --git a/e2e/libs/keywords/common_keywords.py b/e2e/libs/keywords/common_keywords.py index 58550829cd..3c9d55fa71 100644 --- a/e2e/libs/keywords/common_keywords.py +++ b/e2e/libs/keywords/common_keywords.py @@ -1,20 +1,17 @@ from utility.utility import init_k8s_api_client from node_exec import NodeExec -import logging class common_keywords: def __init__(self): - logging.warn("initialize common_keywords class") + pass def init_k8s_api_client(self): init_k8s_api_client() def init_node_exec(self, test_name): namespace = test_name.lower().replace(' ', '-')[:63] - logging.warn(f"namespace = {namespace}") NodeExec.get_instance().set_namespace(namespace) def cleanup_node_exec(self): - logging.info('cleaning up resources') NodeExec.get_instance().cleanup() diff --git a/e2e/libs/keywords/engine_keywords.py b/e2e/libs/keywords/engine_keywords.py index a16d1e0fbc..f704dc4ffa 100644 --- a/e2e/libs/keywords/engine_keywords.py +++ b/e2e/libs/keywords/engine_keywords.py @@ -1,5 +1,4 @@ -import logging - +from utility.utility import logging from common_keywords import common_keywords class engine_keywords: @@ -8,8 +7,7 @@ def __init__(self): self.engine = common_keywords.engine_instance def get_engine_state(self, volume_name, node_name): - logging.info( - f"getting the volume {volume_name} engine on the node {node_name} state") + logging(f"Getting the volume {volume_name} engine on the node {node_name} state") resp = self.engine.get_engine(volume_name, node_name) if resp == "" or resp is None: diff --git a/e2e/libs/keywords/node_keywords.py b/e2e/libs/keywords/node_keywords.py index bf0ed11be7..4fe06a41a1 100644 --- a/e2e/libs/keywords/node_keywords.py +++ b/e2e/libs/keywords/node_keywords.py @@ -3,12 +3,10 @@ from utility.utility import wait_for_all_instance_manager_running from robot.libraries.BuiltIn import BuiltIn from node import Node -import logging class node_keywords: def __init__(self): - logging.warn("initialize node_keywords class") self.node = Node() def reboot_volume_node(self, volume_name): diff --git a/e2e/libs/keywords/pod_keywords.py b/e2e/libs/keywords/pod_keywords.py index 6173170de0..e9b14efba9 100644 --- a/e2e/libs/keywords/pod_keywords.py +++ b/e2e/libs/keywords/pod_keywords.py @@ -1,6 +1,4 @@ -import logging import time - from node import Nodes retry_count = 200 @@ -8,6 +6,10 @@ class pod_keywords: + #TODO + # keywords layer can only call lower implementation layer to complete its work + # and should not have any business logic here + def wait_all_pods_evicted(self, node_index): node_name = Nodes.get_name_by_index(int(node_index)) diff --git a/e2e/libs/keywords/recurring_job_keywords.py b/e2e/libs/keywords/recurring_job_keywords.py index 1f0ea69f9f..f6e417aece 100644 --- a/e2e/libs/keywords/recurring_job_keywords.py +++ b/e2e/libs/keywords/recurring_job_keywords.py @@ -1,14 +1,13 @@ from utility.utility import get_test_case_namespace, generate_volume_name from utility.utility import get_node, list_nodes from utility.utility import get_test_pod_running_node, get_test_pod_not_running_node +from utility.utility import logging from robot.libraries.BuiltIn import BuiltIn from recurring_job import RecurringJob -import logging class recurring_job_keywords: def __init__(self): - logging.warn("initialize recurring_job_keywords class") self.recurring_job = RecurringJob() @@ -17,7 +16,7 @@ def create_snapshot_recurring_job_for_volume(self, volume_name): self.recurring_job.create(job_name, task="snapshot") self.recurring_job.add_to_volume(job_name, volume_name) self.recurring_job.get(job_name) - logging.warn(f'==> create recurring job {job_name} for volume {volume_name}') + logging(f'Created recurring job {job_name} for volume {volume_name}') def create_backup_recurring_job_for_volume(self, volume_name): @@ -25,7 +24,7 @@ def create_backup_recurring_job_for_volume(self, volume_name): self.recurring_job.create(job_name, task="backup") self.recurring_job.add_to_volume(job_name, volume_name) self.recurring_job.get(job_name) - logging.warn(f'==> create recurring job {job_name} for volume {volume_name}') + logging(f'Created recurring job {job_name} for volume {volume_name}') def check_recurring_jobs_work(self, volume_name): diff --git a/e2e/libs/keywords/replica_keywords.py b/e2e/libs/keywords/replica_keywords.py index 0a4b7fb51b..be115b56a1 100644 --- a/e2e/libs/keywords/replica_keywords.py +++ b/e2e/libs/keywords/replica_keywords.py @@ -1,5 +1,4 @@ -import logging - +from utility.utility import logging from node import Nodes from common_keywords import common_keywords @@ -11,27 +10,27 @@ def __init__(self): def delete_replica(self, volume_name, node_index): node_name = Nodes.get_name_by_index(int(node_index)) - logging.info( - f"deleting volume {volume_name}'s replica on the node {node_name}") + logging(f"Deleting volume {volume_name}'s replica on the node {node_name}") self.replica.delete_replica(volume_name, node_name) def wait_for_replica_rebuilding_start(self, volume_name, node_index): node_name = Nodes.get_name_by_index(int(node_index)) - logging.info( - f"waiting the {volume_name} replica on node {node_name} rebuilding start") + logging(f"Waiting volume {volume_name}'s replica on node {node_name} rebuilding start") self.replica.wait_for_replica_rebuilding_start(volume_name, node_name) def wait_for_replica_rebuilding_complete(self, volume_name, node_index): node_name = Nodes.get_name_by_index(int(node_index)) - logging.info( - f"waiting the {volume_name} replica on node {node_name} rebuilding complete") + logging(f"Waiting volume {volume_name}'s replica on node {node_name} rebuilding complete") self.replica.wait_for_replica_rebuilding_complete( volume_name, node_name) + #TODO + # keywords layer can only call lower implementation layer to complete its work + # and should not have any business logic here + def get_replica_state(self, volume_name, node_index): node_name = Nodes.get_name_by_index(int(node_index)) - logging.info( - f"getting the volume {volume_name} replica on the node {node_name} state") + logging(f"Getting volume {volume_name}'s replica on the node {node_name} state") resp = self.replica.get_replica(volume_name, node_name) assert resp != "", f"failed to get the volume {volume_name} replicas" diff --git a/e2e/libs/keywords/volume_keywords.py b/e2e/libs/keywords/volume_keywords.py index c7ce9c0149..a7b73281fc 100644 --- a/e2e/libs/keywords/volume_keywords.py +++ b/e2e/libs/keywords/volume_keywords.py @@ -1,33 +1,31 @@ from utility.utility import get_test_case_namespace, generate_volume_name from utility.utility import get_node, list_nodes from utility.utility import get_test_pod_running_node, get_test_pod_not_running_node +from utility.utility import logging from robot.libraries.BuiltIn import BuiltIn from volume import Volume -import logging class volume_keywords: def __init__(self): - logging.warn("initialize volume_keywords class") self.volume = Volume() def create_volume(self, size, replica_count): volume_name = generate_volume_name() self.volume.create(volume_name, size, replica_count) - logging.info(f'==> create volume {volume_name}') + logging(f'Created volume {volume_name}') return volume_name def attach_volume(self, volume_name): attach_node = get_test_pod_not_running_node() - logging.info(f'==> attach volume {volume_name} to {attach_node}') + logging(f'Attached volume {volume_name} to {attach_node}') self.volume.attach(volume_name, attach_node) def get_volume_node(self, volume_name): volume = self.volume.get(volume_name) - print(volume) return volume['spec']['nodeID'] # return volume.controllers[0].hostId @@ -42,7 +40,6 @@ def get_replica_node(self, volume_name): def write_volume_random_data(self, volume_name, size_in_mb): - print('write_volume_random_data') return self.volume.write_random_data(volume_name, size_in_mb) @@ -51,23 +48,21 @@ def keep_writing_data(self, volume_name): def check_data(self, volume_name, checksum): - print(f"check volume {volume_name} data with checksum {checksum}") + logging(f"Checking volume {volume_name} data with checksum {checksum}") self.volume.check_data(volume_name, checksum) def delete_replica(self, volume_name, replica_node): if str(replica_node).isdigit(): replica_node = get_node(replica_node) - logging.info(f"==> delete volume {volume_name}'s replica\ - on node {replica_node}") + logging(f"Deleting volume {volume_name}'s replica on node {replica_node}") self.volume.delete_replica(volume_name, replica_node) def wait_for_replica_rebuilding_start(self, volume_name, replica_node): if str(replica_node).isdigit(): replica_node = get_node(replica_node) - logging.info(f"==> wait for volume {volume_name}'s replica\ - on node {replica_node} rebuilding started") + logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding started") self.volume.wait_for_replica_rebuilding_start( volume_name, replica_node @@ -77,8 +72,7 @@ def wait_for_replica_rebuilding_start(self, volume_name, replica_node): def wait_for_replica_rebuilding_complete(self, volume_name, replica_node): if str(replica_node).isdigit(): replica_node = get_node(replica_node) - logging.info(f"==> wait for volume {volume_name}'s replica\ - on node {replica_node} rebuilding completed") + logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding completed") self.volume.wait_for_replica_rebuilding_complete( volume_name, replica_node @@ -91,5 +85,4 @@ def wait_for_volume_healthy(self, volume_name): self.volume.wait_for_volume_healthy(volume_name) def cleanup_volumes(self, volume_names): - logging.warn(f"cleanup volumes {volume_names}") self.volume.cleanup(volume_names) \ No newline at end of file diff --git a/e2e/libs/keywords/workload_keywords.py b/e2e/libs/keywords/workload_keywords.py index 80aa670d18..cee3fb2c94 100644 --- a/e2e/libs/keywords/workload_keywords.py +++ b/e2e/libs/keywords/workload_keywords.py @@ -1,18 +1,17 @@ from workload.workload import * -import logging class workload_keywords: def __init__(self): - logging.warn("initialize workload_keywords class") + pass def init_storageclasses(self): create_storageclass('longhorn-test') - create_storageclass('strict-local') + create_storageclass('longhorn-test-strict-local') def cleanup_storageclasses(self): delete_storageclass('longhorn-test') - delete_storageclass('strict-local') + delete_storageclass('longhorn-test-strict-local') def create_deployment(self, volume_type="rwo", option=""): pvc_name = create_pvc(volume_type, option) @@ -36,7 +35,6 @@ def write_pod_random_data(self, pod, size_in_mb): return write_pod_random_data(pod, size_in_mb) def check_pod_data(self, pod_name, checksum): - print(f"check pod {pod_name} data with checksum {checksum}") check_pod_data(pod_name, checksum) def cleanup_deployments(self, deployment_names): diff --git a/e2e/libs/node/node.py b/e2e/libs/node/node.py index 208ad3034b..84ddc7fa70 100644 --- a/e2e/libs/node/node.py +++ b/e2e/libs/node/node.py @@ -1,7 +1,7 @@ from kubernetes import client import yaml import time -import logging +from utility.utility import logging from utility.utility import apply_cr_from_yaml, get_cr from utility.utility import wait_for_cluster_ready from utility.utility import list_nodes @@ -16,59 +16,55 @@ def __init__(self): with open('/tmp/instance_mapping', 'r') as f: self.mapping = yaml.safe_load(f) self.aws_client = boto3.client('ec2') - #logging.warn(f"describe_instances = {self.aws_client.describe_instances()}") def reboot_all_nodes(self, shut_down_time_in_sec=60): instance_ids = [value for value in self.mapping.values()] - print(instance_ids) resp = self.aws_client.stop_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Stopping instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_stopped') waiter.wait(InstanceIds=instance_ids) - print(f"all instances stopped") + logging(f"Stopped instances") time.sleep(shut_down_time_in_sec) resp = self.aws_client.start_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Starting instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_running') waiter.wait(InstanceIds=instance_ids) wait_for_cluster_ready() - print(f"all instances running") + logging(f"Started instances") def reboot_node(self, reboot_node_name, shut_down_time_in_sec=60): instance_ids = [self.mapping[reboot_node_name]] - print(instance_ids) resp = self.aws_client.stop_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Stopping instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_stopped') waiter.wait(InstanceIds=instance_ids) - print(f"instances {instance_ids} stopped") + logging(f"Stopped instances") time.sleep(shut_down_time_in_sec) resp = self.aws_client.start_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Starting instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_running') waiter.wait(InstanceIds=instance_ids) - print(f"instances {instance_ids} running") + logging(f"Started instances") def reboot_all_worker_nodes(self, shut_down_time_in_sec=60): instance_ids = [self.mapping[value] for value in list_nodes()] - print(instance_ids) resp = self.aws_client.stop_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Stopping instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_stopped') waiter.wait(InstanceIds=instance_ids) - print(f"instances {instance_ids} stopped") + logging(f"Stopped instances") time.sleep(shut_down_time_in_sec) resp = self.aws_client.start_instances(InstanceIds=instance_ids) - print(resp) + logging(f"Starting instances {instance_ids} response: {resp}") waiter = self.aws_client.get_waiter('instance_running') waiter.wait(InstanceIds=instance_ids) - print(f"instances {instance_ids} running") + logging(f"Started instances") diff --git a/e2e/libs/node_exec/node_exec.py b/e2e/libs/node_exec/node_exec.py index d88661f6ad..2f27d82906 100644 --- a/e2e/libs/node_exec/node_exec.py +++ b/e2e/libs/node_exec/node_exec.py @@ -1,9 +1,9 @@ from kubernetes import client from kubernetes.stream import stream import time -import logging from utility.utility import wait_delete_pod from utility.utility import wait_delete_ns +from utility.utility import logging DEFAULT_POD_TIMEOUT = 180 DEFAULT_POD_INTERVAL = 1 @@ -39,10 +39,11 @@ def set_namespace(self, namespace): self.core_api.create_namespace( body=namespace_manifest ) + logging(f"Created namespace {namespace}") def cleanup(self): for pod in self.node_exec_pod.values(): - logging.warn(f"==> cleanup pod {pod.metadata.name} {pod.metadata.uid}") + logging(f"Cleaning up pod {pod.metadata.name} {pod.metadata.uid}") res = self.core_api.delete_namespaced_pod( name=pod.metadata.name, namespace=self.namespace, diff --git a/e2e/libs/recurring_job/crd.py b/e2e/libs/recurring_job/crd.py index 3a0aa5b895..b026bdf9c0 100644 --- a/e2e/libs/recurring_job/crd.py +++ b/e2e/libs/recurring_job/crd.py @@ -1,5 +1,4 @@ -import logging - +from utility.utility import logging from recurring_job.base import Base from recurring_job.rest import Rest from kubernetes import client @@ -10,25 +9,24 @@ def __init__(self): self.rest = Rest() def create(self, name, task, groups, cron, retain, concurrency, label): - logging.warn("NotImplemented") + logging("Delegating the create call to API because there is no CRD implementation") return self.rest.create(name, task, groups, cron, retain, concurrency, label) def delete(self, job_name, volume_name): - logging.warn("NotImplemented") + logging("Delegating the delete call to API because there is no CRD implementation") return self.rest.delete(job_name, volume_name) def get(self, name): - logging.warn("NotImplemented") return self.rest.get(name) def add_to_volume(self, job_name, volume_name): - logging.warn("NotImplemented") + logging("Delegating the add_to_volume call to API because there is no CRD implementation") return self.rest.add_to_volume(job_name, volume_name) def check_jobs_work(self, volume_name): - logging.warn("NotImplemented") + logging("Delegating the check_jobs_work call to API because there is no CRD implementation") return self.rest.check_jobs_work(volume_name) def cleanup(self, volume_names): - logging.warn("NotImplemented") + logging("Delegating the cleanup call to API because there is no CRD implementation") return self.rest.cleanup(volume_names) \ No newline at end of file diff --git a/e2e/libs/recurring_job/recurring_job.py b/e2e/libs/recurring_job/recurring_job.py index db8aaa7fb6..96c799dbfe 100644 --- a/e2e/libs/recurring_job/recurring_job.py +++ b/e2e/libs/recurring_job/recurring_job.py @@ -1,5 +1,3 @@ -import logging - from recurring_job.base import Base from recurring_job.crd import CRD from recurring_job.rest import Rest diff --git a/e2e/libs/recurring_job/rest.py b/e2e/libs/recurring_job/rest.py index fc1ebe2541..05d257cda6 100644 --- a/e2e/libs/recurring_job/rest.py +++ b/e2e/libs/recurring_job/rest.py @@ -1,9 +1,9 @@ import time -import logging from kubernetes import client from recurring_job.base import Base from utility.utility import get_longhorn_client from utility.utility import filter_cr +from utility.utility import logging from datetime import datetime RETRY_COUNTS = 180 @@ -103,7 +103,7 @@ def check_jobs_work(self, volume_name): jobs, _ = self.get_volume_recurring_jobs_and_groups(volume_name) for job_name in jobs: job = self.get(job_name) - logging.warn(f"get recurring job: {job}") + logging(f"Checking recurring job {job}") if job['task'] == 'snapshot' and job['cron'] == '* * * * *': period_in_sec = 60 self._check_snapshot_created_in_time(volume_name, job_name, period_in_sec) @@ -114,9 +114,8 @@ def check_jobs_work(self, volume_name): def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): # check snapshot can be created by the recurring job current_time = datetime.utcnow() - logging.warn(f"current_time = {current_time}") current_timestamp = current_time.timestamp() - logging.warn(f"current_timestamp = {current_timestamp}") + logging(f"Recorded current time = {current_time}, timestamp = {current_timestamp}") label_selector=f"longhornvolume={volume_name}" snapshot_timestamp = 0 for _ in range(period_in_sec * 2): @@ -129,18 +128,16 @@ def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): # and crd doesn't support field selector # so need to filter by ourselves if item['spec']['labels']['RecurringJob'] == job_name: - logging.warn(f"item = {item}") + logging(f"Got snapshot {item}") snapshot_time = snapshot_list['items'][0]['metadata']['creationTimestamp'] - logging.warn(f"snapshot_time = {snapshot_time}") snapshot_time = datetime.strptime(snapshot_time, '%Y-%m-%dT%H:%M:%SZ') - logging.warn(f"snapshot_time = {snapshot_time}") snapshot_timestamp = snapshot_time.timestamp() - logging.warn(f"snapshot_timestamp = {snapshot_timestamp}") + logging(f"Got snapshot time = {snapshot_time}, timestamp = {snapshot_timestamp}") break if snapshot_timestamp > current_timestamp: return except Exception as e: - logging.warn(f"iterate snapshot list error: {e}") + logging(f"Iterating snapshot list error: {e}") time.sleep(1) assert False, f"since {current_time},\ there's no new snapshot created by recurring job \ @@ -149,9 +146,8 @@ def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): def _check_backup_created_in_time(self, volume_name, period_in_sec): # check backup can be created by the recurring job current_time = datetime.utcnow() - logging.warn(f"current_time = {current_time}") current_timestamp = current_time.timestamp() - logging.warn(f"current_timestamp = {current_timestamp}") + logging(f"Recorded current time = {current_time}, timestamp = {current_timestamp}") label_selector=f"backup-volume={volume_name}" backup_timestamp = 0 for _ in range(period_in_sec * 2): @@ -159,15 +155,13 @@ def _check_backup_created_in_time(self, volume_name, period_in_sec): try: if len(backup_list['items']) > 0: backup_time = backup_list['items'][0]['metadata']['creationTimestamp'] - logging.warn(f"backup_time = {backup_time}") backup_time = datetime.strptime(backup_time, '%Y-%m-%dT%H:%M:%SZ') - logging.warn(f"backup_time = {backup_time}") backup_timestamp = backup_time.timestamp() - logging.warn(f"backup_timestamp = {backup_timestamp}") + logging(f"Got backup time = {backup_time}, timestamp = {backup_timestamp}") if backup_timestamp > current_timestamp: return except Exception as e: - logging.warn(f"iterate backup list error: {e}") + logging(f"Iterating backup list error: {e}") time.sleep(1) assert False, f"since {current_time},\ there's no new backup created by recurring job \ @@ -175,6 +169,8 @@ def _check_backup_created_in_time(self, volume_name, period_in_sec): def cleanup(self, volume_names): for volume_name in volume_names: + logging(f"Cleaning up recurring jobs for volume {volume_name}") jobs, _ = self.get_volume_recurring_jobs_and_groups(volume_name) for job in jobs: + logging(f"Deleting recurring job {job}") self.delete(job, volume_name) \ No newline at end of file diff --git a/e2e/libs/replica/crd.py b/e2e/libs/replica/crd.py index ae30777455..c2024d2b8c 100644 --- a/e2e/libs/replica/crd.py +++ b/e2e/libs/replica/crd.py @@ -1,5 +1,4 @@ -import logging - +from utility.utility import logging from replica.base import Base from replica.rest import Rest from utils.common_utils import k8s_cr_api @@ -27,10 +26,10 @@ def get_replica(self, volume_name, node_name): def delete_replica(self, volume_name, node_name): if volume_name == "" or node_name == "": - logging.info(f"deleting all replicas") + logging(f"Deleting all replicas") else: - logging.info( - f"deleting the volume {volume_name} on node {node_name} replicas") + logging( + f"Deleting volume {volume_name} on node {node_name} replicas") resp = self.get_replica(volume_name, node_name) assert resp != "", f"failed to get replicas" @@ -48,7 +47,7 @@ def delete_replica(self, volume_name, node_name): plural="replicas", name=replica_name ) - logging.info(f"finished replicas deletion") + logging(f"Finished replicas deletion") def wait_for_replica_rebuilding_start(self, volume_name, node_name): Rest(self.node_exec).wait_for_replica_rebuilding_start(volume_name, node_name) diff --git a/e2e/libs/replica/replica.py b/e2e/libs/replica/replica.py index 8e5508492a..5927d83e39 100644 --- a/e2e/libs/replica/replica.py +++ b/e2e/libs/replica/replica.py @@ -1,5 +1,3 @@ -import logging - from replica.base import Base from replica.crd import CRD from strategy import LonghornOperationStrategy diff --git a/e2e/libs/utility/utility.py b/e2e/libs/utility/utility.py index 8da7d38aea..fb926ed8b5 100644 --- a/e2e/libs/utility/utility.py +++ b/e2e/libs/utility/utility.py @@ -8,11 +8,14 @@ import socket import time import yaml -import logging +from robot.api import logger RETRY_COUNTS = 150 RETRY_INTERVAL = 1 +def logging(msg): + logger.info(msg, also_console=True) + def generate_volume_name(): return "vol-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) @@ -22,11 +25,11 @@ def init_k8s_api_client(): if os.getenv('LONGHORN_CLIENT_URL'): # for develop or debug, run test in local environment config.load_kube_config() - logging.info("initialize out-of-cluster k8s api client") + logging("Initialized out-of-cluster k8s api client") else: # for ci, run test in in-cluster environment config.load_incluster_config() - logging.info("initialize in-cluster k8s api client") + logging("Initialized in-cluster k8s api client") def list_nodes(): core_api = client.CoreV1Api() @@ -41,6 +44,7 @@ def list_nodes(): def wait_for_cluster_ready(): core_api = client.CoreV1Api() for i in range(RETRY_COUNTS): + logging(f"Waiting for cluster ready ({i}) ...") try: resp = core_api.list_node() ready = True @@ -52,7 +56,7 @@ def wait_for_cluster_ready(): if ready: break except Exception as e: - logging.warn(f"list node error: {e}") + logging(f"Listing nodes error: {e}") time.sleep(RETRY_INTERVAL) assert ready, f"expect cluster's ready but it isn't {resp}" @@ -62,6 +66,7 @@ def wait_for_all_instance_manager_running(): nodes = list_nodes() for _ in range(RETRY_COUNTS): + logging(f"Waiting for all instance manager running ({_}) ...") instance_managers = longhorn_client.list_instance_manager() instance_manager_map = {} try: @@ -72,7 +77,7 @@ def wait_for_all_instance_manager_running(): break time.sleep(RETRY_INTERVAL) except Exception as e: - print(f"exception when get instance manager state: {e}") + logging(f"Getting instance manager state error: {e}") assert len(instance_manager_map) == len(nodes), f"expect all instance managers running: {instance_managers}" def get_node(index): @@ -107,7 +112,7 @@ def get_cr(group, version, namespace, plural, name): resp = api.get_namespaced_custom_object(group, version, namespace, plural, name) return resp except ApiException as e: - print("Exception when calling CustomObjectsApi->get_namespaced_custom_object: %s\n" % e) + logging(f"Getting namespaced custom object error: {e}") def filter_cr(group, version, namespace, plural, field_selector="", label_selector=""): api = client.CustomObjectsApi() @@ -115,7 +120,7 @@ def filter_cr(group, version, namespace, plural, field_selector="", label_select resp = api.list_namespaced_custom_object(group, version, namespace, plural, field_selector=field_selector, label_selector=label_selector) return resp except ApiException as e: - print("Exception when calling CustomObjectsApi->list_namespaced_custom_object: %s\n" % e) + logging(f"Listing namespaced custom object: {e}") def wait_delete_pod(pod_uid, namespace='default'): api = client.CoreV1Api() @@ -156,7 +161,7 @@ def get_mgr_ips(): def get_longhorn_client(): if os.getenv('LONGHORN_CLIENT_URL'): - logging.info(f"initialize longhorn api client from LONGHORN_CLIENT_URL") + logging(f"Initializing longhorn api client from LONGHORN_CLIENT_URL {os.getenv('LONGHORN_CLIENT_URL')}") # for develop or debug # manually expose longhorn client # to access longhorn manager in local environment @@ -166,10 +171,10 @@ def get_longhorn_client(): longhorn_client = from_env(url=f"{longhorn_client_url}/v1/schemas") return longhorn_client except Exception as e: - logging.info(f"get longhorn client error: {e}") + logging(f"Getting longhorn client error: {e}") time.sleep(RETRY_INTERVAL) else: - logging.info(f"initialize longhorn api client from longhorn manager") + logging(f"Initializing longhorn api client from longhorn manager") # for ci, run test in in-cluster environment # directly use longhorn manager cluster ip for i in range(RETRY_COUNTS): @@ -184,7 +189,7 @@ def get_longhorn_client(): longhorn_client = from_env(url=f"http://{ip}:9500/v1/schemas") return longhorn_client except Exception as e: - logging.info(f"get longhorn client error: {e}") + logging(f"Getting longhorn client error: {e}") time.sleep(RETRY_INTERVAL) def get_test_pod_running_node(): diff --git a/e2e/libs/volume/crd.py b/e2e/libs/volume/crd.py index 517bc487c2..2617cdac93 100644 --- a/e2e/libs/volume/crd.py +++ b/e2e/libs/volume/crd.py @@ -1,8 +1,7 @@ import os import time import warnings -import logging - +from utility.utility import logging from volume.base import Base from volume.rest import Rest from kubernetes import client @@ -106,7 +105,7 @@ def delete(self, volume_name): ) self.wait_for_volume_delete(volume_name) except Exception as e: - logging.warn(f"Exception when deleting volume: {e}") + logging(f"Deleting volume error: {e}") def wait_for_volume_delete(self, volume_name): for i in range(retry_count): @@ -120,45 +119,48 @@ def wait_for_volume_delete(self, volume_name): ) except Exception as e: if e.reason == 'Not Found': - logging.warn(f"volume {volume_name} delete") + logging(f"Deleted volume {volume_name}") return else: - logging.warn(f"wait for volume delete error: {e}") + logging(f"Waiting for volume deleting error: {e}") time.sleep(retry_interval) assert False, f"expect volume {volume_name} deleted but it still exists" def wait_for_volume_state(self, volume_name, desired_state): for i in range(retry_count): + logging(f"Waiting for {volume_name} {desired_state} ({i}) ...") try: if self.get(volume_name)["status"]["state"] == desired_state: break except Exception as e: - print(f"get volume {self.get(volume_name)} status error: {e}") + logging(f"Getting volume {self.get(volume_name)} status error: {e}") time.sleep(retry_interval) assert self.get(volume_name)["status"]["state"] == desired_state def wait_for_volume_robustness(self, volume_name, desired_state): for i in range(retry_count): + logging(f"Waiting for {volume_name} {desired_state} ({i}) ...") try: if self.get(volume_name)["status"]["robustness"] == desired_state: break except Exception as e: - print(f"get volume robustness error. volume = {self.get(volume_name)}") + logging(f"Getting volume {self.get(volume_name)} robustness error: {e}") time.sleep(retry_interval) assert self.get(volume_name)["status"]["robustness"] == desired_state def wait_for_volume_robustness_not(self, volume_name, not_desired_state): for i in range(retry_count): + logging(f"Waiting for {volume_name} not {not_desired_state} ({i}) ...") try: if self.get(volume_name)["status"]["robustness"] != not_desired_state: break except Exception as e: - print(f"get volume robustness error. volume = {self.get(volume_name)}") + logging(f"Getting volume {self.get(volume_name)} robustness error: {e}") time.sleep(retry_interval) assert self.get(volume_name)["status"]["robustness"] != not_desired_state def get_endpoint(self, volume_name): - warnings.warn("no endpoint in volume cr, get it from rest api") + logging("Delegating the get_endpoint call to API because there is no CRD implementation") return Rest(self.node_exec).get_endpoint(volume_name) def write_random_data(self, volume_name, size): @@ -173,11 +175,11 @@ def write_random_data(self, volume_name, size): def keep_writing_data(self, volume_name, size): node_name = self.get(volume_name)["spec"]["nodeID"] endpoint = self.get_endpoint(volume_name) - logging.warn(f"==> keep writing data to volume {volume_name}") + logging(f"Keeping writing data to volume {volume_name}") res = self.node_exec.issue_cmd( node_name, f"while true; do dd if=/dev/urandom of={endpoint} bs=1M count={size} status=none; done > /dev/null 2> /dev/null &") - logging.warn(f"==> before write operation completed, function can return") + logging(f"Created process to keep writing data to volume {volume_name}") def delete_replica(self, volume_name, node_name): replica_list = self.obj_api.list_namespaced_custom_object( @@ -188,7 +190,7 @@ def delete_replica(self, volume_name, node_name): label_selector=f"longhornvolume={volume_name}\ ,longhornnode={node_name}" ) - print(f"delete replica {replica_list['items'][0]['metadata']['name']}") + logging(f"Deleting replica {replica_list['items'][0]['metadata']['name']}") self.obj_api.delete_namespaced_custom_object( group="longhorn.io", version="v1beta2", @@ -198,11 +200,11 @@ def delete_replica(self, volume_name, node_name): ) def wait_for_replica_rebuilding_start(self, volume_name, node_name): - warnings.warn("no rebuild status in volume cr, get it from rest api") + logging("Delegating the wait_for_replica_rebuilding_start call to API because there is no CRD implementation") Rest(self.node_exec).wait_for_replica_rebuilding_start(volume_name, node_name) def wait_for_replica_rebuilding_complete(self, volume_name, node_name): - warnings.warn("no rebuild status in volume cr, get it from rest api") + logging("Delegating the wait_for_replica_rebuilding_complete call to API because there is no CRD implementation") Rest(self.node_exec).wait_for_replica_rebuilding_complete( volume_name, node_name @@ -214,10 +216,11 @@ def check_data(self, volume_name, checksum): _checksum = self.node_exec.issue_cmd( node_name, f"md5sum {endpoint} | awk \'{{print $1}}\'") - print(f"get {endpoint} checksum = {_checksum},\ + logging(f"Got {endpoint} checksum = {_checksum},\ expected checksum = {checksum}") assert _checksum == checksum def cleanup(self, volume_names): for volume_name in volume_names: + logging(f"Deleting volume {volume_name}") self.delete(volume_name) \ No newline at end of file diff --git a/e2e/libs/volume/rest.py b/e2e/libs/volume/rest.py index 6c7e80633e..d81b29a979 100644 --- a/e2e/libs/volume/rest.py +++ b/e2e/libs/volume/rest.py @@ -1,6 +1,6 @@ from volume.base import Base from utility.utility import get_longhorn_client - +from utility.utility import logging import time import os @@ -50,7 +50,7 @@ def get_endpoint(self, volume_name): break time.sleep(RETRY_INTERVAL) - print(f"get volume {volume_name} endpoint = {endpoint}") + logging(f"Got volume {volume_name} endpoint = {endpoint}") if v.frontend == VOLUME_FRONTEND_BLOCKDEV: assert endpoint == os.path.join(DEV_PATH, v.name) @@ -68,11 +68,10 @@ def delete_replica(self, volume_name, node_name): return NotImplemented def wait_for_replica_rebuilding_start(self, volume_name, node_name): - print(node_name) rebuilding_replica_name = None for i in range(RETRY_COUNTS): v = self.longhorn_client.by_id_volume(volume_name) - print(v.replicas) + logging(f"Got volume {volume_name} replicas = {v.replicas}") for replica in v.replicas: if replica.hostId == node_name: rebuilding_replica_name = replica.name @@ -81,18 +80,18 @@ def wait_for_replica_rebuilding_start(self, volume_name, node_name): break time.sleep(RETRY_INTERVAL) assert rebuilding_replica_name != None - print(rebuilding_replica_name) + logging(f"Got rebuilding replica = {rebuilding_replica_name}") started = False for i in range(RETRY_COUNTS): v = self.longhorn_client.by_id_volume(volume_name) - print(v.rebuildStatus) + logging(f"Got volume rebuild status = {v.rebuildStatus}") for status in v.rebuildStatus: for replica in v.replicas: if status.replica == replica.name and \ replica.hostId == node_name and \ status.state == "in_progress": - print(f"{node_name}'s replica is {replica.name}") + logging(f"Started {node_name}'s replica {replica.name} rebuilding") started = True break if started: @@ -104,7 +103,7 @@ def wait_for_replica_rebuilding_complete(self, volume_name, node_name): completed = False for i in range(RETRY_COUNTS): v = self.longhorn_client.by_id_volume(volume_name) - print(f"replicas = {v.replicas}") + logging(f"Got volume {volume_name} replicas = {v.replicas}") for replica in v.replicas: # use replica.mode is RW or RO to check if this replica # has been rebuilt or not @@ -115,7 +114,7 @@ def wait_for_replica_rebuilding_complete(self, volume_name, node_name): # so it's no way to distinguish "rebuilding not started yet" # or "rebuilding already completed" using rebuildStatus if replica.hostId == node_name and replica.mode == "RW": - print(f"{node_name}'s replica {replica.name} becomes RW") + logging(f"Completed {node_name}'s replica {replica.name} rebuilding") completed = True break if completed: diff --git a/e2e/libs/workload/workload.py b/e2e/libs/workload/workload.py index 9229b28076..63796475b1 100644 --- a/e2e/libs/workload/workload.py +++ b/e2e/libs/workload/workload.py @@ -3,7 +3,7 @@ from kubernetes.stream import stream import time import yaml -import logging +from utility.utility import logging RETRY_COUNTS = 150 RETRY_INTERVAL = 1 @@ -19,7 +19,7 @@ def get_name_suffix(*args): return suffix def create_storageclass(name): - if name == 'strict-local': + if name == 'longhorn-test-strict-local': filepath = "./templates/workload/strict_local_storageclass.yaml" else: filepath = "./templates/workload/storageclass.yaml" @@ -28,10 +28,7 @@ def create_storageclass(name): namespace = 'default' manifest_dict = yaml.safe_load(f) api = client.StorageV1Api() - try: - api.create_storage_class(body=manifest_dict) - except Exception as e: - print(f"Exception when create storageclass: {e}") + api.create_storage_class(body=manifest_dict) def delete_storageclass(name): api = client.StorageV1Api() @@ -54,29 +51,26 @@ def create_deployment(volume_type, option): # correct claim name manifest_dict['spec']['template']['spec']['volumes'][0]['persistentVolumeClaim']['claimName'] += suffix api = client.AppsV1Api() - try: - deployment = api.create_namespaced_deployment( - namespace=namespace, - body=manifest_dict) - print(deployment) - - deployment_name = deployment.metadata.name - replicas = deployment.spec.replicas - - for i in range(RETRY_COUNTS): - deployment = api.read_namespaced_deployment( - name=deployment_name, - namespace=namespace) - # deployment is none if deployment is not yet created - if deployment is not None and \ - deployment.status.ready_replicas == replicas: - break - time.sleep(RETRY_INTERVAL) - assert deployment.status.ready_replicas == replicas + deployment = api.create_namespaced_deployment( + namespace=namespace, + body=manifest_dict) + + deployment_name = deployment.metadata.name + replicas = deployment.spec.replicas + + for i in range(RETRY_COUNTS): + deployment = api.read_namespaced_deployment( + name=deployment_name, + namespace=namespace) + # deployment is none if deployment is not yet created + if deployment is not None and \ + deployment.status.ready_replicas == replicas: + break + time.sleep(RETRY_INTERVAL) + + assert deployment.status.ready_replicas == replicas - except Exception as e: - print(f"Exception when create deployment: {e}") return deployment_name def delete_deployment(name, namespace='default'): @@ -120,28 +114,26 @@ def create_statefulset(volume_type, option): if volume_type == 'rwx': manifest_dict['spec']['volumeClaimTemplates'][0]['spec']['accessModes'][0] = 'ReadWriteMany' api = client.AppsV1Api() - try: - statefulset = api.create_namespaced_stateful_set( - body=manifest_dict, - namespace=namespace) - statefulset_name = statefulset.metadata.name - replicas = statefulset.spec.replicas + statefulset = api.create_namespaced_stateful_set( + body=manifest_dict, + namespace=namespace) - for i in range(RETRY_COUNTS): - statefulset = api.read_namespaced_stateful_set( - name=statefulset_name, - namespace=namespace) - # statefulset is none if statefulset is not yet created - if statefulset is not None and \ - statefulset.status.ready_replicas == replicas: - break - time.sleep(RETRY_INTERVAL) + statefulset_name = statefulset.metadata.name + replicas = statefulset.spec.replicas + + for i in range(RETRY_COUNTS): + statefulset = api.read_namespaced_stateful_set( + name=statefulset_name, + namespace=namespace) + # statefulset is none if statefulset is not yet created + if statefulset is not None and \ + statefulset.status.ready_replicas == replicas: + break + time.sleep(RETRY_INTERVAL) - assert statefulset.status.ready_replicas == replicas + assert statefulset.status.ready_replicas == replicas - except Exception as e: - print(f"Exception when create statefulset: {e}") return statefulset_name def delete_statefulset(name, namespace='default'): @@ -182,12 +174,11 @@ def create_pvc(volume_type, option): if volume_type == 'rwx': manifest_dict['spec']['accessModes'][0] = 'ReadWriteMany' api = client.CoreV1Api() - try: - pvc = api.create_namespaced_persistent_volume_claim( - body=manifest_dict, - namespace=namespace) - except Exception as e: - print(f"Exception when create pvc: {e}") + + pvc = api.create_namespaced_persistent_volume_claim( + body=manifest_dict, + namespace=namespace) + return pvc.metadata.name def delete_pvc(name, namespace='default'): @@ -241,7 +232,7 @@ def get_workload_volume_name(workload_name): def get_workload_pvc_name(workload_name): api = client.CoreV1Api() pod = get_workload_pods(workload_name)[0] - print(f"pod = {pod}") + logging(f"Got pod {pod.metadata.name} for workload {workload_name}") for volume in pod.spec.volumes: if volume.name == 'pod-data': pvc_name = volume.persistent_volume_claim.claim_name @@ -269,12 +260,12 @@ def keep_writing_pod_data(pod_name, size_in_mb=256, path="/data/overwritten-data '-c', f"while true; do dd if=/dev/urandom of={path} bs=1M count={size_in_mb} status=none; done > /dev/null 2> /dev/null &" ] - logging.warn("before keep_writing_pod_data") + logging(f"Keep writing pod {pod_name}") res = stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', command=write_cmd, stderr=True, stdin=False, stdout=True, tty=False) - logging.warn("keep_writing_pod_data return") + logging(f"Created process to keep writing pod {pod_name}") return res def check_pod_data(pod_name, checksum, path="/data/random-data"): @@ -288,7 +279,7 @@ def check_pod_data(pod_name, checksum, path="/data/random-data"): api.connect_get_namespaced_pod_exec, pod_name, 'default', command=cmd, stderr=True, stdin=False, stdout=True, tty=False) - print(f"get {path} checksum = {_checksum},\ + logging(f"Got {path} checksum = {_checksum},\ expected checksum = {checksum}") assert _checksum == checksum @@ -296,6 +287,7 @@ def wait_for_workload_pod_stable(workload_name): stable_pod = None wait_for_stable_retry = 0 for _ in range(POD_WAIT_TIMEOUT): + logging(f"Waiting for {workload_name} pod stable ({_}) ...") pods = get_workload_pods(workload_name) for pod in pods: if pod.status.phase == "Running":