diff --git a/simulation-system/libs/csle-cli/src/csle_cli/cli.py b/simulation-system/libs/csle-cli/src/csle_cli/cli.py index 766c62b66..cd3116603 100755 --- a/simulation-system/libs/csle-cli/src/csle_cli/cli.py +++ b/simulation-system/libs/csle-cli/src/csle_cli/cli.py @@ -11,6 +11,7 @@ from csle_common.util.general_util import GeneralUtil from csle_cluster.cluster_manager.cluster_controller import ClusterController from typing import TYPE_CHECKING + if TYPE_CHECKING: from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig diff --git a/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/pgadmin/routes.py b/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/pgadmin/routes.py index 57017c23d..45578ed2f 100644 --- a/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/pgadmin/routes.py +++ b/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/pgadmin/routes.py @@ -3,10 +3,10 @@ """ from typing import Tuple import json +from flask import Blueprint, Response, jsonify, request import csle_common.constants.constants as constants from csle_cluster.cluster_manager.cluster_controller import ClusterController from csle_common.metastore.metastore_facade import MetastoreFacade -from flask import Blueprint, jsonify, request, Response import csle_rest_api.constants.constants as api_constants import csle_rest_api.util.rest_api_util as rest_api_util diff --git a/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/prometheus/routes.py b/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/prometheus/routes.py index cad51b750..d82189bb5 100644 --- a/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/prometheus/routes.py +++ b/simulation-system/libs/csle-rest-api/src/csle_rest_api/resources/prometheus/routes.py @@ -3,10 +3,10 @@ """ from typing import Tuple import json +from flask import Blueprint, Response, jsonify, request import csle_common.constants.constants as constants from csle_cluster.cluster_manager.cluster_controller import ClusterController from csle_common.metastore.metastore_facade import MetastoreFacade -from flask import Blueprint, jsonify, request, Response import csle_rest_api.constants.constants as api_constants import csle_rest_api.util.rest_api_util as rest_api_util diff --git a/simulation-system/libs/csle-rest-api/src/csle_rest_api/util/rest_api_util.py b/simulation-system/libs/csle-rest-api/src/csle_rest_api/util/rest_api_util.py index a8c29c343..793a5895d 100644 --- a/simulation-system/libs/csle-rest-api/src/csle_rest_api/util/rest_api_util.py +++ b/simulation-system/libs/csle-rest-api/src/csle_rest_api/util/rest_api_util.py @@ -48,7 +48,9 @@ def check_if_user_edit_is_authorized(request, user: ManagementUser) -> Union[Non # Extract token and check if user is authorized token = request.args.get(api_constants.MGMT_WEBAPP.TOKEN_QUERY_PARAM) token_obj = MetastoreFacade.get_session_token_metadata(token=token) - request_user = MetastoreFacade.get_management_user_by_username(username=token_obj.username) + request_user = None + if token_obj is not None: + request_user = MetastoreFacade.get_management_user_by_username(username=token_obj.username) if token_obj is None or token_obj.expired(valid_length_hours=api_constants.SESSION_TOKENS.EXPIRE_TIME_HOURS) \ or request_user is None or (not request_user.admin and request_user.username != user.username): if token_obj is not None: diff --git a/simulation-system/libs/csle-rest-api/tests/test_resources_pgadmin.py b/simulation-system/libs/csle-rest-api/tests/test_resources_pgadmin.py index e528152b8..76e204190 100644 --- a/simulation-system/libs/csle-rest-api/tests/test_resources_pgadmin.py +++ b/simulation-system/libs/csle-rest-api/tests/test_resources_pgadmin.py @@ -3,12 +3,12 @@ import pytest_mock import csle_common.constants.constants as constants from csle_cluster.cluster_manager.cluster_manager_pb2 import NodeStatusDTO +from csle_common.dao.emulation_config.config import Config import csle_rest_api.constants.constants as api_constants from csle_rest_api.rest_api import create_app -from csle_common.dao.emulation_config.config import Config -class TestResourcespgAdminSuite(): +class TestResourcespgAdminSuite: """ Test suite for /pgadmin resource """ diff --git a/simulation-system/libs/csle-rest-api/tests/test_resources_prometheus.py b/simulation-system/libs/csle-rest-api/tests/test_resources_prometheus.py index a4c3fdc4e..75ed1dc08 100644 --- a/simulation-system/libs/csle-rest-api/tests/test_resources_prometheus.py +++ b/simulation-system/libs/csle-rest-api/tests/test_resources_prometheus.py @@ -3,9 +3,9 @@ import pytest_mock import csle_common.constants.constants as constants from csle_cluster.cluster_manager.cluster_manager_pb2 import NodeStatusDTO +from csle_common.dao.emulation_config.config import Config import csle_rest_api.constants.constants as api_constants from csle_rest_api.rest_api import create_app -from csle_common.dao.emulation_config.config import Config class TestResourcesPrometheusSuite: diff --git a/simulation-system/libs/csle-rest-api/tests/test_rest_api_util.py b/simulation-system/libs/csle-rest-api/tests/test_rest_api_util.py new file mode 100644 index 000000000..505c03c40 --- /dev/null +++ b/simulation-system/libs/csle-rest-api/tests/test_rest_api_util.py @@ -0,0 +1,243 @@ +import json +import numpy as np +import pytest +import pytest_mock +from flask import Flask +import csle_common.constants.constants as constants +from csle_common.dao.management.management_user import ManagementUser +from csle_common.dao.management.session_token import SessionToken +import csle_rest_api.constants.constants as api_constants +import csle_rest_api.util.rest_api_util as rest_api_util +from csle_rest_api.rest_api import create_app + + +class TestRestAPIUtilSuite: + """ + Test suite for /experiments url + """ + + class SyntReq: + """ + Mock class for synt reg + """ + def __init__(self, args) -> None: + """ + Initializes the object + + :param args: the arguments for syntreg + """ + self.args = args + + class Args: + """ + Mock class for syntehtic arguments to a request + """ + def __init__(self) -> None: + """ + Initializes the object + """ + pass + + def get(self, token: str) -> None: + """ + Mocks the get method of the arguments + + :param token: the token for authentication + :return: None + """ + return None + + @pytest.fixture + def flask_app(self): + """ + Gets the Flask app + + :return: the flask app fixture representing the webserver + """ + return create_app(static_folder="../../../../../management-system/csle-mgmt-webapp/build") + + @pytest.fixture + def session_token(self, mocker: pytest_mock.MockFixture): + """ + Pytest fixture for mocking the get_session_token_metadata method + + :param mocker: the pytest mocker object + :return: the mocked function + """ + def get_session_token_metadata(token: str) -> SessionToken: + return SessionToken(token="null", timestamp=1.5, username="JDoe") + get_session_token_metadata_mocker = mocker.MagicMock(side_effect=get_session_token_metadata) + return get_session_token_metadata_mocker + + @pytest.fixture + def session_token_exp(self, mocker: pytest_mock.MockFixture): + """ + Pytest fixture for mocking the get_session_token_metadata method + + :param mocker: the pytest mocker object + :return: the mocked function + """ + def get_session_token_metadata(token: str) -> SessionToken: + ses_tok = SessionToken(token="null", timestamp=1.5, username="JDoe") + api_constants.SESSION_TOKENS.EXPIRE_TIME_HOURS = np.iinfo(np.int32).max + return ses_tok + get_session_token_metadata_mocker = mocker.MagicMock(side_effect=get_session_token_metadata) + return get_session_token_metadata_mocker + + @pytest.fixture + def management_user(self, mocker: pytest_mock.MockFixture): + """ + Pytest fixture for mocking the get_management_user_by_username method + + :param mocker: the pytest mocker object + :return: the mocked function + """ + def get_management_user_by_username(username: str) -> ManagementUser: + mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user() + return mng_user + + get_management_user_by_username_mocker = mocker.MagicMock(side_effect=get_management_user_by_username) + return get_management_user_by_username_mocker + + @pytest.fixture + def management_user_none(self, mocker: pytest_mock.MockFixture): + """ + Pytest fixture for mocking the get_management_user_by_username method + + :param mocker: the pytest mocker object + :return: the mocked function + """ + def get_management_user_by_username(username: str) -> None: + return None + + get_management_user_by_username_mocker = mocker.MagicMock(side_effect=get_management_user_by_username) + return get_management_user_by_username_mocker + + @pytest.fixture + def remove(self, mocker: pytest_mock.MockFixture): + """ + Pytest fixture for mocking the reomve_session_token method + + :param mocker: the pytest mocker object + :return: the mocked function + """ + def remove_session_token(session_token: SessionToken) -> None: + return None + remove_session_token_mocker = mocker.MagicMock(side_effect=remove_session_token) + return remove_session_token_mocker + + @staticmethod + def get_args() -> Args: + """ + Returns a mock request argument + + :return: the mocked request argument + """ + return TestRestAPIUtilSuite.Args() + + @staticmethod + def get_synthetic_request(args) -> SyntReq: + """ + Static help method for returning a synthetic/mocked request, customized to work for testing without the use + of blueprint or flask app + + :param args: the arguments for the mock request + :return: the syntethic request + """ + return TestRestAPIUtilSuite.SyntReq(args) + + @staticmethod + def get_synthetic_mng_user() -> ManagementUser: + """ + Static help method for returning a synthetic/mocked management user + + :return: the mocked management user + """ + mng_user = ManagementUser(username="JDoe", password="JDoe", email="jdoe@csle.com", + first_name="John", last_name="Doe", organization="CSLE", + admin=False, salt="null") + return mng_user + + def test_util(self, flask_app, mocker: pytest_mock.MockFixture, + session_token, session_token_exp, + management_user, management_user_none, remove): + """ + Test method for the rest-api util + + :param flask_app: the flask_app fixture + :param mocker: the pytest mocker object + :param session_token: the session_token fixture + :param management_user: the management_user fixture + :param remove: the remove fixture + """ + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_session_token_metadata", + side_effect=session_token) + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username", + side_effect=management_user) + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.remove_session_token", side_effect=remove) + app = Flask(__name__) + mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user() + args = TestRestAPIUtilSuite.get_args() + req = TestRestAPIUtilSuite.get_synthetic_request(args) + with app.app_context(): + response = rest_api_util.check_if_user_is_authorized(request=req) + response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user) + assert response is not None + response_data = response[0].data.decode("utf-8") + response_data_dict = json.loads(response_data) + response_status_code = response[1] + assert response_data_dict == {} + assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE + assert response1 is not None + response_data1 = response1[0].data.decode("utf-8") + response_data_dict1 = json.loads(response_data1) + response_status_code1 = response1[1] + assert response_data_dict1 == {} + assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username", + side_effect=management_user_none) + with app.app_context(): + response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user) + assert response1 is not None + response_data1 = response1[0].data.decode("utf-8") + response_data_dict1 = json.loads(response_data1) + response_status_code1 = response1[1] + assert response_data_dict1 == {} + assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username", + side_effect=management_user) + with app.app_context(): + response = rest_api_util.check_if_user_is_authorized(request=req) + assert response is not None + response_data = response[0].data.decode("utf-8") + response_data_dict = json.loads(response_data) + response_status_code = response[1] + assert response_data_dict == {} + assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE + mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_session_token_metadata", + side_effect=session_token_exp) + with app.app_context(): + response = rest_api_util.check_if_user_is_authorized(request=req) + response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user) + assert response is None + assert response1 is not None + assert isinstance(response1, ManagementUser) + ex_mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user() + assert response1.username == ex_mng_user.username + assert response1.password == ex_mng_user.password + assert response1.admin == ex_mng_user.admin + assert response1.id == ex_mng_user.id + assert response1.salt == ex_mng_user.salt + assert response1.email == ex_mng_user.email + assert response1.first_name == ex_mng_user.first_name + assert response1.last_name == ex_mng_user.last_name + assert response1.organization == ex_mng_user.organization + assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE + with app.app_context(): + response = rest_api_util.check_if_user_is_authorized(request=req, requires_admin=True) + assert response is not None + response_data = response[0].data.decode("utf-8") + response_data_dict = json.loads(response_data) + response_status_code = response[1] + assert response_data_dict == {} + assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE