Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Запретить параллельное использование пользователей #36

Merged
merged 11 commits into from
Jan 20, 2024
10 changes: 10 additions & 0 deletions overhave/admin/views/emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,15 @@ def edit_view(self) -> werkzeug.Response | None:
flask.flash("Please, save emulation template before execution.")
return rendered

test_user_id = data["test_user"]
if not self._ensure_no_active_emulation_runs_for_user(int(test_user_id)):
flask.flash(f"Unable to run new emulation in parallel for user {test_user_id}")
return rendered

logger.debug("Seen emulation request")
return self._run_emulation(emulation_id)

@staticmethod
def _ensure_no_active_emulation_runs_for_user(test_user_id: int) -> bool:
factory = get_admin_factory()
return factory.emulation_storage.has_running_emulation_with_user(test_user_id)
28 changes: 22 additions & 6 deletions overhave/storage/emulation_storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
import logging
import socket
from typing import Any, List, cast
from typing import Any, List, Tuple, cast

import orjson
import sqlalchemy as sa
Expand Down Expand Up @@ -48,6 +48,10 @@ def set_error_emulation_run(self, emulation_run_id: int, traceback: str) -> None
def get_emulation_runs_by_test_user_id(test_user_id: int) -> list[EmulationRunModel]:
pass

@abc.abstractmethod
def has_running_emulation_with_user(self, test_user_id: int) -> bool:
pass


class EmulationStorage(IEmulationStorage):
"""Class for emulation runs storage."""
Expand Down Expand Up @@ -87,11 +91,15 @@ def _get_next_port(self) -> int:
raise AllPortsAreBusyError("All ports are busy - could not find free port!")

def get_allocated_ports(self) -> List[int]:
return cast(List[int], orjson.loads(cast(bytes, self._redis.get(self._settings.redis_ports_key))))
port_user_pairs = self.get_allocated_port_user_pairs()
return [port for port, _ in port_user_pairs]

def allocate_port(self, port: int) -> None:
new_allocated_ports = self.get_allocated_ports()
new_allocated_ports.append(port)
def get_allocated_port_user_pairs(self) -> List[Tuple[int, int]]:
return cast(List[Tuple[int, int]], orjson.loads(cast(bytes, self._redis.get(self._settings.redis_ports_key))))

def allocate_port_for_user(self, port: int, test_user_id: int) -> None:
new_allocated_ports = self.get_allocated_port_user_pairs()
new_allocated_ports.append((port, test_user_id))
self._redis.set(self._settings.redis_ports_key, orjson.dumps(sorted(new_allocated_ports)))

def _is_port_in_use(self, port: int) -> bool:
Expand All @@ -103,7 +111,7 @@ def get_requested_emulation_run(self, emulation_run_id: int) -> EmulationRunMode
emulation_run = session.query(db.EmulationRun).filter(db.EmulationRun.id == emulation_run_id).one()
emulation_run.status = db.EmulationStatus.REQUESTED
emulation_run.port = self._get_next_port()
self.allocate_port(emulation_run.port)
self.allocate_port_for_user(emulation_run.port, emulation_run.emulation.test_user_id)
emulation_run.changed_at = get_current_time()
return EmulationRunModel.model_validate(emulation_run)

Expand Down Expand Up @@ -136,3 +144,11 @@ def get_emulation_runs_by_test_user_id(test_user_id: int) -> list[EmulationRunMo
session.query(db.EmulationRun).where(db.EmulationRun.emulation_id.in_(emulation_ids_query)).all()
)
return [EmulationRunModel.model_validate(x) for x in emulation_runs]

def has_running_emulation_with_user(self, test_user_id: int) -> bool:
Catmoonlight marked this conversation as resolved.
Show resolved Hide resolved
port_user_pairs = self.get_allocated_port_user_pairs()

for port, user in port_user_pairs:
if user == test_user_id and self._is_port_in_use(port):
return True
return False
64 changes: 64 additions & 0 deletions tests/unit/storage/test_emulation_storage_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import Callable, List, Tuple
from unittest import mock

import pytest

from overhave.storage import EmulationStorage


class TestEmulationStorage:
"""
Unit tests for :class:`EmulationStorage`.

functions with logic over redis or database.
"""

TARGET_USER_ID = 1
OTHER_USER_ID = 2

@pytest.mark.parametrize(
("allocated_port_user_pairs", "used_ports", "expected_result"),
[
([], [], False),
([(8080, TARGET_USER_ID), (8081, OTHER_USER_ID)], [8080], True),
([(8080, TARGET_USER_ID), (8081, OTHER_USER_ID)], [8081], False),
([(8080, TARGET_USER_ID), (8081, TARGET_USER_ID), (8082, TARGET_USER_ID)], [], False),
([(8080, TARGET_USER_ID), (8081, TARGET_USER_ID), (8082, TARGET_USER_ID)], [8082], True),
],
ids=[
"empty_suite",
"used_by_target_user",
"used_by_other_user",
"nothing_is_used",
"one_from_many_is_used",
],
)
def test_has_running_emulation_with_user(
self,
allocated_port_user_pairs: List[Tuple[int, int]],
used_ports: List[int],
expected_result: bool,
) -> None:
# No database or redis is used, as necessary database layer functions in
# storage should are mocked
emulation_storage = EmulationStorage(mock.MagicMock(), mock.MagicMock())
emulation_storage.get_allocated_port_user_pairs = lambda **_: allocated_port_user_pairs # type: ignore
emulation_storage._is_port_in_use = get_dummy_used_ports_method(used_ports) # type: ignore

result = emulation_storage.has_running_emulation_with_user(self.TARGET_USER_ID)

assert result == expected_result


# +---------+
# | Helpers |
# +---------+


def get_dummy_used_ports_method(used_ports: List[int]) -> Callable[[int], bool]:
ports = used_ports.copy()

def dummy_method(port: int) -> bool:
return port in ports

return dummy_method
Loading