Skip to content

Commit

Permalink
Merge pull request #399 from Limmen/test_container_ftp
Browse files Browse the repository at this point in the history
Integration test container start/stop/ping
  • Loading branch information
Limmen authored Aug 8, 2024
2 parents 6c13fcf + 02775ba commit 3518898
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 0 deletions.
54 changes: 54 additions & 0 deletions emulation-system/tests/test_all_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
import docker

@pytest.fixture(scope="module")
def docker_client() -> None:
"""
Provide a Docker client instance
:return: None
"""
return docker.from_env()

@pytest.fixture(scope="module")
def containers(docker_client) -> None:
"""
Starts Docker containers before running tests and ensures its stopped and removed after tests complete.
:param docker_client: docker_client
:return: None
"""
match_tag = "0.6.0"
all_images = docker_client.images.list()
images = [
image for image in all_images
if any(match_tag in tag for tag in image.tags) and
all("base" not in tag for tag in image.tags) and
all("kimham/csle_blank_ubuntu_22:0.6.0" not in tag for tag in image.tags)
] #56
started_containers = []
try:
for image in images:
container = docker_client.containers.run(image.id,detach=True)
started_containers.append(container)
yield started_containers
finally:
for container in started_containers:
container.stop()
container.remove()

def test_container_running(docker_client, containers) -> None:
"""
Checks if the container is running and check if its in the running containers list
:param docker_client: docker_client
:return: None
"""
running_containers = docker_client.containers.list()
failed_containers = []
for container in containers:
if container not in running_containers:
failed_containers.append(f"Container with ID {container.id} and image {container.image.tags} is not running.")
assert not failed_containers, f"Some containers failed to run: {failed_containers}"
38 changes: 38 additions & 0 deletions emulation-system/tests/test_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest
import docker
import csle_common.constants.constants as constants

@pytest.fixture(scope="module")
def docker_client() -> None:
"""
Provide a Docker client instance
:return: None
"""
return docker.from_env()

@pytest.fixture(scope="module")
def container(docker_client) -> None:
"""
Starts a Docker container before running tests and ensures its stopped and removed after tests complete.
:param docker_client: docker_client
:return: None
"""
image_with_tag = f"kimham/{constants.CONTAINER_IMAGES.FTP_1}:0.6.0"
container = docker_client.containers.run(image_with_tag, detach=True)
yield container
container.stop()
container.remove()

def test_container_running(docker_client, container) -> None:
"""
Checks if the container is running and check if its in the running containers list
:param docker_client: docker_client
:return: None
"""
running_containers = docker_client.containers.list()
assert container in running_containers
128 changes: 128 additions & 0 deletions emulation-system/tests/test_ping_all_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pytest
import docker
from docker.types import IPAMConfig, IPAMPool


@pytest.fixture(scope="module")
def docker_client() -> None:
"""
Initialize and Provide a Docker client instance for the test
:return: None
"""
return docker.from_env()


@pytest.fixture(scope="module")
def network(docker_client) -> None:
"""
Create a custom network with a specific subnet
:param docker_client: docker_client
:yield: network
:return: None
"""
# Create a custom network
ipam_pool = IPAMPool(subnet="15.15.15.0/24")
ipam_config = IPAMConfig(pool_configs=[ipam_pool])
network = docker_client.networks.create("test_network", driver="bridge", ipam=ipam_config)
yield network
network.remove()


@pytest.fixture(scope="module")
def blank1(docker_client, network) -> None:
"""
Create and start the first container with a specific IP
:param docker_client: docker_client
:param network: network
:yield: container
:return: None
"""
# Create and start the first container with a specific IP
container = docker_client.containers.create(
"kimham/csle_blank_1:0.6.0",
command="sh -c 'apt-get update && apt-get install -y iputils-ping && while true; do sleep 3600; done'",
detach=True,
)
network.connect(container, ipv4_address="15.15.15.10")
container.start()
yield container
container.stop()
container.remove()


