Skip to content

Commit

Permalink
优雅地停机
Browse files Browse the repository at this point in the history
  • Loading branch information
xfgryujk committed Sep 6, 2023
1 parent fe141bc commit 72841ab
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 49 deletions.
2 changes: 1 addition & 1 deletion blivedm
77 changes: 68 additions & 9 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
import argparse
import asyncio
import logging
import logging.handlers
import os
import signal
import webbrowser
from typing import *

import tornado.ioloop
import tornado.web
Expand All @@ -20,7 +23,7 @@

logger = logging.getLogger(__name__)

routes = [
ROUTES = [
(r'/api/server_info', api.main.ServerInfoHandler),
(r'/api/emoticon', api.main.UploadEmoticonHandler),

Expand All @@ -29,14 +32,31 @@
(r'/api/avatar_url', api.chat.AvatarHandler),

(rf'{api.main.EMOTICON_BASE_URL}/(.*)', tornado.web.StaticFileHandler, {'path': api.main.EMOTICON_UPLOAD_PATH}),
(r'/(.*)', api.main.MainHandler, {'path': config.WEB_ROOT})
(r'/(.*)', api.main.MainHandler, {'path': config.WEB_ROOT}),
]

server: Optional[tornado.httpserver.HTTPServer] = None

shut_down_event: Optional[asyncio.Event] = None


async def main():
if not init():
return 1
try:
await run()
finally:
await shut_down()
return 0


def init():
init_signal_handlers()

def main():
args = parse_args()

init_logging(args.debug)
logger.info('App started, initializing')
config.init()

utils.request.init()
Expand All @@ -48,7 +68,27 @@ def main():

update.check_update()

run_server(args.host, args.port, args.debug)
init_server(args.host, args.port, args.debug)
return server is not None


def init_signal_handlers():
global shut_down_event
shut_down_event = asyncio.Event()

signums = (signal.SIGINT, signal.SIGTERM)
try:
loop = asyncio.get_running_loop()
for signum in signums:
loop.add_signal_handler(signum, on_shut_down_signal)
except NotImplementedError:
# 不太安全,但Windows只能用这个
for signum in signums:
signal.signal(signum, on_shut_down_signal)


def on_shut_down_signal(*_args):
shut_down_event.set()


def parse_args():
Expand Down Expand Up @@ -76,21 +116,22 @@ def init_logging(debug):
logging.getLogger('tornado.access').setLevel(logging.WARNING)


def run_server(host, port, debug):
def init_server(host, port, debug):
cfg = config.get_config()
if host is None:
host = cfg.host
if port is None:
port = cfg.port

app = tornado.web.Application(
routes,
ROUTES,
websocket_ping_interval=10,
debug=debug,
autoreload=False
)
try:
app.listen(
global server
server = app.listen(
port,
host,
xheaders=cfg.tornado_xheaders,
Expand All @@ -105,8 +146,26 @@ def run_server(host, port, debug):
url = 'http://localhost/' if port == 80 else f'http://localhost:{port}/'
webbrowser.open(url)
logger.info('Server started: %s:%d', host, port)
tornado.ioloop.IOLoop.current().start()


async def run():
logger.info('Running event loop')
await shut_down_event.wait()
logger.info('Received shutdown signal')


async def shut_down():
logger.info('Closing server')
server.stop()
await server.close_all_connections()

logger.info('Closing websocket connections')
await services.chat.shut_down()

await utils.request.shut_down()

logger.info('App shut down')


if __name__ == '__main__':
main()
exit(asyncio.run(main()))
2 changes: 1 addition & 1 deletion services/avatar.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def init():
global _avatar_url_cache, _task_queue
_avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60)
_task_queue = asyncio.Queue(cfg.fetch_avatar_max_queue_size)
asyncio.get_event_loop().create_task(_do_init())
asyncio.get_running_loop().create_task(_do_init())


async def _do_init():
Expand Down
62 changes: 45 additions & 17 deletions services/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,33 @@ def init():
_live_msg_handler = LiveMsgHandler()


async def shut_down():
if client_room_manager is not None:
client_room_manager.shut_down()
if _live_client_manager is not None:
await _live_client_manager.shut_down()


class LiveClientManager:
"""管理到B站的连接"""
def __init__(self):
self._live_clients: Dict[int, LiveClient] = {}
self._live_clients: Dict[int, WebLiveClient] = {}
self._close_client_futures: Set[asyncio.Future] = set()

async def shut_down(self):
while len(self._live_clients) != 0:
room_id = next(iter(self._live_clients))
self.del_live_client(room_id)

await asyncio.gather(*self._close_client_futures, return_exceptions=True)

def add_live_client(self, room_id):
if room_id in self._live_clients:
return
logger.info('room=%d creating live client', room_id)
self._live_clients[room_id] = live_client = LiveClient(room_id)
self._live_clients[room_id] = live_client = WebLiveClient(room_id)
live_client.set_handler(_live_msg_handler)
# 直接启动吧,这里不用管init_room失败的情况,万一失败了会在on_stopped_by_exception里删除掉这个客户端
# 直接启动吧,这里不用管init_room失败的情况,万一失败了会在on_client_stopped里删除掉这个客户端
live_client.start()
logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients))

