diff --git a/integration_tests/fafclient.py b/integration_tests/fafclient.py index e122dcbcc..50fc573f2 100644 --- a/integration_tests/fafclient.py +++ b/integration_tests/fafclient.py @@ -44,13 +44,13 @@ async def send_command(self, command, **kwargs): """Convenience for sending commands""" await self.send_message({"command": command, **kwargs}) - async def read_until(self, predicate, timeout=60): + async def read_until(self, predicate, timeout=5): return await asyncio.wait_for( read_until(self.proto, predicate), timeout=timeout ) - async def read_until_command(self, command, timeout=60): + async def read_until_command(self, command, timeout=5): return await read_until_command(self.proto, command, timeout=timeout) async def read_until_game_launch(self, uid): @@ -130,6 +130,19 @@ async def join_game(self, game_id, **kwargs): # HACK: Yield long enough for the server to process our message await asyncio.sleep(0.5) + async def join_queue(self, queue_name, faction="uef"): + await self.send_message({ + "command": "game_matchmaking", + "state": "start", + "queue_name": queue_name, + "faction": faction + }) + await self.read_until(lambda msg: msg == { + "command": "search_info", + "state": "start", + "queue_name": queue_name + }) + async def open_fa(self): """Simulate FA opening""" await self.send_message({ diff --git a/integration_tests/test_matchmaking.py b/integration_tests/test_matchmaking.py index f114069af..310bbc4fb 100644 --- a/integration_tests/test_matchmaking.py +++ b/integration_tests/test_matchmaking.py @@ -29,7 +29,7 @@ async def test_ladder_1v1_match(client_factory): "faction": "seraphim" }) - await client1.read_until_command("match_found") + await client1.read_until_command("match_found", timeout=60) await client2.read_until_command("match_found") msg1 = await client1.read_until_command("game_info") @@ -137,3 +137,29 @@ async def handle_game_launch(client): assert ratings[winner][0] < new_ratings[winner][0] assert ratings[loser][0] > new_ratings[loser][0] + + +async def test_multiqueue(client_factory): + client1, _ = await client_factory.login("test") + client2, _ = await client_factory.login("test2") + + await client1.join_queue("tmm2v2") + + for client in (client1, client2): + await client.join_queue("ladder1v1") + + await client1.read_until_command("match_found", timeout=60) + msg1 = await client1.read_until_command("search_info") + msg2 = await client1.read_until_command("search_info") + + assert { + "command": "search_info", + "queue_name": "tmm2v2", + "state": "stop" + } in (msg1, msg2) + + assert { + "command": "search_info", + "queue_name": "ladder1v1", + "state": "stop" + } in (msg1, msg2) diff --git a/server/asyncio_extensions.py b/server/asyncio_extensions.py index 4a44d214e..b989a1a04 100644 --- a/server/asyncio_extensions.py +++ b/server/asyncio_extensions.py @@ -89,11 +89,14 @@ def _synchronize( lock: Optional[asyncio.Lock] = None ) -> AsyncFunc: """Wrap an async function with an async lock.""" - if lock is None: - lock = asyncio.Lock() - @wraps(function) async def wrapped(*args, **kwargs): + nonlocal lock + + # During testing, functions are called from multiple loops + if lock is None or lock._loop != asyncio.get_event_loop(): + lock = asyncio.Lock() + async with lock: return await function(*args, **kwargs) diff --git a/server/ladder_service.py b/server/ladder_service.py index cc52bad88..13ca3c3e1 100644 --- a/server/ladder_service.py +++ b/server/ladder_service.py @@ -67,16 +67,16 @@ async def update_data(self) -> None: for name, info in db_queues.items(): if name not in self.queues: queue = MatchmakerQueue( + self.game_service, + self.on_match_found, name=name, queue_id=info["id"], featured_mod=info["mod"], rating_type=info["rating_type"], team_size=info["team_size"], - game_service=self.game_service ) self.queues[name] = queue queue.initialize() - asyncio.ensure_future(self.handle_queue_matches(queue)) else: queue = self.queues[name] queue.featured_mod = info["mod"] @@ -185,9 +185,8 @@ def start_search( for player in players: player.state = PlayerState.SEARCHING_LADDER - # FIXME: For now, inform_player is only designed for ladder1v1 - if queue_name == "ladder1v1": - self.inform_player(player) + + self.write_rating_progress(player, queue.rating_type) player.write_message({ "command": "search_info", @@ -239,10 +238,10 @@ def _cancel_search(self, initiator: Player, queue_name: str) -> None: "%s stopped searching for %s", cancelled_search, queue_name ) - def inform_player(self, player: Player): + def write_rating_progress(self, player: Player, rating_type: str) -> None: if player not in self._informed_players: self._informed_players.add(player) - mean, deviation = player.ratings[RatingType.LADDER_1V1] + _, deviation = player.ratings[rating_type] if deviation > 490: player.write_message({ @@ -269,30 +268,41 @@ def inform_player(self, player: Player): ) }) - async def handle_queue_matches(self, queue: MatchmakerQueue): - async for s1, s2 in queue.iter_matches(): - try: - msg = {"command": "match_found", "queue": queue.name} - # TODO: Handle disconnection with a client supported message - await asyncio.gather(*[ - player.send_message(msg) - for player in s1.players + s2.players - ]) - asyncio.create_task( - self.start_game(s1.players, s2.players, queue) - ) - except Exception as e: - self._logger.exception( - "Error processing match between searches %s, and %s: %s", - s1, s2, e - ) + def on_match_found( + self, + s1: Search, + s2: Search, + queue: MatchmakerQueue + ) -> None: + """ + Callback for when a match is generated by a matchmaker queue. + + NOTE: This function is called while the matchmaker search lock is held, + so it should only perform fast operations. + """ + try: + msg = {"command": "match_found", "queue": queue.name} + + for player in s1.players + s2.players: + player.write_message(msg) + + self.cancel_search(player) + + asyncio.create_task( + self.start_game(s1.players, s2.players, queue) + ) + except Exception: + self._logger.exception( + "Error processing match between searches %s, and %s", + s1, s2 + ) async def start_game( self, team1: List[Player], team2: List[Player], queue: MatchmakerQueue - ): + ) -> None: self._logger.debug( "Starting %s game between %s and %s", queue.name, team1, team2 ) diff --git a/server/matchmaker/matchmaker_queue.py b/server/matchmaker/matchmaker_queue.py index 95288b03d..272dea3b4 100644 --- a/server/matchmaker/matchmaker_queue.py +++ b/server/matchmaker/matchmaker_queue.py @@ -1,17 +1,21 @@ import asyncio import time -from collections import OrderedDict, deque +from collections import OrderedDict from concurrent.futures import CancelledError from datetime import datetime, timezone -from typing import Deque, Dict, Iterable, List, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple import server.metrics as metrics +from ..asyncio_extensions import synchronized from ..decorators import with_logger +from ..players import PlayerState from .algorithm import make_matches, make_teams, make_teams_from_single from .map_pool import MapPool from .pop_timer import PopTimer -from .search import Match, Search +from .search import Search + +MatchFoundCallback = Callable[[Search, Search, "MatchmakerQueue"], Any] class MatchmakerSearchTimer: @@ -39,12 +43,13 @@ class MatchmakerQueue: def __init__( self, game_service: "GameService", + on_match_found: MatchFoundCallback, name: str, queue_id: int, featured_mod: str, rating_type: str, team_size: int = 1, - map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = () + map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = (), ): self.game_service = game_service self.name = name @@ -55,7 +60,7 @@ def __init__( self.map_pools = {info[0].id: info for info in map_pools} self._queue: Dict[Search, None] = OrderedDict() - self._matches: Deque[Match] = deque() + self.on_match_found = on_match_found self._is_running = True self.timer = PopTimer(self.name) @@ -83,18 +88,6 @@ def initialize(self): def num_players(self) -> int: return sum(len(search.players) for search in self._queue.keys()) - async def iter_matches(self): - """ Asynchronously yields matches as they become available """ - - while self._is_running: - if not self._matches: - # There are no matches so there is nothing to do - await asyncio.sleep(1) - continue - - # Yield the next available match to the caller - yield self._matches.popleft() - async def queue_pop_timer(self) -> None: """ Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players @@ -106,16 +99,6 @@ async def queue_pop_timer(self) -> None: await self.find_matches() - number_of_matches = len(self._matches) - metrics.matches.labels(self.name).set(number_of_matches) - - # TODO: Move this into algorithm, then don't need to recalculate quality_with? - # Probably not a major bottleneck though. - for search1, search2 in self._matches: - metrics.match_quality.labels(self.name).observe( - search1.quality_with(search2) - ) - number_of_unmatched_searches = len(self._queue) metrics.unmatched_searches.labels(self.name).set(number_of_unmatched_searches) @@ -148,7 +131,15 @@ async def search(self, search: Search) -> None: if search in self._queue: del self._queue[search] + @synchronized async def find_matches(self) -> None: + """ + Perform the matchmaking algorithm. + + Note that this funciton is synchronized such that only one instance of + MatchmakerQueue can call this function at any given time. This is + needed in order to safely enable multiqueuing. + """ self._logger.info("Searching for matches: %s", self.name) if self.num_players < 2 * self.team_size: @@ -158,11 +149,24 @@ async def find_matches(self) -> None: # Call self.match on all matches and filter out the ones that were cancelled loop = asyncio.get_running_loop() - new_matches = filter( + matches = list(filter( lambda m: self.match(m[0], m[1]), await loop.run_in_executor(None, make_matches, searches) - ) - self._matches.extend(new_matches) + )) + + number_of_matches = len(matches) + metrics.matches.labels(self.name).set(number_of_matches) + + for search1, search2 in matches: + # TODO: Move this into algorithm, then don't need to recalculate + # quality_with? Probably not a major bottleneck though. + metrics.match_quality.labels(self.name).observe( + search1.quality_with(search2) + ) + try: + self.on_match_found(search1, search2, self) + except Exception: + self._logger.exception("Match callback raised an exception!") def find_teams(self) -> List[Search]: searches = [] @@ -193,10 +197,26 @@ def match(self, s1: Search, s2: Search) -> bool: Mark the given two searches as matched :param s1: :param s2: - :return: + :return: True if matching succeeded or False if matching failed """ - if (s1.is_matched or s2.is_matched) or (s1.is_cancelled or s2.is_cancelled): + if s1.is_matched or s2.is_matched: return False + if s1.is_cancelled or s2.is_cancelled: + return False + # Additional failsafe. Ideally this check will never fail. + if any( + player.state != PlayerState.SEARCHING_LADDER + for player in s1.players + s2.players + ): + self._logger.warning( + "Tried to match searches %s and %s while some players had " + "invalid states: team1: %s team2: %s", + s1, s2, + list(p.state for p in s1.players), + list(p.state for p in s2.players) + ) + return False + s1.match(s2) s2.match(s1) if s1 in self._queue: diff --git a/server/players.py b/server/players.py index fc1ff2617..3e2691f19 100644 --- a/server/players.py +++ b/server/players.py @@ -104,21 +104,22 @@ def is_admin(self) -> bool: def is_moderator(self) -> bool: return "faf_moderators_global" in self.user_groups - async def send_message(self, message) -> None: + async def send_message(self, message: dict) -> None: """ Try to send a message to this player. - :raises: DisconnectedError if the player has disconnected + :raises: DisconnectedError if the player has disconnected. """ if self.lobby_connection is None: raise DisconnectedError("Player has disconnected!") await self.lobby_connection.send(message) - def write_message(self, message) -> None: + def write_message(self, message: dict) -> None: """ - Try to queue a message to be sent this player. Only call this from - broadcasting functions. Does nothing if the player has disconnected. + Try to queue a message to be sent to this player. + + Does nothing if the player has disconnected. """ if self.lobby_connection is None: return diff --git a/tests/conftest.py b/tests/conftest.py index ffaca40a3..b143d3724 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,7 @@ from server.config import TRACE, config from server.db import FAFDatabase from server.game_service import GameService +from server.games import FeaturedModType from server.geoip_service import GeoIpService from server.lobbyconnection import LobbyConnection from server.matchmaker import MatchmakerQueue @@ -273,6 +274,7 @@ def make( queue_id += 1 return MatchmakerQueue( game_service=mock.Mock(), + on_match_found=mock.Mock(), name=name, queue_id=queue_id, featured_mod=mod, @@ -284,7 +286,14 @@ def make( @pytest.fixture def matchmaker_queue(game_service) -> MatchmakerQueue: - queue = MatchmakerQueue(game_service, "ladder1v1test", 1, "ladder1v1", 2) + queue = MatchmakerQueue( + game_service, + mock.Mock(), + "ladder1v1test", + FeaturedModType.LADDER_1V1, + RatingType.LADDER_1V1, + 1 + ) return queue diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 05dd0653c..db8782766 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -187,17 +187,31 @@ async def perform_login( }) -async def read_until( - proto: Protocol, pred: Callable[[Dict[str, Any]], bool] +async def _read_until( + proto: Protocol, + pred: Callable[[Dict[str, Any]], bool] ) -> Dict[str, Any]: while True: msg = await proto.read_message() try: if pred(msg): return msg - except (KeyError, ValueError): - logging.getLogger().info("read_until predicate raised during message: {}".format(msg)) + except KeyError: pass + except Exception: + logging.getLogger().warning( + "read_until predicate raised during message: %s", + msg, + exc_info=True + ) + + +async def read_until( + proto: Protocol, + pred: Callable[[Dict[str, Any]], bool], + timeout: float = 60 +) -> Dict[str, Any]: + return await asyncio.wait_for(_read_until(proto, pred), timeout=timeout) async def read_until_command( @@ -206,7 +220,7 @@ async def read_until_command( timeout: float = 60 ) -> Dict[str, Any]: return await asyncio.wait_for( - read_until(proto, lambda msg: msg.get("command") == command), + _read_until(proto, lambda msg: msg.get("command") == command), timeout=timeout ) diff --git a/tests/integration_tests/test_teammatchmaker.py b/tests/integration_tests/test_teammatchmaker.py index 330291086..fa9f0cc8c 100644 --- a/tests/integration_tests/test_teammatchmaker.py +++ b/tests/integration_tests/test_teammatchmaker.py @@ -21,15 +21,11 @@ async def connect_players(lobby_server): protos = [proto for _, _, proto in res] ids = [id_ for id_, _, _ in res] - await asyncio.gather(*[ - read_until_command(proto, "game_info") for proto in protos - ]) - - return ids, protos + return protos, ids async def queue_players_for_matchmaking(lobby_server): - ids, protos = await connect_players(lobby_server) + protos, ids = await connect_players(lobby_server) await asyncio.gather(*[ proto.send_message({ @@ -113,7 +109,7 @@ async def test_game_matchmaking(lobby_server): @fast_forward(10) async def test_game_matchmaking_with_parties(lobby_server): - ids, protos = await connect_players(lobby_server) + protos, ids = await connect_players(lobby_server) id1, id2, id3, id4 = ids proto1, proto2, proto3, proto4 = protos @@ -174,7 +170,6 @@ async def test_game_matchmaking_with_parties(lobby_server): "queue_name": "tmm2v2", "state": "start", }) - msgs = await asyncio.gather(*[client_response(proto) for proto in protos]) uid = set(msg["uid"] for msg in msgs) @@ -189,6 +184,119 @@ async def test_game_matchmaking_with_parties(lobby_server): assert msg["faction"] == i + 1 +@fast_forward(15) +async def test_game_matchmaking_multiqueue(lobby_server): + protos, _ = await connect_players(lobby_server) + + await asyncio.gather(*[ + read_until_command(proto, "game_info") for proto in protos + ]) + + await protos[0].send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "uef", + "queue_name": "ladder1v1" + }) + await read_until_command(protos[0], "search_info") + await asyncio.gather(*[ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "aeon", + "queue_name": "tmm2v2" + }) + for proto in protos + ]) + msg = await read_until( + protos[0], + lambda msg: ( + msg["command"] == "search_info" and msg["queue_name"] == "ladder1v1" + ) + ) + assert msg == { + "command": "search_info", + "queue_name": "ladder1v1", + "state": "stop" + } + msgs = await asyncio.gather(*[client_response(proto) for proto in protos]) + + uid = set(msg["uid"] for msg in msgs) + assert len(uid) == 1 + for msg in msgs: + assert msg["init_mode"] == 1 + assert "None" not in msg["name"] + assert msg["mod"] == "faf" + assert msg["expected_players"] == 4 + assert msg["team"] in (2, 3) + assert msg["map_position"] in (1, 2, 3, 4) + assert msg["faction"] == 2 + + +@fast_forward(60) +async def test_game_matchmaking_multiqueue_multimatch(lobby_server): + """ + Scenario where both queues could possibly generate a match. + Queues: + ladder1v1 - 2 players join + tmm2v2 - 4 players join + Result: + Either one of the queues generates a match, but not both. + """ + protos, _ = await connect_players(lobby_server) + + await asyncio.gather(*[ + read_until_command(proto, "game_info") for proto in protos + ]) + + ladder1v1_tasks = [ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "uef", + "queue_name": "ladder1v1" + }) + for proto in protos[:2] + ] + await asyncio.gather(*[ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "aeon", + "queue_name": "tmm2v2" + }) + for proto in protos + ] + ladder1v1_tasks) + msg1 = await read_until_command(protos[0], "match_found") + msg2 = await read_until_command(protos[1], "match_found") + + matched_queue = msg1["queue"] + if matched_queue == "ladder1v1": + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[2], "match_found", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[3], "match_found", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[2], "search_info", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[3], "search_info", timeout=3) + else: + await read_until_command(protos[2], "match_found", timeout=3) + await read_until_command(protos[3], "match_found", timeout=3) + + assert msg1 == msg2 + + def other_cancelled(msg): + return ( + msg["command"] == "search_info" + and msg["queue_name"] != matched_queue + ) + msg1 = await read_until(protos[0], other_cancelled, timeout=3) + msg2 = await read_until(protos[1], other_cancelled, timeout=3) + assert msg1 == msg2 + assert msg1["state"] == "stop" + + @fast_forward(60) async def test_game_matchmaking_timeout(lobby_server): protos, _ = await queue_players_for_matchmaking(lobby_server) diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index fb667c8ec..0feaaf8da 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -10,6 +10,7 @@ from server.ladder_service import game_name from server.matchmaker import MatchmakerQueue from server.players import PlayerState +from server.rating import RatingType from server.types import Map from tests.utils import fast_forward @@ -155,7 +156,7 @@ async def test_start_game_with_teams( assert game.max_players == 4 -async def test_inform_player(ladder_service: LadderService, player_factory): +async def test_write_rating_progress(ladder_service: LadderService, player_factory): p1 = player_factory( "Dostya", player_id=1, @@ -163,16 +164,16 @@ async def test_inform_player(ladder_service: LadderService, player_factory): with_lobby_connection=True ) - ladder_service.inform_player(p1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) # Message is sent after the first call p1.lobby_connection.write.assert_called_once() - ladder_service.inform_player(p1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) p1.lobby_connection.write.reset_mock() # But not after the second p1.lobby_connection.write.assert_not_called() await ladder_service.on_connection_lost(p1) - ladder_service.inform_player(p1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) # But it is called if the player relogs p1.lobby_connection.write.assert_called_once() @@ -331,6 +332,50 @@ async def test_start_search_multiqueue_multiple_players( assert "tmm2v2" not in ladder_service._searches[p2] +async def test_game_start_cancels_search( + ladder_service: LadderService, + player_factory, + queue_factory, + event_loop +): + ladder_service.queues["tmm2v2"] = queue_factory("tmm2v2") + + p1 = player_factory( + "Dostya", + player_id=1, + ladder_rating=(1000, 10), + with_lobby_connection=True + ) + + p2 = player_factory( + "Brackman", + player_id=2, + ladder_rating=(1000, 10), + with_lobby_connection=True + ) + ladder_service.start_search([p1], "ladder1v1") + ladder_service.start_search([p2], "ladder1v1") + ladder_service.start_search([p1], "tmm2v2") + ladder_service.start_search([p2], "tmm2v2") + await exhaust_callbacks(event_loop) + + assert "ladder1v1" in ladder_service._searches[p1] + assert "tmm2v2" in ladder_service._searches[p1] + assert "ladder1v1" in ladder_service._searches[p2] + assert "tmm2v2" in ladder_service._searches[p2] + + ladder_service.on_match_found( + ladder_service._searches[p1]["ladder1v1"], + ladder_service._searches[p2]["ladder1v1"], + ladder_service.queues["ladder1v1"] + ) + + assert "ladder1v1" not in ladder_service._searches[p1] + assert "tmm2v2" not in ladder_service._searches[p1] + assert "ladder1v1" not in ladder_service._searches[p2] + assert "tmm2v2" not in ladder_service._searches[p2] + + async def test_start_and_cancel_search( ladder_service: LadderService, player_factory, @@ -438,14 +483,14 @@ async def test_start_game_called_on_match(ladder_service: LadderService, player_ ) ladder_service.start_game = CoroutineMock() - ladder_service.inform_player = CoroutineMock() + ladder_service.write_rating_progress = CoroutineMock() ladder_service.start_search([p1], "ladder1v1") ladder_service.start_search([p2], "ladder1v1") await asyncio.sleep(2) - ladder_service.inform_player.assert_called() + ladder_service.write_rating_progress.assert_called() ladder_service.start_game.assert_called_once() @@ -615,14 +660,14 @@ async def test_game_name_many_teams(player_factory): assert game_name([p1], [p2], [p3], [p4]) == "Dostya Vs QAI Vs Rhiza Vs Kale" -async def test_inform_player_message( +async def test_write_rating_progress_message( ladder_service: LadderService, player_factory ): player = player_factory(ladder_rating=(1500, 500)) player.write_message = CoroutineMock() - ladder_service.inform_player(player) + ladder_service.write_rating_progress(player, RatingType.LADDER_1V1) player.write_message.assert_called_once_with({ "command": "notice", @@ -639,14 +684,38 @@ async def test_inform_player_message( }) -async def test_inform_player_message_2( +async def test_write_rating_progress_message_2( ladder_service: LadderService, player_factory ): player = player_factory(ladder_rating=(1500, 400.1235)) player.write_message = CoroutineMock() - ladder_service.inform_player(player) + ladder_service.write_rating_progress(player, RatingType.LADDER_1V1) + + player.write_message.assert_called_once_with({ + "command": "notice", + "style": "info", + "text": ( + "The system is still learning you.