@pytest.fixture(scope="module")
def other_containers(docker_client, network):
"""
Create and start the second container with a specific IP
:param docker_client: docker_client
:param network: network
:yield: container
:return: None
"""
# Create and start the second container with a specific IP
match_tag = "0.6.0"
all_images = docker_client.images.list()
images = [
image
for image in all_images
if any(match_tag in tag for tag in image.tags)
and all("base" not in tag for tag in image.tags)
and all("kimham/csle_blank_ubuntu_22:0.6.0" not in tag for tag in image.tags)
]
containers = []
start_ip = 11
for image in images:
container = docker_client.containers.create(
image.tags[0],
command="sh -c 'apt-get update && apt-get install -y iputils-ping && while true; do sleep 3600; done'",
detach=True,
)
network.connect(container, ipv4_address=f"15.15.15.{start_ip}")
container.start()
containers.append((container, f"15.15.15.{start_ip}"))
start_ip += 1
yield containers
for container, _ in containers:
container.stop()
container.remove()


def test_ping_containers(blank1, other_containers) -> None:
"""
Ping container1 from container2
:return: None
"""
failed_tests = []
blank1.start()
blank1.reload()
# Check if containers are running
assert blank1.status == "running", "Container1 is not running"
# Ping container2 from blank1
for container, ip in other_containers:
container.start()
container.reload()
assert container.status == "running", "Container2 is not running"
try:
exec_result = blank1.exec_run(f"ping -c 3 {ip}")
output = exec_result.output.decode("utf-8")
# Check if the ping was successful
assert "3 packets transmitted, 3 received" in output, f"Ping failed. Logs: {output}"
if "3 packets transmitted, 3 received" not in output:
failed_tests.append(f"Ping to {container.image.tags} from blank1 failed. Logs: {output}")
except Exception as e:
failed_tests.append(
f"Ping to {container.image.tags} from blank1 failed. Container: {container.image.tags}, \
IP: {ip}, Error: {str(e)}"
)
if failed_tests:
for fail in failed_tests:
print(fail)
assert False, "Some ping tests failed, see the output above for details."
101 changes: 101 additions & 0 deletions emulation-system/tests/test_ping_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import pytest
import docker
from docker.types import IPAMConfig, IPAMPool


@pytest.fixture(scope="module")
def docker_client() -> None:
"""
Initialize and Provide a Docker client instance for the test
:return: None
"""
return docker.from_env()


@pytest.fixture(scope="module")
def network(docker_client) -> None:
"""
Create a custom network with a specific subnet
:param docker_client: docker_client
:yield: network
:return: None
"""
# Create a custom network
ipam_pool = IPAMPool(subnet="15.15.15.0/24")
ipam_config = IPAMConfig(pool_configs=[ipam_pool])
network = docker_client.networks.create("test_network", driver="bridge", ipam=ipam_config)
yield network
network.remove()


@pytest.fixture(scope="module")
def container1(docker_client, network) -> None:
"""
Create and start the first container with a specific IP
:param docker_client: docker_client
:param network: network
:yield: container
:return: None
"""
# Create and start the first container with a specific IP
container = docker_client.containers.create(
"kimham/csle_spark_1:0.6.0",
command="sh -c 'while true; do sleep 3600; done'",
name="test_container1",
detach=True
)
network.connect(container, ipv4_address="15.15.15.15")
container.start()
yield container
container.stop()
container.remove()


@pytest.fixture(scope="module")
def container2(docker_client, network):
"""
Create and start the second container with a specific IP
:param docker_client: docker_client
:param network: network
:yield: container
:return: None
"""
# Create and start the second container with a specific IP
container = docker_client.containers.create(
"kimham/csle_elk_1:0.6.0",
command="sh -c 'while true; do sleep 3600; done'",
name="test_container2",
detach=True
)
network.connect(container, ipv4_address="15.15.15.14")
container.start()
yield container
container.stop()
container.remove()


def test_ping_containers(container1, container2) -> None:
"""
Ping container1 from container2
:return: None
"""
container1.start()
container2.start()
container1.reload()
container2.reload()
# Check if containers are running
assert container1.status == 'running', "Container1 is not running"
assert container2.status == 'running', "Container2 is not running"
# Ping container2 from container1
exec_result = container2.exec_run("ping -c 3 15.15.15.15")
output = exec_result.output.decode('utf-8')
# Check if the ping was successful
assert "3 packets transmitted, 3 received" in output, f"Ping failed. Logs: {output}"

0 comments on commit 3518898

Please sign in to comment.