Expand All @@ -54,13 +69,17 @@ def del_live_client(self, room_id):
return
logger.info('room=%d removing live client', room_id)
live_client.set_handler(None)
asyncio.create_task(live_client.stop_and_close())

future = asyncio.create_task(live_client.stop_and_close())
self._close_client_futures.add(future)
future.add_done_callback(lambda _future: self._close_client_futures.discard(future))

logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients))

client_room_manager.del_room(room_id)


class LiveClient(blivedm.BLiveClient):
class WebLiveClient(blivedm.BLiveClient):
HEARTBEAT_INTERVAL = 10

def __init__(self, room_id):
Expand All @@ -81,6 +100,15 @@ def __init__(self):
# room_id -> timer_handle
self._delay_del_timer_handles: Dict[int, asyncio.TimerHandle] = {}

def shut_down(self):
while len(self._rooms) != 0:
room_id = next(iter(self._rooms))
self.del_room(room_id)

for timer_handle in self._delay_del_timer_handles.values():
timer_handle.cancel()
self._delay_del_timer_handles.clear()

def add_client(self, room_id, client: 'api.chat.ChatHandler'):
room = self._get_or_add_room(room_id)
room.add_client(client)
Expand Down Expand Up @@ -194,7 +222,7 @@ def send_cmd_data_if(self, filterer: Callable[['api.chat.ChatHandler'], bool], c

class LiveMsgHandler(blivedm.BaseHandler):
# 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名
def __danmu_msg_callback(self, client: LiveClient, command: dict):
def __danmu_msg_callback(self, client: WebLiveClient, command: dict):
info = command['info']
dm_v2 = command.get('dm_v2', '')

Expand Down Expand Up @@ -241,7 +269,7 @@ def __danmu_msg_callback(self, client: LiveClient, command: dict):
)
return self._on_danmaku(client, message)

def __send_gift_callback(self, client: LiveClient, command: dict):
def __send_gift_callback(self, client: WebLiveClient, command: dict):
data = command['data']
message = dm_web_models.GiftMessage(
gift_name=data['giftName'],
Expand All @@ -255,7 +283,7 @@ def __send_gift_callback(self, client: LiveClient, command: dict):
)
return self._on_gift(client, message)

def __guard_buy_callback(self, client: LiveClient, command: dict):
def __guard_buy_callback(self, client: WebLiveClient, command: dict):
data = command['data']
message = dm_web_models.GuardBuyMessage(
uid=data['uid'],
Expand All @@ -265,7 +293,7 @@ def __guard_buy_callback(self, client: LiveClient, command: dict):
)
return self._on_buy_guard(client, message)

def __super_chat_message_callback(self, client: LiveClient, command: dict):
def __super_chat_message_callback(self, client: WebLiveClient, command: dict):
data = command['data']
message = dm_web_models.SuperChatMessage(
price=data['price'],
Expand All @@ -286,13 +314,13 @@ def __super_chat_message_callback(self, client: LiveClient, command: dict):
'SUPER_CHAT_MESSAGE': __super_chat_message_callback
}

def on_stopped_by_exception(self, client: LiveClient, exception: Exception):
def on_client_stopped(self, client: WebLiveClient, exception: Optional[Exception]):
_live_client_manager.del_live_client(client.tmp_room_id)

