From cbd50800e3a62bd794f9f73f639e7bf9e1e56059 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Mon, 24 Aug 2020 20:50:52 -0800 Subject: [PATCH 1/9] Generalize inform_player to work for any rating type --- server/ladder_service.py | 9 +++---- tests/unit_tests/test_ladder_service.py | 35 +++++++++++++++++++++---- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/server/ladder_service.py b/server/ladder_service.py index 60c414fa4..2418b05d0 100644 --- a/server/ladder_service.py +++ b/server/ladder_service.py @@ -176,9 +176,8 @@ def start_search(self, players: List[Player], queue_name: str): for player in players: player.state = PlayerState.SEARCHING_LADDER - # FIXME: For now, inform_player is only designed for ladder1v1 - if queue_name == "ladder1v1": - self.inform_player(player) + + self.inform_player(player, queue.rating_type) player.write_message({ "command": "search_info", @@ -230,10 +229,10 @@ def _cancel_search(self, initiator: Player, queue_name: str) -> None: "%s stopped searching for %s", cancelled_search, queue_name ) - def inform_player(self, player: Player): + def inform_player(self, player: Player, rating_type: str) -> None: if player not in self._informed_players: self._informed_players.add(player) - mean, deviation = player.ratings[RatingType.LADDER_1V1] + _, deviation = player.ratings[rating_type] if deviation > 490: player.write_message({ diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index f6c26b4cb..9af72cbd8 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -10,6 +10,7 @@ from server.ladder_service import game_name from server.matchmaker import MatchmakerQueue from server.players import PlayerState +from server.rating import RatingType from server.types import Map from tests.utils import fast_forward @@ -162,16 +163,16 @@ async def test_inform_player(ladder_service: LadderService, player_factory): with_lobby_connection=True ) - ladder_service.inform_player(p1) + ladder_service.inform_player(p1, RatingType.LADDER_1V1) # Message is sent after the first call p1.lobby_connection.write.assert_called_once() - ladder_service.inform_player(p1) + ladder_service.inform_player(p1, RatingType.LADDER_1V1) p1.lobby_connection.write.reset_mock() # But not after the second p1.lobby_connection.write.assert_not_called() await ladder_service.on_connection_lost(p1) - ladder_service.inform_player(p1) + ladder_service.inform_player(p1, RatingType.LADDER_1V1) # But it is called if the player relogs p1.lobby_connection.write.assert_called_once() @@ -621,7 +622,7 @@ async def test_inform_player_message( player = player_factory(ladder_rating=(1500, 500)) player.write_message = CoroutineMock() - ladder_service.inform_player(player) + ladder_service.inform_player(player, RatingType.LADDER_1V1) player.write_message.assert_called_once_with({ "command": "notice", @@ -645,7 +646,31 @@ async def test_inform_player_message_2( player = player_factory(ladder_rating=(1500, 400.1235)) player.write_message = CoroutineMock() - ladder_service.inform_player(player) + ladder_service.inform_player(player, RatingType.LADDER_1V1) + + player.write_message.assert_called_once_with({ + "command": "notice", + "style": "info", + "text": ( + "The system is still learning you.

