Skip to content

Commit

Permalink
Merge branch 'issue/FAForever#603-tmm-multiqueuing' into team-matchma…
Browse files Browse the repository at this point in the history
…king-and-party
  • Loading branch information
Askaholic committed Oct 7, 2020
2 parents 80feb6c + 099601a commit d3277a2
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 121 deletions.
17 changes: 15 additions & 2 deletions integration_tests/fafclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ async def send_command(self, command, **kwargs):
"""Convenience for sending commands"""
await self.send_message({"command": command, **kwargs})

async def read_until(self, predicate, timeout=60):
async def read_until(self, predicate, timeout=5):
return await asyncio.wait_for(
read_until(self.proto, predicate),
timeout=timeout
)

async def read_until_command(self, command, timeout=60):
async def read_until_command(self, command, timeout=5):
return await read_until_command(self.proto, command, timeout=timeout)

async def read_until_game_launch(self, uid):
Expand Down Expand Up @@ -130,6 +130,19 @@ async def join_game(self, game_id, **kwargs):
# HACK: Yield long enough for the server to process our message
await asyncio.sleep(0.5)

async def join_queue(self, queue_name, faction="uef"):
await self.send_message({
"command": "game_matchmaking",
"state": "start",
"queue_name": queue_name,
"faction": faction
})
await self.read_until(lambda msg: msg == {
"command": "search_info",
"state": "start",
"queue_name": queue_name
})

async def open_fa(self):
"""Simulate FA opening"""
await self.send_message({
Expand Down
28 changes: 27 additions & 1 deletion integration_tests/test_matchmaking.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_ladder_1v1_match(client_factory):
"faction": "seraphim"
})

await client1.read_until_command("match_found")
await client1.read_until_command("match_found", timeout=60)
await client2.read_until_command("match_found")

msg1 = await client1.read_until_command("game_info")
Expand Down Expand Up @@ -137,3 +137,29 @@ async def handle_game_launch(client):

assert ratings[winner][0] < new_ratings[winner][0]
assert ratings[loser][0] > new_ratings[loser][0]


async def test_multiqueue(client_factory):
client1, _ = await client_factory.login("test")
client2, _ = await client_factory.login("test2")

await client1.join_queue("tmm2v2")

for client in (client1, client2):
await client.join_queue("ladder1v1")

await client1.read_until_command("match_found", timeout=60)
msg1 = await client1.read_until_command("search_info")
msg2 = await client1.read_until_command("search_info")

assert {
"command": "search_info",
"queue_name": "tmm2v2",
"state": "stop"
} in (msg1, msg2)

assert {
"command": "search_info",
"queue_name": "ladder1v1",
"state": "stop"
} in (msg1, msg2)
9 changes: 6 additions & 3 deletions server/asyncio_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ def _synchronize(
lock: Optional[asyncio.Lock] = None
) -> AsyncFunc:
"""Wrap an async function with an async lock."""
if lock is None:
lock = asyncio.Lock()

@wraps(function)
async def wrapped(*args, **kwargs):
nonlocal lock

# During testing, functions are called from multiple loops
if lock is None or lock._loop != asyncio.get_event_loop():
lock = asyncio.Lock()

async with lock:
return await function(*args, **kwargs)

