Skip to content

Commit

Permalink
Merge pull request #429 from Limmen/start_services
Browse files Browse the repository at this point in the history
add start/stop services tests
  • Loading branch information
Limmen authored Aug 27, 2024
2 parents d1675c5 + a21c4ce commit 9b03cb1
Showing 1 changed file with 366 additions and 0 deletions.
366 changes: 366 additions & 0 deletions emulation-system/tests/test_start_stop_host_manager_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,366 @@
import pytest
import docker
import logging
import grpc
from unittest.mock import MagicMock
from docker.types import IPAMConfig, IPAMPool
import time
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.controllers.host_controller import HostController
import csle_common.constants.constants as constants
import csle_collector.host_manager.host_manager_pb2_grpc
import csle_collector.host_manager.host_manager_pb2
import csle_collector.host_manager.query_host_manager
from csle_common.metastore.metastore_facade import MetastoreFacade
from typing import List
from typing import Generator


@pytest.fixture(scope="module")
def docker_client() -> None:
"""
Initialize and Provide a Docker client instance for the test
:return: None
"""
return docker.from_env()


@pytest.fixture(scope="module")
def network(docker_client) -> Generator:
"""
Create a custom network with a specific subnet
:param docker_client: docker_client
:yield: network
:return: None
"""
subnet = "15.15.15.0/24"
ipam_pool = IPAMPool(subnet=subnet)
ipam_config = IPAMConfig(pool_configs=[ipam_pool])
logging.info(f"Creating virtual network with subnet: {subnet}")
network = docker_client.networks.create("test_network", driver="bridge", ipam=ipam_config)
yield network
network.remove()


def get_derived_containers(docker_client, excluded_tag="blank") -> List:
"""
Get all the containers except the blank ones
:param docker_client: docker_client
:return: List of Images
"""
# Get all images except those with the excluded tag
config = MetastoreFacade.get_config(id=1)
match_tag = config.version
all_images = docker_client.images.list()
derived_images = [image for image in all_images
if (any(match_tag in tag for tag in image.tags)
and all(constants.CONTAINER_IMAGES.BASE not in tag for tag in image.tags)
and all(excluded_tag not in tag for tag in image.tags))]
return derived_images


@pytest.fixture(scope="module", params=get_derived_containers(docker.from_env()))
def container_setup(request, docker_client, network) -> Generator:
"""
Starts a Docker container before running tests and ensures its stopped and removed after tests complete.
:param request: request
:param docker_client: docker_client
:yield: container
:return: None
"""
# Create and start each derived container
image = request.param
container = docker_client.containers.create(image.tags[0], command="sh -c 'while true; do sleep 3600; done'",
detach=True)
network.connect(container)
logging.info(f"Starting container: {container.id} with image: {container.image.tags}")
container.start()
yield container
logging.info(f"Stopping and removing container: {container.id} with image: {container.image.tags}")
container.stop()
container.remove()


def test_start_host_manager(container_setup) -> None:
"""
Start host_manager in a container
:param container_setup: container_setup
:return: None
"""
failed_containers = []
containers_info = []
container_setup.reload()
assert container_setup.status == "running"
# Mock emulation_env_config
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
emulation_env_config.host_manager_config.host_manager_log_dir = "/var/log/host_manager"
emulation_env_config.host_manager_config.host_manager_log_file = "host_manager.log"
emulation_env_config.host_manager_config.host_manager_max_workers = 4

ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
try:
# Start host_manager command
cmd = (
f"/root/miniconda3/bin/python3 /host_manager.py "
f"--port {emulation_env_config.host_manager_config.host_manager_port} "
f"--logdir {emulation_env_config.host_manager_config.host_manager_log_dir} "
f"--logfile {emulation_env_config.host_manager_config.host_manager_log_file} "
f"--maxworkers {emulation_env_config.host_manager_config.host_manager_max_workers}"
)

# Run cmd in the container
logging.info(f"Starting host manager in container: {container_setup.id} "
f"with image: {container_setup.image.tags}")
container_setup.exec_run(cmd, detach=True)