" + "The learning phase is 40% complete" + ) + }) + + +async def test_write_rating_progress_other_rating( + ladder_service: LadderService, + player_factory +): + player = player_factory( + ladder_rating=(1500, 500), + global_rating=(1500, 400.1235) + ) + player.write_message = CoroutineMock() + + # There's no reason we would call it with global, but the logic is the same + # and global is an available rating that's not ladder + ladder_service.write_rating_progress(player, RatingType.GLOBAL) player.write_message.assert_called_once_with({ "command": "notice", diff --git a/tests/unit_tests/test_matchmaker_queue.py b/tests/unit_tests/test_matchmaker_queue.py index 0dd7a957c..4f1f7d282 100644 --- a/tests/unit_tests/test_matchmaker_queue.py +++ b/tests/unit_tests/test_matchmaker_queue.py @@ -1,22 +1,26 @@ import asyncio import functools -import random -from collections import deque +import time from concurrent.futures import CancelledError, TimeoutError +import mock import pytest from hypothesis import given from hypothesis import strategies as st import server.config as config from server.matchmaker import CombinedSearch, MapPool, PopTimer, Search +from server.players import PlayerState from server.rating import RatingType -from tests.utils import fast_forward @pytest.fixture(scope="session") def player_factory(player_factory): - return functools.partial(player_factory, ladder_games=(config.NEWBIE_MIN_GAMES + 1)) + return functools.partial( + player_factory, + ladder_games=(config.NEWBIE_MIN_GAMES + 1), + state=PlayerState.SEARCHING_LADDER + ) @pytest.fixture @@ -312,30 +316,6 @@ def test_queue_multiple_map_pools( assert queue.get_map_pool_for_rating(rating) is None -@fast_forward(3) -@pytest.mark.asyncio -async def test_queue_matches(matchmaker_queue): - matches = [random.randrange(0, 1 << 20) for _ in range(20)] - matchmaker_queue._matches = deque(matches) - - async def call_shutdown(): - await asyncio.sleep(1) - matchmaker_queue.shutdown() - - asyncio.create_task(call_shutdown()) - collected_matches = [match async for match in matchmaker_queue.iter_matches()] - - assert collected_matches == matches - - -@pytest.mark.asyncio -async def test_shutdown_matchmaker(matchmaker_queue): - matchmaker_queue.shutdown() - # Verify that no matches are yielded after shutdown is called - async for _ in matchmaker_queue.iter_matches(): - assert False - - @pytest.mark.asyncio async def test_queue_many(matchmaker_queue, player_factory): p1, p2, p3 = player_factory("Dostya", ladder_rating=(2200, 150)), \ @@ -354,6 +334,9 @@ async def test_queue_many(matchmaker_queue, player_factory): assert not s1.is_matched assert s2.is_matched assert s3.is_matched + matchmaker_queue.on_match_found.assert_called_once_with( + s2, s3, matchmaker_queue + ) @pytest.mark.asyncio @@ -393,12 +376,13 @@ async def test_queue_cancel(matchmaker_queue, matchmaker_players): assert not s1.is_matched assert not s2.is_matched + matchmaker_queue.on_match_found.assert_not_called() @pytest.mark.asyncio async def test_queue_mid_cancel(matchmaker_queue, matchmaker_players_all_match): # Turn list of players into map from ids to players. - p0, p1, p2, p3, _ = matchmaker_players_all_match + _, p1, p2, p3, _ = matchmaker_players_all_match (s1, s2, s3) = (Search([p1]), Search([p2]), Search([p3])) @@ -421,3 +405,36 @@ async def find_matches(): assert s2.is_matched assert s3.is_matched assert len(matchmaker_queue._queue) == 0 + matchmaker_queue.on_match_found.assert_called_once_with( + s2, s3, matchmaker_queue + ) + + +@pytest.mark.asyncio +async def test_find_matches_synchronized(queue_factory): + is_matching = False + + def make_matches(*args): + nonlocal is_matching + + assert not is_matching, "Function call not synchronized" + is_matching = True + + time.sleep(0.2) + + is_matching = False + return [] + + with mock.patch( + "server.matchmaker.matchmaker_queue.make_matches", + make_matches + ): + queues = [queue_factory(f"Queue{i}") for i in range(5)] + # Ensure that find_matches does not short circuit + for queue in queues: + queue._queue = {mock.Mock(): 1, mock.Mock(): 2} + queue.find_teams = mock.Mock() + + await asyncio.gather(*[ + queue.find_matches() for queue in queues + ])