diff --git a/api/chat.py b/api/chat.py index c8abef59..e1b42cfb 100644 --- a/api/chat.py +++ b/api/chat.py @@ -129,21 +129,21 @@ def __init__(self, *args, **kwargs): def open(self): logger.info('client=%s connected', self.request.remote_ip) - self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( self.HEARTBEAT_INTERVAL, self._on_send_heartbeat ) self._refresh_receive_timeout_timer() def _on_send_heartbeat(self): self.send_cmd_data(Command.HEARTBEAT, {}) - self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( self.HEARTBEAT_INTERVAL, self._on_send_heartbeat ) def _refresh_receive_timeout_timer(self): if self._receive_timeout_timer_handle is not None: self._receive_timeout_timer_handle.cancel() - self._receive_timeout_timer_handle = asyncio.get_event_loop().call_later( + self._receive_timeout_timer_handle = asyncio.get_running_loop().call_later( self.RECEIVE_TIMEOUT, self._on_receive_timeout ) @@ -189,7 +189,7 @@ def on_message(self, message): pass services.chat.client_room_manager.add_client(self.room_id, self) - asyncio.ensure_future(self._on_joined_room()) + asyncio.create_task(self._on_joined_room()) else: logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body) diff --git a/api/main.py b/api/main.py index f0b00f99..577ba6fb 100644 --- a/api/main.py +++ b/api/main.py @@ -56,7 +56,7 @@ async def post(self): if not file.content_type.lower().startswith('image/'): raise tornado.web.HTTPError(415) - url = await asyncio.get_event_loop().run_in_executor( + url = await asyncio.get_running_loop().run_in_executor( None, self._save_file, file.body, self.request.remote_ip ) self.write({ diff --git a/main.py b/main.py index cb5ec958..e27e4b14 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- import argparse -import asyncio import logging import logging.handlers import os @@ -40,7 +39,7 @@ def main(): init_logging(args.debug) config.init() - asyncio.get_event_loop().run_until_complete(utils.request.init()) + utils.request.init() models.database.init(args.debug) services.avatar.init() diff --git a/services/avatar.py b/services/avatar.py index c5b8a879..64eeaf59 100644 --- a/services/avatar.py +++ b/services/avatar.py @@ -20,7 +20,6 @@ DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif' -_main_event_loop = asyncio.get_event_loop() # user_id -> avatar_url _avatar_url_cache: Dict[int, str] = {} # 正在获取头像的Future,user_id -> Future @@ -43,7 +42,7 @@ def init(): cfg = config.get_config() global _uid_queue_to_fetch _uid_queue_to_fetch = asyncio.Queue(cfg.fetch_avatar_max_queue_size) - asyncio.ensure_future(_get_avatar_url_from_web_consumer()) + asyncio.get_event_loop().create_task(_get_avatar_url_from_web_consumer()) async def get_avatar_url(user_id): @@ -71,12 +70,11 @@ def get_avatar_url_from_memory(user_id): def get_avatar_url_from_database(user_id) -> Awaitable[Optional[str]]: - return asyncio.get_event_loop().run_in_executor( - None, _do_get_avatar_url_from_database, user_id - ) + loop = asyncio.get_running_loop() + return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id, loop) -def _do_get_avatar_url_from_database(user_id): +def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop): try: with models.database.get_session() as session: user = session.scalars( @@ -94,7 +92,7 @@ def refresh_cache(): _avatar_url_cache.pop(user_id, None) get_avatar_url_from_web(user_id) - _main_event_loop.call_soon(refresh_cache) + loop.call_soon_threadsafe(refresh_cache) else: # 否则只更新内存缓存 _update_avatar_cache_in_memory(user_id, avatar_url) @@ -113,7 +111,7 @@ def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: if future is not None: return future # 否则创建一个获取任务 - _uid_fetch_future_map[user_id] = future = _main_event_loop.create_future() + _uid_fetch_future_map[user_id] = future = asyncio.get_running_loop().create_future() future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None)) try: _uid_queue_to_fetch.put_nowait(user_id) @@ -141,7 +139,7 @@ async def _get_avatar_url_from_web_consumer(): else: _last_fetch_banned_time = None - asyncio.ensure_future(_get_avatar_url_from_web_coroutine(user_id, future)) + asyncio.create_task(_get_avatar_url_from_web_coroutine(user_id, future)) # 限制频率,防止被B站ban cfg = config.get_config() @@ -271,7 +269,7 @@ def process_avatar_url(avatar_url): def update_avatar_cache(user_id, avatar_url): _update_avatar_cache_in_memory(user_id, avatar_url) - asyncio.get_event_loop().run_in_executor( + asyncio.get_running_loop().run_in_executor( None, _update_avatar_cache_in_database, user_id, avatar_url ) diff --git a/services/chat.py b/services/chat.py index 487cc33d..4915c8c7 100644 --- a/services/chat.py +++ b/services/chat.py @@ -39,7 +39,7 @@ def add_live_client(self, room_id): logger.info('room=%d creating live client', room_id) self._live_clients[room_id] = live_client = LiveClient(room_id) live_client.add_handler(_live_msg_handler) - asyncio.ensure_future(self._init_live_client(live_client)) + asyncio.create_task(self._init_live_client(live_client)) logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients)) async def _init_live_client(self, live_client: 'LiveClient'): @@ -56,7 +56,7 @@ def del_live_client(self, room_id): return logger.info('room=%d removing live client', room_id) live_client.remove_handler(_live_msg_handler) - asyncio.ensure_future(live_client.stop_and_close()) + asyncio.create_task(live_client.stop_and_close()) logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients)) client_room_manager.del_room(room_id) @@ -130,7 +130,7 @@ def del_room(self, room_id): def delay_del_room(self, room_id, timeout): self._clear_delay_del_timer(room_id) - self._delay_del_timer_handles[room_id] = asyncio.get_event_loop().call_later( + self._delay_del_timer_handles[room_id] = asyncio.get_running_loop().call_later( timeout, self._on_delay_del_room, room_id ) @@ -279,7 +279,7 @@ def __super_chat_message_callback(self, client: LiveClient, command: dict): } async def _on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage): - asyncio.ensure_future(self.__on_danmaku(client, message)) + asyncio.create_task(self.__on_danmaku(client, message)) async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage): # 先异步调用再获取房间,因为返回时房间可能已经不存在了 @@ -364,7 +364,7 @@ async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage): }) async def _on_buy_guard(self, client: LiveClient, message: blivedm.GuardBuyMessage): - asyncio.ensure_future(self.__on_buy_guard(client, message)) + asyncio.create_task(self.__on_buy_guard(client, message)) @staticmethod async def __on_buy_guard(client: LiveClient, message: blivedm.GuardBuyMessage): @@ -415,7 +415,7 @@ async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMes }) if need_translate: - asyncio.ensure_future(self._translate_and_response(message.message, room.room_id, msg_id)) + asyncio.create_task(self._translate_and_response(message.message, room.room_id, msg_id)) async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage): room = client_room_manager.get_room(client.tmp_room_id) diff --git a/services/translate.py b/services/translate.py index bc7fd9b7..53666784 100644 --- a/services/translate.py +++ b/services/translate.py @@ -33,7 +33,7 @@ def init(): - asyncio.ensure_future(_do_init()) + asyncio.get_event_loop().create_task(_do_init()) async def _do_init(): @@ -100,7 +100,7 @@ def translate(text) -> Awaitable[Optional[str]]: if future is not None: return future # 否则创建一个翻译任务 - future = asyncio.get_event_loop().create_future() + future = asyncio.get_running_loop().create_future() # 查缓存 res = _translate_cache.get(key, None) @@ -168,7 +168,7 @@ def __init__(self, query_interval, max_queue_size): self._text_queue = asyncio.Queue(max_queue_size) async def init(self): - asyncio.ensure_future(self._translate_consumer()) + asyncio.create_task(self._translate_consumer()) return True @property @@ -189,7 +189,7 @@ async def _translate_consumer(self): while True: try: text, future = await self._text_queue.get() - asyncio.ensure_future(self._translate_coroutine(text, future)) + asyncio.create_task(self._translate_coroutine(text, future)) # 频率限制 await asyncio.sleep(self._query_interval) except Exception: # noqa @@ -226,7 +226,7 @@ async def init(self): return False if not await self._do_init(): return False - self._reinit_future = asyncio.ensure_future(self._reinit_coroutine()) + self._reinit_future = asyncio.create_task(self._reinit_coroutine()) return True async def _do_init(self): @@ -303,7 +303,7 @@ async def _reinit_coroutine(self): while True: await asyncio.sleep(30) logger.debug('TencentTranslateFree reinit') - asyncio.ensure_future(self._do_init()) + asyncio.create_task(self._do_init()) except asyncio.CancelledError: pass @@ -515,7 +515,7 @@ def _on_fail(self, code): # 需要手动处理,等5分钟 sleep_time = 5 * 60 if sleep_time != 0: - self._cool_down_timer_handle = asyncio.get_event_loop().call_later( + self._cool_down_timer_handle = asyncio.get_running_loop().call_later( sleep_time, self._on_cool_down_timeout ) @@ -577,7 +577,7 @@ def _on_fail(self, code): # 账户余额不足,需要手动处理,等5分钟 sleep_time = 5 * 60 if sleep_time != 0: - self._cool_down_timer_handle = asyncio.get_event_loop().call_later( + self._cool_down_timer_handle = asyncio.get_running_loop().call_later( sleep_time, self._on_cool_down_timeout ) diff --git a/update.py b/update.py index 0f9c60c1..7f69cc2d 100644 --- a/update.py +++ b/update.py @@ -9,7 +9,7 @@ def check_update(): - asyncio.ensure_future(_do_check_update()) + asyncio.get_event_loop().create_task(_do_check_update()) async def _do_check_update(): diff --git a/utils/request.py b/utils/request.py index 186abee6..46d76ddd 100644 --- a/utils/request.py +++ b/utils/request.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import asyncio from typing import * import aiohttp @@ -14,7 +15,10 @@ http_session: Optional[aiohttp.ClientSession] = None -# ClientSession要在异步函数中创建 -async def init(): - global http_session - http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) +def init(): + # ClientSession要在异步函数中创建 + async def do_init(): + global http_session + http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + + asyncio.get_event_loop().run_until_complete(do_init())