# Check if host_manager starts
cmd = (
f"sh -c '{constants.COMMANDS.PS_AUX} | {constants.COMMANDS.GREP} "
f"{constants.COMMANDS.SPACE_DELIM}{constants.TRAFFIC_COMMANDS.HOST_MANAGER_FILE_NAME}'"
)
logging.info(f"Verifying that host manager is running in container: {container_setup.id} "
f"with image: {container_setup.image.tags}")
result = container_setup.exec_run(cmd)
output = result.output.decode("utf-8")
assert constants.COMMANDS.SEARCH_HOST_MANAGER in output, "Host manager is not running in the container"
time.sleep(5)
# Call grpc
with grpc.insecure_channel(f"{ip}:{port}", options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
status = csle_collector.host_manager.query_host_manager.get_host_status(stub=stub)
assert status
except Exception as e:
logging.info(f"Error occurred in container {container_setup.name}: {e}")
failed_containers.append(container_setup.name)
containers_info.append(
{
"container_status": container_setup.status,
"container_image": container_setup.image.tags,
"name": container_setup.name,
"error": str(e)
}
)
if failed_containers:
logging.info("Containers that failed to start the host manager:")
logging.info(containers_info)
assert not failed_containers, f"T{failed_containers} failed"


def test_start_filebeat(container_setup) -> None:
"""
Start filebeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_filebeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert host_monitor_dto.filebeat_running, f"Failed to start Filebeat on {ip}."
logger.info(f"Filebeat has been successfully started on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_stop_filebeat(container_setup) -> None:
"""
Stop filebeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.stop_filebeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert not host_monitor_dto.filebeat_running, f"Failed to stop Filebeat on {ip}."
logger.info(f"Filebeat has been successfully stopped on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_start_packetbeat(container_setup) -> None:
"""
Start packetbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_packetbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert host_monitor_dto.packetbeat_running, f"Failed to start packetbeat on {ip}."
logger.info(f"packetbeat has been successfully started on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_stop_packetbeat(container_setup) -> None:
"""
Stop packetbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.stop_packetbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert not host_monitor_dto.packetbeat_running, f"Failed to stop packetbeat on {ip}."
logger.info(f"packetbeat has been successfully stopped on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_start_metricbeat(container_setup) -> None:
"""
Start metricbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_metricbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert host_monitor_dto.metricbeat_running, f"Failed to start metricbeat on {ip}."
logger.info(f"metricbeat has been successfully started on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_stop_metricbeat(container_setup) -> None:
"""
Stop metricbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.stop_metricbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert not host_monitor_dto.metricbeat_running, f"Failed to stop metricbeat on {ip}."
logger.info(f"metricbeat has been successfully stopped on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_start_heartbeat(container_setup) -> None:
"""
Start heartbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_heartbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert host_monitor_dto.heartbeat_running, f"Failed to start heartbeat on {ip}."
logger.info(f"heartbeat has been successfully started on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"


def test_stop_heartbeat(container_setup) -> None:
"""
Stop heartbeat in a container
:param container_setup: container_setup
:return: None
"""
emulation_env_config = MagicMock(spec=EmulationEnvConfig)
emulation_env_config.get_connection.return_value = MagicMock()
emulation_env_config.host_manager_config = MagicMock()
emulation_env_config.host_manager_config.host_manager_port = 8080
ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO]
port = emulation_env_config.host_manager_config.host_manager_port
logger = logging.getLogger("test_logger")
try:
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.stop_heartbeat(stub=stub)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port)
assert not host_monitor_dto.heartbeat_running, f"Failed to stop heartbeat on {ip}."
logger.info(f"heartbeat has been successfully stopped on {ip}.")
except grpc.RpcError as e:
logger.error(f"gRPC Error: {e}")
assert False, f"gRPC call failed with error: {e}"

0 comments on commit 9b03cb1

Please sign in to comment.