Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Обработка превышения времени работы #3

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 47 additions & 70 deletions aliceio/client/session/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import abc
import secrets
import datetime
from enum import Enum
from http import HTTPStatus
from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, Final, Optional, Type, cast
Expand All @@ -10,6 +11,7 @@

from aliceio.exceptions import AliceAPIError, ClientDecodeError

from ...dispatcher.event.bases import REJECTED, UNHANDLED
from ...json import JSONModule, json
from ...methods import AliceMethod, AliceType, Response
from ...types import ErrorResult, InputFile
Expand Down Expand Up @@ -42,7 +44,7 @@ def __init__(
:param timeout: Тайм-аут запроса сессии.
"""
self.api = api
self.json_module = json_module
self.json = json_module
self.timeout = timeout

self.middleware = RequestMiddlewareManager()
Expand All @@ -56,7 +58,7 @@ def check_response(
) -> Response[AliceType]:
"""Проверка статуса ответа."""
try:
json_data = self.json_module.loads(content)
json_data = self.json.loads(content)
except Exception as e:
# Обрабатываемая ошибка не может быть поймана конкретным типом,
# поскольку декодер можно кастомизировать и вызвать любое исключение.
Expand Down Expand Up @@ -105,8 +107,7 @@ async def make_request(
"""
pass

# TODO: Сделать под Алису
def prepare_value(
def prepare_value( # noqa: C901
self,
value: Any,
skill: Skill,
Expand All @@ -118,77 +119,53 @@ def prepare_value(
return None
if isinstance(value, str):
return value
# if value is UNSET_PARSE_MODE:
# return self.prepare_value(
# skill.parse_mode,
# skill=skill,
# files=files,
# _dumps_json=_dumps_json,
# )
# if value is UNSET_DISABLE_WEB_PAGE_PREVIEW:
# return self.prepare_value(
# skill.disable_web_page_preview,
# skill=skill,
# files=files,
# _dumps_json=_dumps_json,
# )
# if value is UNSET_PROTECT_CONTENT:
# return self.prepare_value(
# skill.protect_content,
# skill=skill,
# files=files,
# _dumps_json=_dumps_json,
# )
if value in (UNHANDLED, REJECTED):
return None
if isinstance(value, InputFile):
key = "file"
files[key] = value
return f"attach://{key}"

if isinstance(value, InputFile):
key = secrets.token_urlsafe(10)
files[key] = value
return f"attach://{key}"
# if isinstance(value, dict):
# value = {
# key: prepared_item
# for key, item in value.items()
# if (
# prepared_item := self.prepare_value(
# item,
# skill=skill,
# files=files,
# _dumps_json=False,
# )
# )
# is not None
# }
# if _dumps_json:
# return self.json_dumps(value)
# return value
# if isinstance(value, list):
# value = [
# prepared_item
# for item in value
# if (
# prepared_item := self.prepare_value(
# item, skill=skill, files=files, _dumps_json=False
# )
# )
# is not None
# ]
# if _dumps_json:
# return self.json_dumps(value)
# return value
# if isinstance(value, datetime.timedelta):
# now = datetime.datetime.now()
# return str(round((now + value).timestamp()))
# if isinstance(value, datetime.datetime):
# return str(round(value.timestamp()))
# if isinstance(value, Enum):
# return self.prepare_value(value.value, skill=skill, files=files)
if isinstance(value, dict):
value = {
key: prepared_item
for key, item in value.items()
if (
prepared_item := self.prepare_value(
item,
skill=skill,
files=files,
_dumps_json=False,
)
)
is not None
}
if _dumps_json:
return self.json.dumps(value)
return value
if isinstance(value, list):
value = [
prepared_item
for item in value
if (
prepared_item := self.prepare_value(
item, skill=skill, files=files, _dumps_json=False
)
)
is not None
]
if _dumps_json:
return self.json.dumps(value)
return value
if isinstance(value, datetime.timedelta):
now = datetime.datetime.now()
return str(round((now + value).timestamp()))
if isinstance(value, datetime.datetime):
return str(round(value.timestamp()))
if isinstance(value, Enum):
return self.prepare_value(value.value, skill=skill, files=files)

if _dumps_json:
return self.json_module.dumps(value)
return self.json.dumps(value)
return value

async def __call__(
Expand Down
113 changes: 59 additions & 54 deletions aliceio/dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
from .. import loggers
from ..client.skill import Skill
from ..enums import EventType
from ..exceptions import AliceAPIError
from ..fsm.middleware import FSMContextMiddleware
from ..fsm.storage.base import BaseStorage
from ..fsm.storage.memory import MemoryStorage
from ..fsm.strategy import FSMStrategy
from ..methods import AliceMethod
from ..types import Update, UpdateTypeLookupError
from ..types import AliceResponse, Response, TimeoutEvent, Update, UpdateTypeLookupError
from ..types.base import AliceObject
from .event.alice import AliceEventObserver
from .event.bases import UNHANDLED, SkipHandler
Expand All @@ -34,6 +32,7 @@ def __init__(
fsm_strategy: FSMStrategy = FSMStrategy.USER,
disable_fsm: bool = False,
name: Optional[str] = None,
response_timeout: Union[int, float] = 4.0,
**kwargs: Any,
) -> None:
"""
Expand All @@ -42,8 +41,11 @@ def __init__(
:param storage: Хранилище для FSM.
:param fsm_strategy: Стратегия FSM.
:param disable_fsm: Отключить ли FSM.
:param name: Имя как роутера, полезно при дебаге.
:param response_timeout: Время для обработки события,
после которого будет вызван TimeoutEvent.
:param kwargs: Остальные аргументы,
будут переданы в обработчики как именованные аргументы
будут переданы в обработчики как именованные аргументы
"""
super(Dispatcher, self).__init__(name=name)

Expand All @@ -53,14 +55,20 @@ def __init__(
)
self.update.register(self._listen_update)

# На timeout-observer тоже регистрируются все те же мидлвари, что и на update,
# потому что при возникновении TimeoutEvent'а контекстные данные из мидлварей
# оригинального Update не получить. Засчитаю за костыль

# Обработчики ошибок должны работать вне всех других функций
# и должны быть зарегистрированы раньше всех остальных мидлварей.
self.update.outer_middleware(ErrorsMiddleware(self))
self.timeout.outer_middleware(ErrorsMiddleware(self))

# UserContextMiddleware выполняет небольшую оптимизацию
# для всех других встроенных мидлварей путем кэширования
# экземпляров пользователя и сессиив контексте событий.
self.update.outer_middleware(UserContextMiddleware())
self.timeout.outer_middleware(UserContextMiddleware())

# FSMContextMiddleware всегда следует регистрировать после UserContextMiddleware
# поскольку здесь используется контекст из предыдущего шага.
Expand All @@ -70,8 +78,10 @@ def __init__(
)
if not disable_fsm:
self.update.outer_middleware(self.fsm)
self.timeout.outer_middleware(self.fsm)
self.shutdown.register(self.fsm.close)

self.response_timeout = response_timeout
self.workflow_data: Dict[str, Any] = kwargs
self._running_lock = Lock()
self._stop_signal: Optional[Event] = None
Expand Down Expand Up @@ -128,7 +138,8 @@ async def feed_update(self, skill: Skill, update: Update, **kwargs: Any) -> Any:
# Предпочтительным способом является передача события с уже привязанным
# экземпляром навыка перед вызовом метода feed_update
update = Update.model_validate(
update.model_dump(), context={"skill": skill}
update.model_dump(),
context={"skill": skill},
)

try:
Expand Down Expand Up @@ -171,24 +182,6 @@ async def feed_raw_update(
parsed_update = Update.model_validate(update, context={"skill": skill})
return await self.feed_update(skill=skill, update=parsed_update, **kwargs)

@classmethod
async def silent_call_request(
cls,
skill: Skill,
result: AliceMethod[Any],
) -> None:
"""Имитация ответа в вебхук."""
try:
await skill(result)
except AliceAPIError as e:
# Поскольку механизм WebHook не позволяет получить ответ на запросы,
# вызванные в ответ на запрос WebHook,
# необходимо пропускать неудачные ответы.
# Для отладки сюда добавлено логирование.
loggers.event.error(
"Failed to make answer: %s: %s", e.__class__.__name__, e
)

async def _listen_update(self, update: Update, **kwargs: Any) -> Any:
"""
Основной отслеживатель событий.
Expand Down Expand Up @@ -225,10 +218,7 @@ async def _feed_webhook_update(
update: Update,
**kwargs: Any,
) -> Any:
"""
Тот же самый `Dispatcher.process_update()`,
но возвращает реальный ответ вместо bool.
"""
"""Возвращает реальный ответ вместо bool."""
try:
return await self.feed_update(skill, update, **kwargs)
except Exception as e:
Expand All @@ -242,13 +232,10 @@ async def _feed_webhook_update(
)
raise

# TODO: Сделать возврат из вебхука без ретурна в обработчиках?
# TODO: Сделать кастомный ответ, если ответа нет через _timeout секунд
async def feed_webhook_update(
self,
skill: Skill,
update: Union[Update, Dict[str, Any]],
_timeout: float = 4.5,
**kwargs: Any,
) -> Optional[AliceObject]:
if not isinstance(update, Update): # Allow to use raw updates
Expand All @@ -262,28 +249,13 @@ def release_waiter(*_: Any) -> None:
if not waiter.done():
waiter.set_result(None)

timeout_handle = loop.call_later(_timeout, release_waiter)
timeout_handle = loop.call_later(self.response_timeout, release_waiter)

process_updates: Future[Any] = asyncio.ensure_future(
self._feed_webhook_update(skill=skill, update=update, **kwargs)
)
process_updates.add_done_callback(release_waiter, context=ctx)

def process_response(task: Future[Any]) -> None:
warnings.warn(
"Detected slow response into webhook.\n"
"Alice is waiting for response only 4.5 seconds and cancel update.",
RuntimeWarning,
)
try:
result = task.result()
except Exception as e:
raise e
if isinstance(result, AliceMethod):
asyncio.ensure_future(
self.silent_call_request(skill=skill, result=result)
)

try:
try:
await waiter
Expand All @@ -294,16 +266,49 @@ def process_response(task: Future[Any]) -> None:

if process_updates.done():
# TODO: handle exceptions
# TODO: Определить типы, сделать преобразование в AliceResponse
response: Any = process_updates.result()
if isinstance(response, AliceObject):
return response
return await self._convert_response(process_updates.result())

else:
process_updates.remove_done_callback(release_waiter)
process_updates.add_done_callback(process_response, context=ctx)
process_updates.remove_done_callback(release_waiter)
response: Any = await self._process_timeouted_update(
skill,
update,
**kwargs,
)
return await self._convert_response(response)

finally:
timeout_handle.cancel()

return None
async def _process_timeouted_update(
self,
skill: Skill,
update: Update,
**kwargs: Any,
) -> Any:
warnings.warn(
"Detected slow response into webhook.\n"
"Alice only waits less than 4.5 seconds for a response and cancel "
"skill conversation, so be careful and register extra fast handlers "
"in `@<router>.timeout` to respond to timeouted updates.",
RuntimeWarning,
)
return await self.propagate_event(
event_type=EventType.TIMEOUT,
event=TimeoutEvent(
update=update,
session=update.session,
context={"skill": skill},
),
skill=skill,
**self.workflow_data,
**kwargs,
)

# TODO: Сделать преобразование разных типов в AliceResponse
@staticmethod
async def _convert_response(value: Any) -> Optional[Union[AliceObject, Any]]:
if isinstance(value, AliceResponse):
return value
if isinstance(value, Response):
return AliceResponse(response=value)
return value
Loading