Expand Down
60 changes: 35 additions & 25 deletions server/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ async def update_data(self) -> None:
for name, info in db_queues.items():
if name not in self.queues:
queue = MatchmakerQueue(
self.game_service,
self.on_match_found,
name=name,
queue_id=info["id"],
featured_mod=info["mod"],
rating_type=info["rating_type"],
team_size=info["team_size"],
game_service=self.game_service
)
self.queues[name] = queue
queue.initialize()
asyncio.ensure_future(self.handle_queue_matches(queue))
else:
queue = self.queues[name]
queue.featured_mod = info["mod"]
Expand Down Expand Up @@ -185,9 +185,8 @@ def start_search(

for player in players:
player.state = PlayerState.SEARCHING_LADDER
# FIXME: For now, inform_player is only designed for ladder1v1
if queue_name == "ladder1v1":
self.inform_player(player)

self.write_rating_progress(player, queue.rating_type)

player.write_message({
"command": "search_info",
Expand Down Expand Up @@ -239,10 +238,10 @@ def _cancel_search(self, initiator: Player, queue_name: str) -> None:
"%s stopped searching for %s", cancelled_search, queue_name
)

def inform_player(self, player: Player):
def write_rating_progress(self, player: Player, rating_type: str) -> None:
if player not in self._informed_players:
self._informed_players.add(player)
mean, deviation = player.ratings[RatingType.LADDER_1V1]
_, deviation = player.ratings[rating_type]

if deviation > 490:
player.write_message({
Expand All @@ -269,30 +268,41 @@ def inform_player(self, player: Player):
)
})

async def handle_queue_matches(self, queue: MatchmakerQueue):
async for s1, s2 in queue.iter_matches():
try:
msg = {"command": "match_found", "queue": queue.name}
# TODO: Handle disconnection with a client supported message
await asyncio.gather(*[
player.send_message(msg)
for player in s1.players + s2.players
])
asyncio.create_task(
self.start_game(s1.players, s2.players, queue)
)
except Exception as e:
self._logger.exception(
"Error processing match between searches %s, and %s: %s",
s1, s2, e
)
def on_match_found(
self,
s1: Search,
s2: Search,
queue: MatchmakerQueue
) -> None:
"""
Callback for when a match is generated by a matchmaker queue.
NOTE: This function is called while the matchmaker search lock is held,
so it should only perform fast operations.
"""
try:
msg = {"command": "match_found", "queue": queue.name}

for player in s1.players + s2.players:
player.write_message(msg)

self.cancel_search(player)

asyncio.create_task(
self.start_game(s1.players, s2.players, queue)
)
except Exception:
self._logger.exception(
"Error processing match between searches %s, and %s",
s1, s2
)

async def start_game(
self,
team1: List[Player],
team2: List[Player],
queue: MatchmakerQueue
):
) -> None:
self._logger.debug(
"Starting %s game between %s and %s", queue.name, team1, team2
)
Expand Down
84 changes: 52 additions & 32 deletions server/matchmaker/matchmaker_queue.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import asyncio
import time
from collections import OrderedDict, deque
from collections import OrderedDict
from concurrent.futures import CancelledError
from datetime import datetime, timezone
from typing import Deque, Dict, Iterable, List, Optional, Tuple
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import server.metrics as metrics

from ..asyncio_extensions import synchronized
from ..decorators import with_logger
from ..players import PlayerState
from .algorithm import make_matches, make_teams, make_teams_from_single
from .map_pool import MapPool
from .pop_timer import PopTimer
from .search import Match, Search
from .search import Search

MatchFoundCallback = Callable[[Search, Search, "MatchmakerQueue"], Any]