def _on_danmaku(self, client: LiveClient, message: dm_web_models.DanmakuMessage):
def _on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
asyncio.create_task(self.__on_danmaku(client, message))

async def __on_danmaku(self, client: LiveClient, message: dm_web_models.DanmakuMessage):
async def __on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
avatar_url = message.face
if avatar_url != '':
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
Expand Down Expand Up @@ -375,7 +403,7 @@ def _parse_text_emoticons(message: dm_web_models.DanmakuMessage):
except (json.JSONDecodeError, TypeError, KeyError):
return []

def _on_gift(self, client: LiveClient, message: dm_web_models.GiftMessage):
def _on_gift(self, client: WebLiveClient, message: dm_web_models.GiftMessage):
avatar_url = services.avatar.process_avatar_url(message.face)
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)

Expand All @@ -397,11 +425,11 @@ def _on_gift(self, client: LiveClient, message: dm_web_models.GiftMessage):
'num': message.num
})

def _on_buy_guard(self, client: LiveClient, message: dm_web_models.GuardBuyMessage):
def _on_buy_guard(self, client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
asyncio.create_task(self.__on_buy_guard(client, message))

@staticmethod
async def __on_buy_guard(client: LiveClient, message: dm_web_models.GuardBuyMessage):
async def __on_buy_guard(client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
avatar_url = await services.avatar.get_avatar_url(message.uid)

Expand All @@ -417,7 +445,7 @@ async def __on_buy_guard(client: LiveClient, message: dm_web_models.GuardBuyMess
'privilegeType': message.guard_level
})

def _on_super_chat(self, client: LiveClient, message: dm_web_models.SuperChatMessage):
def _on_super_chat(self, client: WebLiveClient, message: dm_web_models.SuperChatMessage):
avatar_url = services.avatar.process_avatar_url(message.face)
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)

Expand Down Expand Up @@ -452,7 +480,7 @@ def _on_super_chat(self, client: LiveClient, message: dm_web_models.SuperChatMes
message.message, room.room_id, msg_id, services.translate.Priority.HIGH
))

def _on_super_chat_delete(self, client: LiveClient, message: dm_web_models.SuperChatDeleteMessage):
def _on_super_chat_delete(self, client: WebLiveClient, message: dm_web_models.SuperChatDeleteMessage):
room = client_room_manager.get_room(client.tmp_room_id)
if room is None:
return
Expand Down
25 changes: 10 additions & 15 deletions services/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def init():
_translate_cache = cachetools.LRUCache(cfg.translation_cache_size)
# 总队列长度会超过translate_max_queue_size,不用这么严格
_task_queues = [asyncio.Queue(cfg.translate_max_queue_size) for _ in range(len(Priority))]
asyncio.get_event_loop().create_task(_do_init())
asyncio.get_running_loop().create_task(_do_init())


async def _do_init():
Expand Down Expand Up @@ -386,21 +386,16 @@ async def _do_init(self):
return True

async def _reinit_coroutine(self):
try:
while True:
logger.debug('TencentTranslateFree reinit')
start_time = datetime.datetime.now()
try:
await self._do_init()
except asyncio.CancelledError:
raise
except BaseException: # noqa
pass
cost_time = (datetime.datetime.now() - start_time).total_seconds()
while True:
logger.debug('TencentTranslateFree reinit')
start_time = datetime.datetime.now()
try:
await self._do_init()
except Exception: # noqa
pass
cost_time = (datetime.datetime.now() - start_time).total_seconds()

await asyncio.sleep(30 - cost_time)
except asyncio.CancelledError:
pass
await asyncio.sleep(30 - cost_time)

@property
def is_available(self):
Expand Down
2 changes: 1 addition & 1 deletion update.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def check_update():
asyncio.get_event_loop().create_task(_do_check_update())
asyncio.get_running_loop().create_task(_do_check_update())


async def _do_check_update():
Expand Down
11 changes: 6 additions & 5 deletions utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@


def init():
# ClientSession要在异步函数中创建
async def do_init():
global http_session
http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
global http_session
http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))

asyncio.get_event_loop().run_until_complete(do_init())

async def shut_down():
if http_session is not None:
await http_session.close()

0 comments on commit 72841ab

Please sign in to comment.