" + "The learning phase is 40% complete" + ) + }) + + +async def test_inform_player_other_rating( + ladder_service: LadderService, + player_factory +): + player = player_factory( + ladder_rating=(1500, 500), + global_rating=(1500, 400.1235) + ) + player.write_message = CoroutineMock() + + # There's no reason we would call it with global, but the logic is the same + # and global is an available rating that's not ladder + ladder_service.inform_player(player, RatingType.GLOBAL) player.write_message.assert_called_once_with({ "command": "notice", From f5ca1fbc5e85a53cb60dea224660d38dd02d870e Mon Sep 17 00:00:00 2001 From: Askaholic Date: Mon, 24 Aug 2020 21:53:16 -0800 Subject: [PATCH 2/9] Refactor matchmaker queue to report new matches via a callback --- server/ladder_service.py | 43 +++++++++-------- server/matchmaker/matchmaker_queue.py | 56 ++++++++++------------- tests/conftest.py | 11 ++++- tests/unit_tests/test_matchmaker_queue.py | 36 ++++----------- 4 files changed, 66 insertions(+), 80 deletions(-) diff --git a/server/ladder_service.py b/server/ladder_service.py index 2418b05d0..42c2a56c8 100644 --- a/server/ladder_service.py +++ b/server/ladder_service.py @@ -67,16 +67,16 @@ async def update_data(self) -> None: for name, info in db_queues.items(): if name not in self.queues: queue = MatchmakerQueue( + self.game_service, + self.on_match_found, name=name, queue_id=info["id"], featured_mod=info["mod"], rating_type=info["rating_type"], team_size=info["team_size"], - game_service=self.game_service ) self.queues[name] = queue queue.initialize() - asyncio.ensure_future(self.handle_queue_matches(queue)) else: queue = self.queues[name] queue.featured_mod = info["mod"] @@ -259,30 +259,33 @@ def inform_player(self, player: Player, rating_type: str) -> None: ) }) - async def handle_queue_matches(self, queue: MatchmakerQueue): - async for s1, s2 in queue.iter_matches(): - try: - msg = {"command": "match_found", "queue": queue.name} - # TODO: Handle disconnection with a client supported message - await asyncio.gather(*[ - player.send_message(msg) - for player in s1.players + s2.players - ]) - asyncio.create_task( - self.start_game(s1.players, s2.players, queue) - ) - except Exception as e: - self._logger.exception( - "Error processing match between searches %s, and %s: %s", - s1, s2, e - ) + def on_match_found( + self, + s1: Search, + s2: Search, + queue: MatchmakerQueue + ) -> None: + try: + msg = {"command": "match_found", "queue": queue.name} + + for player in s1.players + s2.players: + player.write_message(msg) + + asyncio.create_task( + self.start_game(s1.players, s2.players, queue) + ) + except Exception: + self._logger.exception( + "Error processing match between searches %s, and %s", + s1, s2 + ) async def start_game( self, team1: List[Player], team2: List[Player], queue: MatchmakerQueue - ): + ) -> None: self._logger.debug( "Starting %s game between %s and %s", queue.name, team1, team2 ) diff --git a/server/matchmaker/matchmaker_queue.py b/server/matchmaker/matchmaker_queue.py index f1f3f5e32..311329de5 100644 --- a/server/matchmaker/matchmaker_queue.py +++ b/server/matchmaker/matchmaker_queue.py @@ -1,9 +1,9 @@ import asyncio import time -from collections import OrderedDict, deque +from collections import OrderedDict from concurrent.futures import CancelledError from datetime import datetime, timezone -from typing import Deque, Dict, Iterable, List, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple import server.metrics as metrics @@ -11,7 +11,9 @@ from .algorithm import make_matches, make_teams, make_teams_from_single from .map_pool import MapPool from .pop_timer import PopTimer -from .search import Match, Search +from .search import Search + +MatchFoundCallback = Callable[[Search, Search, "MatchmakerQueue"], Any] class MatchmakerSearchTimer: @@ -39,12 +41,13 @@ class MatchmakerQueue: def __init__( self, game_service: "GameService", + on_match_found: MatchFoundCallback, name: str, queue_id: int, featured_mod: str, rating_type: str, team_size: int = 1, - map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = () + map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = (), ): self.game_service = game_service self.name = name @@ -55,7 +58,7 @@ def __init__( self.map_pools = {info[0].id: info for info in map_pools} self._queue: Dict[Search, None] = OrderedDict() - self._matches: Deque[Match] = deque() + self.on_match_found = on_match_found self._is_running = True self.timer = PopTimer(self.name) @@ -79,18 +82,6 @@ def get_map_pool_for_rating(self, rating: int) -> Optional[MapPool]: def initialize(self): asyncio.create_task(self.queue_pop_timer()) - async def iter_matches(self): - """ Asynchronously yields matches as they become available """ - - while self._is_running: - if not self._matches: - # There are no matches so there is nothing to do - await asyncio.sleep(1) - continue - - # Yield the next available match to the caller - yield self._matches.popleft() - async def queue_pop_timer(self) -> None: """ Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players @@ -102,16 +93,6 @@ async def queue_pop_timer(self) -> None: await self.find_matches() - number_of_matches = len(self._matches) - metrics.matches.labels(self.name).set(number_of_matches) - - # TODO: Move this into algorithm, then don't need to recalculate quality_with? - # Probably not a major bottleneck though. - for search1, search2 in self._matches: - metrics.match_quality.labels(self.name).observe( - search1.quality_with(search2) - ) - number_of_unmatched_searches = len(self._queue) metrics.unmatched_searches.labels(self.name).set(number_of_unmatched_searches) @@ -154,11 +135,24 @@ async def find_matches(self) -> None: # Call self.match on all matches and filter out the ones that were cancelled loop = asyncio.get_running_loop() - new_matches = filter( + matches = list(filter( lambda m: self.match(m[0], m[1]), await loop.run_in_executor(None, make_matches, searches) - ) - self._matches.extend(new_matches) + )) + + number_of_matches = len(matches) + metrics.matches.labels(self.name).set(number_of_matches) + + for search1, search2 in matches: + # TODO: Move this into algorithm, then don't need to recalculate + # quality_with? Probably not a major bottleneck though. + metrics.match_quality.labels(self.name).observe( + search1.quality_with(search2) + ) + try: + self.on_match_found(search1, search2, self) + except Exception: + self._logger.exception("Match callback raised an exception!") def find_teams(self) -> List[Search]: searches = [] @@ -189,7 +183,7 @@ def match(self, s1: Search, s2: Search) -> bool: Mark the given two searches as matched :param s1: :param s2: - :return: + :return: True if matching succeeded or False if matching failed """ if (s1.is_matched or s2.is_matched) or (s1.is_cancelled or s2.is_cancelled): return False diff --git a/tests/conftest.py b/tests/conftest.py index ffaca40a3..b143d3724 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,7 @@ from server.config import TRACE, config from server.db import FAFDatabase from server.game_service import GameService +from server.games import FeaturedModType from server.geoip_service import GeoIpService from server.lobbyconnection import LobbyConnection from server.matchmaker import MatchmakerQueue @@ -273,6 +274,7 @@ def make( queue_id += 1 return MatchmakerQueue( game_service=mock.Mock(), + on_match_found=mock.Mock(), name=name, queue_id=queue_id, featured_mod=mod, @@ -284,7 +286,14 @@ def make( @pytest.fixture def matchmaker_queue(game_service) -> MatchmakerQueue: - queue = MatchmakerQueue(game_service, "ladder1v1test", 1, "ladder1v1", 2) + queue = MatchmakerQueue( + game_service, + mock.Mock(), + "ladder1v1test", + FeaturedModType.LADDER_1V1, + RatingType.LADDER_1V1, + 1 + ) return queue diff --git a/tests/unit_tests/test_matchmaker_queue.py b/tests/unit_tests/test_matchmaker_queue.py index 0dd7a957c..5915cc6c3 100644 --- a/tests/unit_tests/test_matchmaker_queue.py +++ b/tests/unit_tests/test_matchmaker_queue.py @@ -1,7 +1,5 @@ import asyncio import functools -import random -from collections import deque from concurrent.futures import CancelledError, TimeoutError import pytest @@ -11,7 +9,6 @@ import server.config as config from server.matchmaker import CombinedSearch, MapPool, PopTimer, Search from server.rating import RatingType -from tests.utils import fast_forward @pytest.fixture(scope="session") @@ -312,30 +309,6 @@ def test_queue_multiple_map_pools( assert queue.get_map_pool_for_rating(rating) is None -@fast_forward(3) -@pytest.mark.asyncio -async def test_queue_matches(matchmaker_queue): - matches = [random.randrange(0, 1 << 20) for _ in range(20)] - matchmaker_queue._matches = deque(matches) - - async def call_shutdown(): - await asyncio.sleep(1) - matchmaker_queue.shutdown() - - asyncio.create_task(call_shutdown()) - collected_matches = [match async for match in matchmaker_queue.iter_matches()] - - assert collected_matches == matches - - -@pytest.mark.asyncio -async def test_shutdown_matchmaker(matchmaker_queue): - matchmaker_queue.shutdown() - # Verify that no matches are yielded after shutdown is called - async for _ in matchmaker_queue.iter_matches(): - assert False - - @pytest.mark.asyncio async def test_queue_many(matchmaker_queue, player_factory): p1, p2, p3 = player_factory("Dostya", ladder_rating=(2200, 150)), \ @@ -354,6 +327,9 @@ async def test_queue_many(matchmaker_queue, player_factory): assert not s1.is_matched assert s2.is_matched assert s3.is_matched + matchmaker_queue.on_match_found.assert_called_once_with( + s2, s3, matchmaker_queue + ) @pytest.mark.asyncio @@ -393,12 +369,13 @@ async def test_queue_cancel(matchmaker_queue, matchmaker_players): assert not s1.is_matched assert not s2.is_matched + matchmaker_queue.on_match_found.assert_not_called() @pytest.mark.asyncio async def test_queue_mid_cancel(matchmaker_queue, matchmaker_players_all_match): # Turn list of players into map from ids to players. - p0, p1, p2, p3, _ = matchmaker_players_all_match + _, p1, p2, p3, _ = matchmaker_players_all_match (s1, s2, s3) = (Search([p1]), Search([p2]), Search([p3])) @@ -421,3 +398,6 @@ async def find_matches(): assert s2.is_matched assert s3.is_matched assert len(matchmaker_queue._queue) == 0 + matchmaker_queue.on_match_found.assert_called_once_with( + s2, s3, matchmaker_queue + ) From c4b102a33650d580373180d7c21dbcb98ad2a871 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Mon, 24 Aug 2020 22:23:01 -0800 Subject: [PATCH 3/9] Ensure that players can only be matched once at a time --- server/ladder_service.py | 8 +++ server/matchmaker/matchmaker_queue.py | 28 ++++++++- .../integration_tests/test_teammatchmaker.py | 57 ++++++++++++++++++- tests/unit_tests/test_ladder_service.py | 44 ++++++++++++++ tests/unit_tests/test_matchmaker_queue.py | 7 ++- 5 files changed, 141 insertions(+), 3 deletions(-) diff --git a/server/ladder_service.py b/server/ladder_service.py index 42c2a56c8..5ca6ae9d4 100644 --- a/server/ladder_service.py +++ b/server/ladder_service.py @@ -265,12 +265,20 @@ def on_match_found( s2: Search, queue: MatchmakerQueue ) -> None: + """ + Callback for when a match is generated by a matchmaker queue. + + NOTE: This function is called while the matchmaker search lock is held, + so it should only perform fast operations. + """ try: msg = {"command": "match_found", "queue": queue.name} for player in s1.players + s2.players: player.write_message(msg) + self.cancel_search(player) + asyncio.create_task( self.start_game(s1.players, s2.players, queue) ) diff --git a/server/matchmaker/matchmaker_queue.py b/server/matchmaker/matchmaker_queue.py index 311329de5..1d823cb27 100644 --- a/server/matchmaker/matchmaker_queue.py +++ b/server/matchmaker/matchmaker_queue.py @@ -7,7 +7,9 @@ import server.metrics as metrics +from ..asyncio_extensions import synchronized from ..decorators import with_logger +from ..players import PlayerState from .algorithm import make_matches, make_teams, make_teams_from_single from .map_pool import MapPool from .pop_timer import PopTimer @@ -125,7 +127,15 @@ async def search(self, search: Search) -> None: if search in self._queue: del self._queue[search] + @synchronized async def find_matches(self) -> None: + """ + Perform the matchmaking algorithm. + + Note that this funciton is synchronized such that only one instance of + MatchmakerQueue can call this function at any given time. This is + needed in order to safely enable multiqueuing. + """ self._logger.info("Searching for matches: %s", self.name) if len(self._queue) < 2 * self.team_size: @@ -185,8 +195,24 @@ def match(self, s1: Search, s2: Search) -> bool: :param s2: :return: True if matching succeeded or False if matching failed """ - if (s1.is_matched or s2.is_matched) or (s1.is_cancelled or s2.is_cancelled): + if s1.is_matched or s2.is_matched: + return False + if s1.is_cancelled or s2.is_cancelled: return False + # Additional failsafe. Ideally this check will never fail. + if any( + player.state != PlayerState.SEARCHING_LADDER + for player in s1.players + s2.players + ): + self._logger.warning( + "Tried to match searches %s and %s while some players had " + "invalid states: team1: %s team2: %s", + s1, s2, + list(p.state for p in s1.players), + list(p.state for p in s2.players) + ) + return False + s1.match(s2) s2.match(s1) if s1 in self._queue: diff --git a/tests/integration_tests/test_teammatchmaker.py b/tests/integration_tests/test_teammatchmaker.py index 129451209..f3b451093 100644 --- a/tests/integration_tests/test_teammatchmaker.py +++ b/tests/integration_tests/test_teammatchmaker.py @@ -10,7 +10,7 @@ pytestmark = pytest.mark.asyncio -async def queue_players_for_matchmaking(lobby_server): +async def connect_players(lobby_server): res = await asyncio.gather(*[ connect_and_sign_in( (f"ladder{i}",) * 2, @@ -21,6 +21,12 @@ async def queue_players_for_matchmaking(lobby_server): protos = [proto for _, _, proto in res] ids = [id_ for id_, _, _ in res] + return protos, ids + + +async def queue_players_for_matchmaking(lobby_server): + protos, ids = await connect_players(lobby_server) + await asyncio.gather(*[ read_until_command(proto, "game_info") for proto in protos ]) @@ -105,6 +111,55 @@ async def test_game_matchmaking(lobby_server): assert msg["faction"] == 1 +@fast_forward(15) +async def test_game_matchmaking_multiqueue(lobby_server): + protos, _ = await connect_players(lobby_server) + + await asyncio.gather(*[ + read_until_command(proto, "game_info") for proto in protos + ]) + + await protos[0].send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "uef", + "queue_name": "ladder1v1" + }) + await read_until_command(protos[0], "search_info") + await asyncio.gather(*[ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "aeon", + "queue_name": "tmm2v2" + }) + for proto in protos + ]) + msg = await read_until( + protos[0], + lambda msg: ( + msg["command"] == "search_info" and msg["queue_name"] == "ladder1v1" + ) + ) + assert msg == { + "command": "search_info", + "queue_name": "ladder1v1", + "state": "stop" + } + msgs = await asyncio.gather(*[client_response(proto) for proto in protos]) + + uid = set(msg["uid"] for msg in msgs) + assert len(uid) == 1 + for msg in msgs: + assert msg["init_mode"] == 1 + assert "None" not in msg["name"] + assert msg["mod"] == "faf" + assert msg["expected_players"] == 4 + assert msg["team"] in (2, 3) + assert msg["map_position"] in (1, 2, 3, 4) + assert msg["faction"] == 2 + + @fast_forward(60) async def test_game_matchmaking_timeout(lobby_server): protos, _ = await queue_players_for_matchmaking(lobby_server) diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index 9af72cbd8..89261b59b 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -331,6 +331,50 @@ async def test_start_search_multiqueue_multiple_players( assert "tmm2v2" not in ladder_service._searches[p2] +async def test_game_start_cancels_search( + ladder_service: LadderService, + player_factory, + queue_factory, + event_loop +): + ladder_service.queues["tmm2v2"] = queue_factory("tmm2v2") + + p1 = player_factory( + "Dostya", + player_id=1, + ladder_rating=(1000, 10), + with_lobby_connection=True + ) + + p2 = player_factory( + "Brackman", + player_id=2, + ladder_rating=(1000, 10), + with_lobby_connection=True + ) + ladder_service.start_search([p1], "ladder1v1") + ladder_service.start_search([p2], "ladder1v1") + ladder_service.start_search([p1], "tmm2v2") + ladder_service.start_search([p2], "tmm2v2") + await exhaust_callbacks(event_loop) + + assert "ladder1v1" in ladder_service._searches[p1] + assert "tmm2v2" in ladder_service._searches[p1] + assert "ladder1v1" in ladder_service._searches[p2] + assert "tmm2v2" in ladder_service._searches[p2] + + ladder_service.on_match_found( + ladder_service._searches[p1]["ladder1v1"], + ladder_service._searches[p2]["ladder1v1"], + ladder_service.queues["ladder1v1"] + ) + + assert "ladder1v1" not in ladder_service._searches[p1] + assert "tmm2v2" not in ladder_service._searches[p1] + assert "ladder1v1" not in ladder_service._searches[p2] + assert "tmm2v2" not in ladder_service._searches[p2] + + async def test_start_and_cancel_search( ladder_service: LadderService, player_factory, diff --git a/tests/unit_tests/test_matchmaker_queue.py b/tests/unit_tests/test_matchmaker_queue.py index 5915cc6c3..6c0d77f37 100644 --- a/tests/unit_tests/test_matchmaker_queue.py +++ b/tests/unit_tests/test_matchmaker_queue.py @@ -8,12 +8,17 @@ import server.config as config from server.matchmaker import CombinedSearch, MapPool, PopTimer, Search +from server.players import PlayerState from server.rating import RatingType @pytest.fixture(scope="session") def player_factory(player_factory): - return functools.partial(player_factory, ladder_games=(config.NEWBIE_MIN_GAMES + 1)) + return functools.partial( + player_factory, + ladder_games=(config.NEWBIE_MIN_GAMES + 1), + state=PlayerState.SEARCHING_LADDER + ) @pytest.fixture From 611d0824eb1f72919870c05d4e6b1bb90d4750eb Mon Sep 17 00:00:00 2001 From: Askaholic Date: Sat, 5 Sep 2020 11:43:37 -0800 Subject: [PATCH 4/9] Rename inform_player --- server/ladder_service.py | 4 ++-- tests/unit_tests/test_ladder_service.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/ladder_service.py b/server/ladder_service.py index 5ca6ae9d4..7eff089dd 100644 --- a/server/ladder_service.py +++ b/server/ladder_service.py @@ -177,7 +177,7 @@ def start_search(self, players: List[Player], queue_name: str): for player in players: player.state = PlayerState.SEARCHING_LADDER - self.inform_player(player, queue.rating_type) + self.write_rating_progress(player, queue.rating_type) player.write_message({ "command": "search_info", @@ -229,7 +229,7 @@ def _cancel_search(self, initiator: Player, queue_name: str) -> None: "%s stopped searching for %s", cancelled_search, queue_name ) - def inform_player(self, player: Player, rating_type: str) -> None: + def write_rating_progress(self, player: Player, rating_type: str) -> None: if player not in self._informed_players: self._informed_players.add(player) _, deviation = player.ratings[rating_type] diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index 89261b59b..0829e00ea 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -155,7 +155,7 @@ async def test_start_game_with_teams( assert game.max_players == 4 -async def test_inform_player(ladder_service: LadderService, player_factory): +async def test_write_rating_progress(ladder_service: LadderService, player_factory): p1 = player_factory( "Dostya", player_id=1, @@ -163,16 +163,16 @@ async def test_inform_player(ladder_service: LadderService, player_factory): with_lobby_connection=True ) - ladder_service.inform_player(p1, RatingType.LADDER_1V1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) # Message is sent after the first call p1.lobby_connection.write.assert_called_once() - ladder_service.inform_player(p1, RatingType.LADDER_1V1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) p1.lobby_connection.write.reset_mock() # But not after the second p1.lobby_connection.write.assert_not_called() await ladder_service.on_connection_lost(p1) - ladder_service.inform_player(p1, RatingType.LADDER_1V1) + ladder_service.write_rating_progress(p1, RatingType.LADDER_1V1) # But it is called if the player relogs p1.lobby_connection.write.assert_called_once() @@ -482,14 +482,14 @@ async def test_start_game_called_on_match(ladder_service: LadderService, player_ ) ladder_service.start_game = CoroutineMock() - ladder_service.inform_player = CoroutineMock() + ladder_service.write_rating_progress = CoroutineMock() ladder_service.start_search([p1], "ladder1v1") ladder_service.start_search([p2], "ladder1v1") await asyncio.sleep(2) - ladder_service.inform_player.assert_called() + ladder_service.write_rating_progress.assert_called() ladder_service.start_game.assert_called_once() @@ -659,14 +659,14 @@ async def test_game_name_many_teams(player_factory): assert game_name([p1], [p2], [p3], [p4]) == "Dostya Vs QAI Vs Rhiza Vs Kale" -async def test_inform_player_message( +async def test_write_rating_progress_message( ladder_service: LadderService, player_factory ): player = player_factory(ladder_rating=(1500, 500)) player.write_message = CoroutineMock() - ladder_service.inform_player(player, RatingType.LADDER_1V1) + ladder_service.write_rating_progress(player, RatingType.LADDER_1V1) player.write_message.assert_called_once_with({ "command": "notice", @@ -683,14 +683,14 @@ async def test_inform_player_message( }) -async def test_inform_player_message_2( +async def test_write_rating_progress_message_2( ladder_service: LadderService, player_factory ): player = player_factory(ladder_rating=(1500, 400.1235)) player.write_message = CoroutineMock() - ladder_service.inform_player(player, RatingType.LADDER_1V1) + ladder_service.write_rating_progress(player, RatingType.LADDER_1V1) player.write_message.assert_called_once_with({ "command": "notice", @@ -702,7 +702,7 @@ async def test_inform_player_message_2( }) -async def test_inform_player_other_rating( +async def test_write_rating_progress_other_rating( ladder_service: LadderService, player_factory ): @@ -714,7 +714,7 @@ async def test_inform_player_other_rating( # There's no reason we would call it with global, but the logic is the same # and global is an available rating that's not ladder - ladder_service.inform_player(player, RatingType.GLOBAL) + ladder_service.write_rating_progress(player, RatingType.GLOBAL) player.write_message.assert_called_once_with({ "command": "notice", From a36afc40aba3630d90001bb05b0160ec6433aea8 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Sat, 5 Sep 2020 11:59:44 -0800 Subject: [PATCH 5/9] Cleanup typing for Player --- server/players.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/players.py b/server/players.py index 4649a90f2..d4ea0d5df 100644 --- a/server/players.py +++ b/server/players.py @@ -107,21 +107,22 @@ def is_admin(self) -> bool: def is_moderator(self) -> bool: return "faf_moderators_global" in self.user_groups - async def send_message(self, message) -> None: + async def send_message(self, message: dict) -> None: """ Try to send a message to this player. - :raises: DisconnectedError if the player has disconnected + :raises: DisconnectedError if the player has disconnected. """ if self.lobby_connection is None: raise DisconnectedError("Player has disconnected!") await self.lobby_connection.send(message) - def write_message(self, message) -> None: + def write_message(self, message: dict) -> None: """ - Try to queue a message to be sent this player. Only call this from - broadcasting functions. Does nothing if the player has disconnected. + Try to queue a message to be sent to this player. + + Does nothing if the player has disconnected. """ if self.lobby_connection is None: return From f850120c2485f52f04df200e369be5c636f392a8 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Fri, 18 Sep 2020 16:27:45 -0800 Subject: [PATCH 6/9] Add integration test for multiqueueing --- integration_tests/fafclient.py | 13 +++++++++++++ integration_tests/test_matchmaking.py | 26 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/integration_tests/fafclient.py b/integration_tests/fafclient.py index e122dcbcc..eb6a6448b 100644 --- a/integration_tests/fafclient.py +++ b/integration_tests/fafclient.py @@ -130,6 +130,19 @@ async def join_game(self, game_id, **kwargs): # HACK: Yield long enough for the server to process our message await asyncio.sleep(0.5) + async def join_queue(self, queue_name, faction="uef"): + await self.send_message({ + "command": "game_matchmaking", + "state": "start", + "queue_name": queue_name, + "faction": faction + }) + await self.read_until(lambda msg: msg == { + "command": "search_info", + "state": "start", + "queue_name": queue_name + }) + async def open_fa(self): """Simulate FA opening""" await self.send_message({ diff --git a/integration_tests/test_matchmaking.py b/integration_tests/test_matchmaking.py index f114069af..8c0f9c61e 100644 --- a/integration_tests/test_matchmaking.py +++ b/integration_tests/test_matchmaking.py @@ -137,3 +137,29 @@ async def handle_game_launch(client): assert ratings[winner][0] < new_ratings[winner][0] assert ratings[loser][0] > new_ratings[loser][0] + + +async def test_multiqueue(client_factory): + client1, _ = await client_factory.login("test") + client2, _ = await client_factory.login("test2") + + await client1.join_queue("tmm2v2") + + for client in (client1, client2): + await client.join_queue("ladder1v1") + + await client1.read_until_command("match_found", timeout=60) + msg1 = await client1.read_until_command("search_info") + msg2 = await client1.read_until_command("search_info") + + assert { + "command": "search_info", + "queue_name": "tmm2v2", + "state": "stop" + } in (msg1, msg2) + + assert { + "command": "search_info", + "queue_name": "ladder1v1", + "state": "stop" + } in (msg1, msg2) From fc828d170e3a31f51d8b1415699fbec86e6552fe Mon Sep 17 00:00:00 2001 From: Askaholic Date: Fri, 18 Sep 2020 16:30:03 -0800 Subject: [PATCH 7/9] Adjust default timeout --- integration_tests/fafclient.py | 4 ++-- integration_tests/test_matchmaking.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration_tests/fafclient.py b/integration_tests/fafclient.py index eb6a6448b..50fc573f2 100644 --- a/integration_tests/fafclient.py +++ b/integration_tests/fafclient.py @@ -44,13 +44,13 @@ async def send_command(self, command, **kwargs): """Convenience for sending commands""" await self.send_message({"command": command, **kwargs}) - async def read_until(self, predicate, timeout=60): + async def read_until(self, predicate, timeout=5): return await asyncio.wait_for( read_until(self.proto, predicate), timeout=timeout ) - async def read_until_command(self, command, timeout=60): + async def read_until_command(self, command, timeout=5): return await read_until_command(self.proto, command, timeout=timeout) async def read_until_game_launch(self, uid): diff --git a/integration_tests/test_matchmaking.py b/integration_tests/test_matchmaking.py index 8c0f9c61e..310bbc4fb 100644 --- a/integration_tests/test_matchmaking.py +++ b/integration_tests/test_matchmaking.py @@ -29,7 +29,7 @@ async def test_ladder_1v1_match(client_factory): "faction": "seraphim" }) - await client1.read_until_command("match_found") + await client1.read_until_command("match_found", timeout=60) await client2.read_until_command("match_found") msg1 = await client1.read_until_command("game_info") From 61dbfc87839f420647e534fcdd069746fb64d980 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Fri, 25 Sep 2020 12:04:59 -0800 Subject: [PATCH 8/9] Add another test for searches being cancelled --- tests/integration_tests/conftest.py | 24 +++++-- .../integration_tests/test_teammatchmaker.py | 64 +++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 1eab0165d..3379e737a 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -170,17 +170,31 @@ async def perform_login( }) -async def read_until( - proto: Protocol, pred: Callable[[Dict[str, Any]], bool] +async def _read_until( + proto: Protocol, + pred: Callable[[Dict[str, Any]], bool] ) -> Dict[str, Any]: while True: msg = await proto.read_message() try: if pred(msg): return msg - except (KeyError, ValueError): - logging.getLogger().info("read_until predicate raised during message: {}".format(msg)) + except KeyError: pass + except Exception: + logging.getLogger().warning( + "read_until predicate raised during message: %s", + msg, + exc_info=True + ) + + +async def read_until( + proto: Protocol, + pred: Callable[[Dict[str, Any]], bool], + timeout: float = 60 +) -> Dict[str, Any]: + return await asyncio.wait_for(_read_until(proto, pred), timeout=timeout) async def read_until_command( @@ -189,7 +203,7 @@ async def read_until_command( timeout: float = 60 ) -> Dict[str, Any]: return await asyncio.wait_for( - read_until(proto, lambda msg: msg.get("command") == command), + _read_until(proto, lambda msg: msg.get("command") == command), timeout=timeout ) diff --git a/tests/integration_tests/test_teammatchmaker.py b/tests/integration_tests/test_teammatchmaker.py index f3b451093..8b71c0001 100644 --- a/tests/integration_tests/test_teammatchmaker.py +++ b/tests/integration_tests/test_teammatchmaker.py @@ -160,6 +160,70 @@ async def test_game_matchmaking_multiqueue(lobby_server): assert msg["faction"] == 2 +@fast_forward(60) +async def test_game_matchmaking_multiqueue_multimatch(lobby_server): + """ + Scenario where both queues could possibly generate a match. + Queues: + ladder1v1 - 2 players join + tmm2v2 - 4 players join + Result: + Either one of the queues generates a match, but not both. + """ + protos, _ = await connect_players(lobby_server) + + await asyncio.gather(*[ + read_until_command(proto, "game_info") for proto in protos + ]) + + ladder1v1_tasks = [ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "uef", + "queue_name": "ladder1v1" + }) + for proto in protos[:2] + ] + await asyncio.gather(*[ + proto.send_message({ + "command": "game_matchmaking", + "state": "start", + "faction": "aeon", + "queue_name": "tmm2v2" + }) + for proto in protos + ] + ladder1v1_tasks) + msg1 = await read_until_command(protos[0], "match_found") + msg2 = await read_until_command(protos[1], "match_found") + + matched_queue = msg1["queue"] + if matched_queue == "ladder1v1": + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[2], "match_found", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[3], "match_found", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[2], "search_info", timeout=3) + with pytest.raises(asyncio.TimeoutError): + await read_until_command(protos[3], "search_info", timeout=3) + else: + await read_until_command(protos[2], "match_found", timeout=3) + await read_until_command(protos[3], "match_found", timeout=3) + + assert msg1 == msg2 + + def other_cancelled(msg): + return ( + msg["command"] == "search_info" + and msg["queue_name"] != matched_queue + ) + msg1 = await read_until(protos[0], other_cancelled, timeout=3) + msg2 = await read_until(protos[1], other_cancelled, timeout=3) + assert msg1 == msg2 + assert msg1["state"] == "stop" + + @fast_forward(60) async def test_game_matchmaking_timeout(lobby_server): protos, _ = await queue_players_for_matchmaking(lobby_server) From 099601aa06d8d1cfa4452e60facbba1a7f35f8f8 Mon Sep 17 00:00:00 2001 From: Askaholic Date: Sun, 4 Oct 2020 22:19:34 -0800 Subject: [PATCH 9/9] Initialize locks lazily It seems that during class creation time, the event loop that is automatically detected by asyncio.Lock is not the same event loop that ends up being used by asyncio.run. Therefore, when two coroutines try to access the lock at the same time and one of them needs to await it, a RuntimeError is raised because of mismatched event loops. --- server/asyncio_extensions.py | 9 ++++--- tests/unit_tests/test_matchmaker_queue.py | 32 +++++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/server/asyncio_extensions.py b/server/asyncio_extensions.py index 4a44d214e..b989a1a04 100644 --- a/server/asyncio_extensions.py +++ b/server/asyncio_extensions.py @@ -89,11 +89,14 @@ def _synchronize( lock: Optional[asyncio.Lock] = None ) -> AsyncFunc: """Wrap an async function with an async lock.""" - if lock is None: - lock = asyncio.Lock() - @wraps(function) async def wrapped(*args, **kwargs): + nonlocal lock + + # During testing, functions are called from multiple loops + if lock is None or lock._loop != asyncio.get_event_loop(): + lock = asyncio.Lock() + async with lock: return await function(*args, **kwargs) diff --git a/tests/unit_tests/test_matchmaker_queue.py b/tests/unit_tests/test_matchmaker_queue.py index 6c0d77f37..4f1f7d282 100644 --- a/tests/unit_tests/test_matchmaker_queue.py +++ b/tests/unit_tests/test_matchmaker_queue.py @@ -1,7 +1,9 @@ import asyncio import functools +import time from concurrent.futures import CancelledError, TimeoutError +import mock import pytest from hypothesis import given from hypothesis import strategies as st @@ -406,3 +408,33 @@ async def find_matches(): matchmaker_queue.on_match_found.assert_called_once_with( s2, s3, matchmaker_queue ) + + +@pytest.mark.asyncio +async def test_find_matches_synchronized(queue_factory): + is_matching = False + + def make_matches(*args): + nonlocal is_matching + + assert not is_matching, "Function call not synchronized" + is_matching = True + + time.sleep(0.2) + + is_matching = False + return [] + + with mock.patch( + "server.matchmaker.matchmaker_queue.make_matches", + make_matches + ): + queues = [queue_factory(f"Queue{i}") for i in range(5)] + # Ensure that find_matches does not short circuit + for queue in queues: + queue._queue = {mock.Mock(): 1, mock.Mock(): 2} + queue.find_teams = mock.Mock() + + await asyncio.gather(*[ + queue.find_matches() for queue in queues + ])