class MatchmakerSearchTimer:
Expand Down Expand Up @@ -39,12 +43,13 @@ class MatchmakerQueue:
def __init__(
self,
game_service: "GameService",
on_match_found: MatchFoundCallback,
name: str,
queue_id: int,
featured_mod: str,
rating_type: str,
team_size: int = 1,
map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = ()
map_pools: Iterable[Tuple[MapPool, Optional[int], Optional[int]]] = (),
):
self.game_service = game_service
self.name = name
Expand All @@ -55,7 +60,7 @@ def __init__(
self.map_pools = {info[0].id: info for info in map_pools}

self._queue: Dict[Search, None] = OrderedDict()
self._matches: Deque[Match] = deque()
self.on_match_found = on_match_found
self._is_running = True

self.timer = PopTimer(self.name)
Expand Down Expand Up @@ -83,18 +88,6 @@ def initialize(self):
def num_players(self) -> int:
return sum(len(search.players) for search in self._queue.keys())

async def iter_matches(self):
""" Asynchronously yields matches as they become available """

while self._is_running:
if not self._matches:
# There are no matches so there is nothing to do
await asyncio.sleep(1)
continue

# Yield the next available match to the caller
yield self._matches.popleft()

async def queue_pop_timer(self) -> None:
""" Periodically tries to match all Searches in the queue. The amount
of time until next queue 'pop' is determined by the number of players
Expand All @@ -106,16 +99,6 @@ async def queue_pop_timer(self) -> None:

await self.find_matches()

number_of_matches = len(self._matches)
metrics.matches.labels(self.name).set(number_of_matches)

# TODO: Move this into algorithm, then don't need to recalculate quality_with?
# Probably not a major bottleneck though.
for search1, search2 in self._matches:
metrics.match_quality.labels(self.name).observe(
search1.quality_with(search2)
)

number_of_unmatched_searches = len(self._queue)
metrics.unmatched_searches.labels(self.name).set(number_of_unmatched_searches)

Expand Down Expand Up @@ -148,7 +131,15 @@ async def search(self, search: Search) -> None:
if search in self._queue:
del self._queue[search]

@synchronized
async def find_matches(self) -> None:
"""
Perform the matchmaking algorithm.
Note that this funciton is synchronized such that only one instance of
MatchmakerQueue can call this function at any given time. This is
needed in order to safely enable multiqueuing.
"""
self._logger.info("Searching for matches: %s", self.name)

if self.num_players < 2 * self.team_size:
Expand All @@ -158,11 +149,24 @@ async def find_matches(self) -> None:

# Call self.match on all matches and filter out the ones that were cancelled
loop = asyncio.get_running_loop()
new_matches = filter(
matches = list(filter(
lambda m: self.match(m[0], m[1]),
await loop.run_in_executor(None, make_matches, searches)
)
self._matches.extend(new_matches)
))

number_of_matches = len(matches)
metrics.matches.labels(self.name).set(number_of_matches)

for search1, search2 in matches:
# TODO: Move this into algorithm, then don't need to recalculate
# quality_with? Probably not a major bottleneck though.
metrics.match_quality.labels(self.name).observe(
search1.quality_with(search2)
)
try:
self.on_match_found(search1, search2, self)
except Exception:
self._logger.exception("Match callback raised an exception!")

def find_teams(self) -> List[Search]:
searches = []
Expand Down Expand Up @@ -193,10 +197,26 @@ def match(self, s1: Search, s2: Search) -> bool:
Mark the given two searches as matched
:param s1:
:param s2:
:return:
:return: True if matching succeeded or False if matching failed
"""
if (s1.is_matched or s2.is_matched) or (s1.is_cancelled or s2.is_cancelled):
if s1.is_matched or s2.is_matched:
return False
if s1.is_cancelled or s2.is_cancelled:
return False
# Additional failsafe. Ideally this check will never fail.
if any(
player.state != PlayerState.SEARCHING_LADDER
for player in s1.players + s2.players
):
self._logger.warning(
"Tried to match searches %s and %s while some players had "
"invalid states: team1: %s team2: %s",
s1, s2,
list(p.state for p in s1.players),
list(p.state for p in s2.players)
)
return False

s1.match(s2)
s2.match(s1)
if s1 in self._queue:
Expand Down
11 changes: 6 additions & 5 deletions server/players.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,22 @@ def is_admin(self) -> bool:
def is_moderator(self) -> bool:
return "faf_moderators_global" in self.user_groups

async def send_message(self, message) -> None:
async def send_message(self, message: dict) -> None:
"""
Try to send a message to this player.
:raises: DisconnectedError if the player has disconnected
:raises: DisconnectedError if the player has disconnected.
"""
if self.lobby_connection is None:
raise DisconnectedError("Player has disconnected!")

await self.lobby_connection.send(message)

def write_message(self, message) -> None:
def write_message(self, message: dict) -> None:
"""
Try to queue a message to be sent this player. Only call this from
broadcasting functions. Does nothing if the player has disconnected.
Try to queue a message to be sent to this player.
Does nothing if the player has disconnected.
"""
if self.lobby_connection is None:
return
Expand Down
Loading

0 comments on commit d3277a2

Please sign in to comment.