Skip to content

Commit

Permalink
Refactor message sending interface to be a coroutine (#497)
Browse files Browse the repository at this point in the history
* Refactor message sending interface to be a coroutine

* Use gather to wait on some messages simultaneously

* Clean up imports

* Call gather correctly

* Check results for exceptions

* Make abort() a coroutine

* Make report_dirties a coroutine so that messages are guaranteed to be sent in order

* Add test to verify that ping message is sent

* Refactor report dirty interval using aiocron style timer

* Refactor a few other unnecessary uses of asyncio.ensure_future

* Clean up timer

* Revert geoip service change

* All commands must be coroutines, added tests for lobbyconnection error conditions

* Adjust tests so they actually work

* Add test for verifying that your foes don't see your games

* Add test for verifying that only your friends can see games with 'friends' visibility state

* Make command_pong a coroutine
  • Loading branch information
Askaholic authored Dec 23, 2019
1 parent aee9965 commit 79721a9
Show file tree
Hide file tree
Showing 27 changed files with 610 additions and 383 deletions.
57 changes: 37 additions & 20 deletions server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Dict, Optional

import aiomeasures
import asyncio

from server.db import FAFDatabase
from . import config as config
Expand All @@ -26,9 +27,11 @@
from .game_service import GameService
from .ladder_service import LadderService
from .control import init as run_control_server
from .timing import at_interval


__version__ = '0.9.17'
__author__ = 'Chris Kitching, Dragonfire, Gael Honorez, Jeroen De Dauw, Crotalus, Michael Søndergaard, Michel Jung'
__author__ = 'Askaholic, Chris Kitching, Dragonfire, Gael Honorez, Jeroen De Dauw, Crotalus, Michael Søndergaard, Michel Jung'
__contact__ = 'admin@faforever.com'
__license__ = 'GPLv3'
__copyright__ = 'Copyright (c) 2011-2015 ' + __author__
Expand Down Expand Up @@ -94,7 +97,8 @@ def run_lobby_server(
Run the lobby server
"""

def report_dirties():
@at_interval(DIRTY_REPORT_INTERVAL)
async def do_report_dirties():
try:
dirty_games = games.dirty_games
dirty_queues = games.dirty_queues
Expand All @@ -103,39 +107,54 @@ def report_dirties():
player_service.clear_dirty()

if len(dirty_queues) > 0:
ctx.broadcast_raw(encode_queues(dirty_queues), lambda lobby_conn: lobby_conn.authenticated)
await ctx.broadcast_raw(
encode_queues(dirty_queues),
lambda lobby_conn: lobby_conn.authenticated
)

if len(dirty_players) > 0:
ctx.broadcast_raw(encode_players(dirty_players), lambda lobby_conn: lobby_conn.authenticated)

# TODO: This spams squillions of messages: we should implement per-connection message
# aggregation at the next abstraction layer down :P
await ctx.broadcast_raw(
encode_players(dirty_players),
lambda lobby_conn: lobby_conn.authenticated
)

# TODO: This spams squillions of messages: we should implement per-
# connection message aggregation at the next abstraction layer down :P
tasks = []
for game in dirty_games:
if game.state == GameState.ENDED:
games.remove_game(game)

# So we're going to be broadcasting this to _somebody_...
message = encode_dict(game.to_dict())

# These games shouldn't be broadcast, but instead privately sent to those who are
# allowed to see them.
# These games shouldn't be broadcast, but instead privately sent
# to those who are allowed to see them.
if game.visibility == VisibilityState.FRIENDS:
# To see this game, you must have an authenticated connection and be a friend of the host, or the host.
validation_func = lambda lobby_conn: lobby_conn.player.id in game.host.friends or lobby_conn.player == game.host
# To see this game, you must have an authenticated
# connection and be a friend of the host, or the host.
def validation_func(lobby_conn):
return lobby_conn.player.id in game.host.friends or \
lobby_conn.player == game.host
else:
validation_func = lambda lobby_conn: lobby_conn.player.id not in game.host.foes
def validation_func(lobby_conn):
return lobby_conn.player.id not in game.host.foes

tasks.append(ctx.broadcast_raw(
message,
lambda lobby_conn: lobby_conn.authenticated and validation_func(lobby_conn)
))

await asyncio.gather(*tasks)

ctx.broadcast_raw(message, lambda lobby_conn: lobby_conn.authenticated and validation_func(lobby_conn))
except Exception as e:
logging.getLogger().exception(e)
finally:
loop.call_later(DIRTY_REPORT_INTERVAL, report_dirties)

ping_msg = encode_message('PING')

def ping_broadcast():
ctx.broadcast_raw(ping_msg)
loop.call_later(45, ping_broadcast)
@at_interval(45)
async def ping_broadcast():
await ctx.broadcast_raw(ping_msg)

def make_connection() -> LobbyConnection:
return LobbyConnection(
Expand All @@ -147,7 +166,5 @@ def make_connection() -> LobbyConnection:
ladder_service=ladder_service
)
ctx = ServerContext(make_connection, name="LobbyServer")
loop.call_later(DIRTY_REPORT_INTERVAL, report_dirties)
loop.call_soon(ping_broadcast)
loop.run_until_complete(ctx.listen(*address))
return ctx
5 changes: 3 additions & 2 deletions server/api/oauth_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from typing import Dict

import aiohttp
from oauthlib.oauth2.rfc6749.errors import (InsecureTransportError,
MissingTokenError)
from oauthlib.oauth2.rfc6749.errors import (
InsecureTransportError, MissingTokenError
)


class OAuth2Session(object):
Expand Down
9 changes: 6 additions & 3 deletions server/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
Tiny local-only http server for getting stats and performing various tasks
"""

import logging
import socket
from json import dumps

from aiohttp import web
import logging
from server import PlayerService, GameService, config
from json import dumps

from . import config
from .game_service import GameService
from .player_service import PlayerService

logger = logging.getLogger(__name__)

Expand Down
3 changes: 1 addition & 2 deletions server/game_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from typing import Dict, List, Optional, Union, ValuesView

import aiocron
from server import GameState, VisibilityState
from server.db import FAFDatabase
from server.decorators import with_logger
from server.games import CoopGame, CustomGame, FeaturedMod, LadderGame
from server.games.game import Game
from server.games.game import Game, GameState, VisibilityState
from server.matchmaker import MatchmakerQueue
from server.players import Player

Expand Down
68 changes: 38 additions & 30 deletions server/gameconnection.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import asyncio

from server.db import FAFDatabase
from sqlalchemy import text, select
from sqlalchemy import select, text

from .abc.base_game import GameConnectionState
from .config import TRACE
from .db.models import login, moderation_report, reported_user
from .decorators import with_logger
from .game_service import GameService
from .games.game import Game, GameState, ValidityState, Victory
from .player_service import PlayerService
from .players import Player, PlayerState
from .protocol import GpgNetServerProtocol, QDataStreamProtocol

from .db.models import (reported_user, moderation_report, login)


@with_logger
class GameConnection(GpgNetServerProtocol):
Expand Down Expand Up @@ -69,11 +68,11 @@ def player(self) -> Player:
def player(self, val: Player):
self._player = val

def send_message(self, message):
async def send_message(self, message):
message['target'] = "game"

self._logger.log(TRACE, ">>: %s", message)
self.protocol.send_message(message)
await self.protocol.send_message(message)

async def _handle_idle_state(self):
"""
Expand All @@ -92,7 +91,7 @@ async def _handle_idle_state(self):
pass
else:
self._logger.exception("Unknown PlayerState: %s", state)
self.abort()
await self.abort()

async def _handle_lobby_state(self):
"""
Expand All @@ -104,7 +103,7 @@ async def _handle_lobby_state(self):
try:
player_state = self.player.state
if player_state == PlayerState.HOSTING:
self.send_HostGame(self.game.map_folder_name)
await self.send_HostGame(self.game.map_folder_name)
self.game.set_hosted()
# If the player is joining, we connect him to host
# followed by the rest of the players.
Expand All @@ -116,10 +115,12 @@ async def _handle_lobby_state(self):
self._state = GameConnectionState.CONNECTED_TO_HOST
self.game.add_game_connection(self)

tasks = []
for peer in self.game.connections:
if peer != self and peer.player != self.game.host:
self._logger.debug("%s connecting to %s", self.player, peer)
asyncio.ensure_future(self.connect_to_peer(peer))
tasks.append(self.connect_to_peer(peer))
await asyncio.gather(*tasks)
except Exception as e: # pragma: no cover
self._logger.exception(e)

Expand All @@ -129,24 +130,29 @@ async def connect_to_host(self, peer: "GameConnection"):
:return:
"""
assert peer.player.state == PlayerState.HOSTING
self.send_JoinGame(peer.player.login,
peer.player.id)
await self.send_JoinGame(peer.player.login, peer.player.id)

peer.send_ConnectToPeer(player_name=self.player.login,
player_uid=self.player.id,
offer=True)
await peer.send_ConnectToPeer(
player_name=self.player.login,
player_uid=self.player.id,
offer=True
)

async def connect_to_peer(self, peer: "GameConnection"):
"""
Connect two peers
:return: None
"""
self.send_ConnectToPeer(player_name=peer.player.login,
player_uid=peer.player.id,
offer=True)
peer.send_ConnectToPeer(player_name=self.player.login,
player_uid=self.player.id,
offer=False)
await self.send_ConnectToPeer(
player_name=peer.player.login,
player_uid=peer.player.id,
offer=True
)
await peer.send_ConnectToPeer(
player_name=self.player.login,
player_uid=self.player.id,
offer=False
)

async def handle_action(self, command, args):
"""
Expand All @@ -167,7 +173,7 @@ async def handle_action(self, command, args):
except Exception as e: # pragma: no cover
self._logger.exception(e)
self._logger.exception("Something awful happened in a game thread!")
self.abort()
await self.abort()

async def handle_desync(self, *_args): # pragma: no cover
self.game.desyncs += 1
Expand Down Expand Up @@ -303,7 +309,7 @@ async def handle_teamkill_report(self, gametime, reporter_id, reporter_name, tea
:param teamkiller_id: teamkiller id
:param teamkiller_name: teamkiller nickname - Used as a failsafe in case ID is wrong
"""

async with self._db.acquire() as conn:
"""
Sometime the game sends a wrong ID - but a correct player name
Expand Down Expand Up @@ -403,7 +409,7 @@ async def handle_ice_message(self, receiver_id, ice_msg):
)
return

game_connection.send_message({
await game_connection.send_message({
"command": "IceMsg",
"args": [int(self.player.id), ice_msg]
})
Expand Down Expand Up @@ -499,7 +505,7 @@ def _mark_dirty(self):
if self.game:
self.game_service.mark_dirty(self.game)

def abort(self, log_message: str=''):
async def abort(self, log_message: str=''):
"""
Abort the connection
Expand All @@ -513,25 +519,27 @@ def abort(self, log_message: str=''):
self._logger.debug("%s.abort(%s)", self, log_message)

if self.game.state == GameState.LOBBY:
self.disconnect_all_peers()
await self.disconnect_all_peers()

self._state = GameConnectionState.ENDED
asyncio.ensure_future(self.game.remove_game_connection(self))
await self.game.remove_game_connection(self)
self._mark_dirty()
self.player.state = PlayerState.IDLE
del self.player.game
del self.player.game_connection
except Exception as ex: # pragma: no cover
self._logger.debug("Exception in abort(): %s", ex)

def disconnect_all_peers(self):
async def disconnect_all_peers(self):
tasks = []
for peer in self.game.connections:
if peer == self:
continue

try:
peer.send_DisconnectFromPeer(self.player.id)
except Exception: # pragma no cover
tasks.append(peer.send_DisconnectFromPeer(self.player.id))

for result in await asyncio.gather(*tasks, return_exceptions=True):
if isinstance(result, Exception):
self._logger.exception(
"peer_sendDisconnectFromPeer failed for player %i",
self.player.id)
Expand All @@ -542,7 +550,7 @@ async def on_connection_lost(self):
except Exception as e: # pragma: no cover
self._logger.exception(e)
finally:
self.abort()
await self.abort()

def __str__(self):
return "GameConnection({}, {})".format(self.player, self.game)
Expand Down
8 changes: 4 additions & 4 deletions server/games/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
import logging
import re
import time
from collections import Counter, defaultdict
from collections import defaultdict
from enum import Enum, unique
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Optional, Tuple

import trueskill
from server.games.game_results import GameOutcome, GameResult, GameResults
from server.rating import RatingType
from trueskill import Rating

from ..abc.base_game import GameConnectionState, InitMode
from ..players import Player, PlayerState
from server.rating import RatingType
from server.games.game_results import GameOutcome, GameResult, GameResults

FFA_TEAM = 1

Expand Down
3 changes: 2 additions & 1 deletion server/geoip_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def __init__(self):
self.db = None

# crontab: min hour day month day_of_week
# Run every Wednesday because GeoLite2 is updated every Tuesday
# Run every Wednesday because GeoLite2 is updated every first Tuesday
# of the month.
self._update_cron = aiocron.crontab(
'0 0 0 * * 3', func=self.check_update_geoip_db
)
Expand Down
Loading

0 comments on commit 79721a9

Please sign in to comment.