Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Limmen/csle
Browse files Browse the repository at this point in the history
  • Loading branch information
Limmen committed Jul 19, 2024
2 parents 06826cf + a433e4f commit 7cdec90
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ features. We currently support each release for a window of 6 months.
<img src="https://upload.wikimedia.org/wikipedia/commons/3/35/Tux.svg" width="7%" height="7%" style="margin-left:70px;"/>
</p>

## Datasets

A dataset of 6400 intrusion traces can be found [here](https://zenodo.org/records/10234379).

## Maintainer

<table>
Expand Down
38 changes: 38 additions & 0 deletions simulation-system/libs/csle-common/tests/test_grpc_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import grpc
from unittest.mock import patch, MagicMock
from csle_common.util.grpc_util import GrpcUtil


class TestGrpcUtilSuite:
"""
Test suite for grpc_util
"""

@patch("grpc.channel_ready_future")
def test_grpc_server_on(self, mock_channel_ready_future) -> None:
"""
Test utility function to test if a given gRPC channel is working or not
:param mock_channel_ready_future: mock_channel_ready_future
:return: None
"""
mock_future = MagicMock()
mock_channel_ready_future.return_value = mock_future
result = GrpcUtil.grpc_server_on(mock_channel_ready_future)
mock_future.result.assert_called()
assert result

@patch("grpc.channel_ready_future")
def test_grpc_server_on_timeout(self, mock_channel_ready_future) -> None:
"""
Test utility function to test if a given gRPC channel is not working
:param mock_channel_ready_future: mock_channel_ready_future
:return: None
"""
mock_future = MagicMock()
mock_future.result.side_effect = grpc.FutureTimeoutError()
mock_channel_ready_future.return_value = mock_future
result = GrpcUtil.grpc_server_on(mock_channel_ready_future)
mock_future.result.assert_called()
assert not result
64 changes: 64 additions & 0 deletions simulation-system/libs/csle-common/tests/test_import_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from unittest.mock import patch, MagicMock
from csle_common.util.import_util import ImportUtil


class TestImportUtilSuite:
"""
Test suite for import_util
"""

@patch("os.path.exists")
@patch("csle_common.dao.system_identification.emulation_statistics.EmulationStatistics.from_json_file")
@patch("csle_common.metastore.metastore_facade.MetastoreFacade.save_emulation_statistic")
def test_import_emulation_statistics_from_disk_json(
self, mock_save_emulation_statistic, mock_from_json_file, mock_path_exists
) -> None:
"""
Test the method that imports emulation statistics from disk to the metastore
:param mock_save_emulation_statistic: mock_save_emulation_statistic
:param mock_from_json_file: mock_from_json_file
:param mock_path_exists: mock_path_exists
:return: None
"""
mock_path_exists.return_value = True
mock_statistics = MagicMock()
mock_from_json_file.return_value = mock_statistics
input_file = "file.json"
emulation_name = "test_emulation"
ImportUtil.import_emulation_statistics_from_disk_json(input_file=input_file, emulation_name=emulation_name)

mock_path_exists.assert_called()
mock_from_json_file.assert_called_once_with(input_file)
assert mock_statistics.emulation_name == emulation_name
mock_save_emulation_statistic.assert_called()

@patch("os.path.exists")
@patch("csle_common.dao.emulation_config.emulation_trace.EmulationTrace.load_traces_from_disk")
@patch("csle_common.metastore.metastore_facade.MetastoreFacade.save_emulation_trace")
def test_import_emulation_traces_from_disk_json(
self, mock_save_emulation_trace, mock_load_traces_from_disk, mock_path_exists
) -> None:
"""
Test the method that imports emulation traces from disk to the metastore
:param mock_save_emulation_trace: mock_save_emulation_trac
:param mock_load_traces_from_disk: mock_load_traces_from_disk
:param mock_path_exists: mock_path_exists
:return: None
"""
mock_path_exists.return_value = True
mock_trace_1 = MagicMock()
mock_trace_2 = MagicMock()
mock_load_traces_from_disk.return_value = [mock_trace_1, mock_trace_2]
input_file = "file.json"
emulation_name = "test_emulation"
ImportUtil.import_emulation_traces_from_disk_json(input_file=input_file, emulation_name=emulation_name)

mock_path_exists.assert_called()
mock_load_traces_from_disk.assert_called()
assert mock_trace_1.emulation_name == emulation_name
assert mock_trace_2.emulation_name == emulation_name
assert mock_save_emulation_trace.call_count == 2
80 changes: 80 additions & 0 deletions simulation-system/libs/csle-common/tests/test_management_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import csle_common.constants.constants as constants
from csle_common.util.management_util import ManagementUtil
from unittest.mock import patch

class TestManagementUtilSuite:
"""
Test suite for management util
"""

@patch("csle_common.metastore.metastore_facade.MetastoreFacade.list_management_users")
@patch("bcrypt.gensalt")
@patch("bcrypt.hashpw")
@patch("csle_common.metastore.metastore_facade.MetastoreFacade.save_management_user")
def test_create_default_management_admin_account(
self, mock_save_management_user, mock_hashpw, mock_gensalt, mock_list_management_users
) -> None:
"""
Test the method that creates the default management admin account
:param mock_save_management_user: mock_save_management_user
:param mock_hashpw: mock_hashpw
:param mock_gensalt: mock_gensalt
:param mock_list_management_users: mock_list_management_users
:return: None
"""
mock_list_management_users.return_value = []
mock_salt = b"salt"
mock_gensalt.return_value = mock_salt
mock_hash = b"hashed_password"
mock_hashpw.return_value = mock_hash

constants.CSLE_ADMIN.MANAGEMENT_USER = "admin"
constants.CSLE_ADMIN.MANAGEMENT_PW = "password"
constants.CSLE_ADMIN.MANAGEMENT_FIRST_NAME = "first"
constants.CSLE_ADMIN.MANAGEMENT_LAST_NAME = "last"
constants.CSLE_ADMIN.MANAGEMENT_ORGANIZATION = "organization"
constants.CSLE_ADMIN.MANAGEMENT_EMAIL = "admin@email.com"

ManagementUtil.create_default_management_admin_account()
mock_list_management_users.assert_called_once()
mock_gensalt.assert_called_once()
mock_hashpw.assert_called_once_with(constants.CSLE_ADMIN.MANAGEMENT_PW.encode("utf-8"), mock_salt)
mock_save_management_user.assert_called_once()

@patch("csle_common.metastore.metastore_facade.MetastoreFacade.list_management_users")
@patch("bcrypt.gensalt")
@patch("bcrypt.hashpw")
@patch("csle_common.metastore.metastore_facade.MetastoreFacade.save_management_user")
def test_create_default_management_guest_account(
self, mock_save_management_user, mock_hashpw, mock_gensalt, mock_list_management_users
) -> None:
"""
Test the method that creates the default management guest account
:param mock_save_management_user: mock_save_management_user
:param mock_hashpw: mock_hashpw
:param mock_gensalt: mock_gensalt
:param mock_list_management_users: mock_list_management_users
:return: None
"""
mock_list_management_users.return_value = []
mock_salt = b"salt"
mock_gensalt.return_value = mock_salt
mock_hash = b"hashed_password"
mock_hashpw.return_value = mock_hash

constants.CSLE_GUEST.MANAGEMENT_USER = "user"
constants.CSLE_GUEST.MANAGEMENT_PW = "password"
constants.CSLE_GUEST.MANAGEMENT_FIRST_NAME = "guest_first"
constants.CSLE_GUEST.MANAGEMENT_LAST_NAME = "guest_last"
constants.CSLE_GUEST.MANAGEMENT_ORGANIZATION = "guest_organization"
constants.CSLE_GUEST.MANAGEMENT_EMAIL = "guest@email.com"

ManagementUtil.create_default_management_guest_account()
mock_list_management_users.assert_called_once()
mock_gensalt.assert_called_once()
mock_hashpw.assert_called_once_with(constants.CSLE_GUEST.MANAGEMENT_PW.encode("utf-8"), mock_salt)
mock_save_management_user.assert_called_once()
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from unittest.mock import patch
from csle_common.util.multiprocessing_util import NoDaemonProcess
from csle_common.util.multiprocessing_util import NoDaemonContext
from csle_common.util.multiprocessing_util import NestablePool


class TestMultiprocessingUtilSuite:
"""
Test suite for multiprocessing util
"""

def test_daemon(self) -> None:
"""
Test the process with daemon property set to false
:return: None
"""
result = NoDaemonProcess(target=lambda: None).daemon
assert not result

def test_no_daemon_context(self) -> None:
"""
Test the NoDaemonContext method
:return: None
"""
context = NoDaemonContext()
process = context.Process(target=lambda: None)
assert isinstance(process, NoDaemonProcess)
assert not process.daemon

@patch("multiprocessing.get_context")
def test_nestable_pool_initialization(self, mock_get_context) -> None:
"""
Test the method that initializes the pool
:param mock_get_context: mock_get_context
:return: None
"""
mock_get_context.return_value = NoDaemonContext()
pool = NestablePool()
assert pool
48 changes: 48 additions & 0 deletions simulation-system/libs/csle-common/tests/test_plotting_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from csle_common.util.plotting_util import PlottingUtil
from scipy import stats
import numpy as np


class TestPlottingUtilSuite:
"""
Test suite for plotting util
"""

def test_running_average(self) -> None:
"""
Test the function used to compute the running average of the last N elements of a vector x
:return: None
"""
x = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
N = 3
expected = np.array([1, 2, 3, 3, 4, 5, 6, 7, 8, 9])
result = PlottingUtil.running_average(x, N)
assert result.any() == expected.any()

def test_mean_confidence_interval(self) -> None:
"""
Test function that computes confidence intervals
:return: None
"""
data = np.array([1, 2, 3, 4, 5])
mean, h = PlottingUtil.mean_confidence_interval(data=data, confidence=0.95)
expected_mean = np.mean(data)
expected_se = stats.sem(data)
expected_h = expected_se * stats.t.ppf((1 + 0.95) / 2.0, len(data) - 1)
assert expected_mean == mean
assert expected_h == h

def test_min_max_norm(self) -> None:
"""
Test function that computes min-max normalization of a vector
:return: None
"""
vec = np.array([1, 2, 3, 4, 5])
min_val = 1
max_val = 5
expected = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
result = PlottingUtil.min_max_norm(vec, max_val, min_val)
assert result.any() == expected.any()
53 changes: 53 additions & 0 deletions simulation-system/libs/csle-common/tests/test_ssh_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from csle_common.util.ssh_util import SSHUtil
from unittest.mock import patch, MagicMock
import pytest


class TestSSHUtilSuite:
"""
Test suite for ssh_util
"""

@pytest.fixture(autouse=True)
def mock_sleep(self):
"""
Mock time.sleep to avoid delays
"""
with patch("time.sleep", return_value=None):
yield

@patch("csle_common.util.ssh_util.SSHUtil.execute_ssh_cmd")
def test_execute_ssh_cmds(self, mock_execute_ssh_cmd) -> None:
"""
Test the method that executes a list of commands over an ssh connection to the emulation
:param mock_execute_ssh_cmd: mock_execute_ssh_cmd
:return: None
"""
mock_execute_ssh_cmd.return_value = (b"output", b"error", 1.0)
cmds = ["ls", "pwd", "whoami"]
conn = MagicMock()
results = SSHUtil.execute_ssh_cmds(cmds, conn)
mock_execute_ssh_cmd.assert_called()
assert results == [(b"output", b"error", 1.0)] * len(cmds)

def test_execute_ssh_cmd(self) -> None:
"""
Test the method that executes an action on the emulation over a ssh connection
:return: None
"""
conn = MagicMock()
mock_transport = MagicMock()
mock_session = MagicMock()
mock_session.exit_status_ready.return_value = True
mock_session.recv_ready.return_value = True
mock_session.recv_stderr_ready.return_value = True
mock_session.recv.side_effect = [b"output", b""]
mock_session.recv_stderr.side_effect = [b"error", b""]
conn.get_transport.return_value = mock_transport
mock_transport.open_session.return_value = mock_session

with pytest.raises(ConnectionError, match="Connection failed"):
SSHUtil.execute_ssh_cmd("ls", conn)

0 comments on commit 7cdec90

Please sign in to comment.