From 3f9ce03823fa9aae4dfd03b864b6690ab9f7ee2a Mon Sep 17 00:00:00 2001 From: sysang Date: Wed, 31 Aug 2022 20:51:56 +0700 Subject: [PATCH] Fix timeout issue that terminate sending messages to chatwoot --- .cw.env.template | 6 +- botfrontend/botfrontend/settings.py | 9 + .../chatroom/templates/chatroom/index.html | 9 +- botfrontend/chatroom/views.py | 7 +- botserver-action/booking_service.py | 21 ++- botserver-action/codelab/asyncio_wait.py | 35 ++++ botserver-app/addons/channels/chatwoot.py | 177 +++++++++++------- .../addons/channels/cwwebsite_output.py | 7 +- .../chatwoot_web_client_loop_test.py | 20 ++ deployment_note.md | 1 - docker-compose.yml | 6 +- .../local.chatwoot.nginx.template | 35 ++++ 12 files changed, 246 insertions(+), 87 deletions(-) create mode 100644 botserver-action/codelab/asyncio_wait.py create mode 100644 botserver-app/chatwoot_web_client_loop_test.py create mode 100755 httpserver-nginx/nginx-config-files/local.chatwoot.nginx.template diff --git a/.cw.env.template b/.cw.env.template index 0049535..91b9f0d 100644 --- a/.cw.env.template +++ b/.cw.env.template @@ -2,7 +2,7 @@ SECRET_KEY_BASE='4ZzW8MpYUkw2hWf4ZzW8MpYUkw2hWf4ZzW8MpYUkw2hWf' # Replace with the URL you are planning to use for your app -FRONTEND_URL=http://0.0.0.0:3000 +FRONTEND_URL='https://cs.dsysang.site' # If the variable is set, all non-authenticated pages would fallback to the default locale. # Whenever a new account is created, the default language will be DEFAULT_LOCALE instead of en @@ -41,10 +41,10 @@ REDIS_SENTINEL_MASTER_NAME= # You can leave POSTGRES_DATABASE blank. The default name of # the database in the production environment is chatwoot_production # POSTGRES_DATABASE= -POSTGRES_HOST=postgres +POSTGRES_HOST=cwdb POSTGRES_USERNAME=postgres POSTGRES_PASSWORD='qwer1234' -RAILS_ENV=development +RAILS_ENV=production RAILS_MAX_THREADS=5 # The email from which all outgoing emails are sent diff --git a/botfrontend/botfrontend/settings.py b/botfrontend/botfrontend/settings.py index e983e8e..c2af7ea 100644 --- a/botfrontend/botfrontend/settings.py +++ b/botfrontend/botfrontend/settings.py @@ -169,6 +169,15 @@ }, } +CHAT_SERVER = { + 'socketUrl': 'ws://cs.rasachatbot.sysang/cable', + 'protocol': 'sockcw', + 'protocolOptions': { + 'inboxIdentifier': "t1hbqYVrTWQHwge9tvtFnByz", + 'chatwootAPIUrl': "https://cs.dsysang.site/public/api/v1/", + } +} + try: from .local import * except ImportError: diff --git a/botfrontend/chatroom/templates/chatroom/index.html b/botfrontend/chatroom/templates/chatroom/index.html index fe3eba6..162e5d9 100644 --- a/botfrontend/chatroom/templates/chatroom/index.html +++ b/botfrontend/chatroom/templates/chatroom/index.html @@ -21,13 +21,14 @@ (e.async = !0), (e.onload = () => { window.WebChat.default({ + //socketUrl:"http://rasachatbot.sysang", customData:{language:"en"}, - socketUrl:"wss://cs.dsysang.site/cable", - protocol: 'sockcw', + socketUrl:"{{ socketUrl }}", + protocol: "{{ protocol }}", withRules: true, protocolOptions:{ - inboxIdentifier: "t1hbqYVrTWQHwge9tvtFnByz", - chatwootAPIUrl: "https://cs.dsysang.site/public/api/v1/", + inboxIdentifier: "{{ propsInboxIdentifier }}", + chatwootAPIUrl: "{{ propsChatwootAPIUrl }}", }, }, null); }), diff --git a/botfrontend/chatroom/views.py b/botfrontend/chatroom/views.py index 83e7f22..730bd41 100644 --- a/botfrontend/chatroom/views.py +++ b/botfrontend/chatroom/views.py @@ -8,7 +8,12 @@ def index(request): - response = TemplateResponse(request, 'chatroom/index.html', {'socketUrl': settings.BASE_DOMAIN_URL}) + response = TemplateResponse(request, 'chatroom/index.html', { + 'socketUrl': settings.CHAT_SERVER['socketUrl'], + 'protocol': settings.CHAT_SERVER['protocol'], + 'propsInboxIdentifier': settings.CHAT_SERVER['protocolOptions']['inboxIdentifier'], + 'propsChatwootAPIUrl': settings.CHAT_SERVER['protocolOptions']['chatwootAPIUrl'], + }) return response diff --git a/botserver-action/booking_service.py b/botserver-action/booking_service.py index cee4320..9f8545d 100644 --- a/botserver-action/booking_service.py +++ b/botserver-action/booking_service.py @@ -16,6 +16,7 @@ from cachecontrol.caches.redis_cache import RedisCache from cachecontrol.heuristics import ExpiresAfter from functools import reduce +from asyncio import CancelledError from .utils import parse_date_range from .utils import SortbyDictionary @@ -36,7 +37,8 @@ LOCALE = 'en-gb' UNITS = 'metric' PRICE_MARGIN = 0.07 -REQUESTS_CACHE_MINS = 60 +# REQUESTS_CACHE_MINS = 60 +REQUESTS_CACHE_MINS = 1 # comply to https://booking-com.p.rapidapi.com/v1/hotels/locations api's dest_type field DEST_TYPE_HOTEL = 'hotel' @@ -316,7 +318,7 @@ def extract_bed_type(room): def make_asyncio_schedule_to_get_room_list(hotel_id_list): total_num = len(hotel_id_list) - reqnum_limit = 3 + reqnum_limit = 4 total_page = math.ceil(total_num / reqnum_limit) schedule = [] for idx in range(total_page): @@ -337,9 +339,15 @@ async def get_room_list(hotel_id_list, checkin_date, checkout_date, currency=CUR futures = [] for hotel_id in tasks: futures.append(loop.run_in_executor( None, request_room_list_by_hotel, hotel_id, checkin_date, checkout_date, currency)) - for response in await asyncio.gather(*futures): - for item in response: - result.append(item) + try: + responses = await asyncio.gather(*futures) + for response in responses: + for item in response: + result.append(item) + + except CancelledError: + logger.error(f"[ERROR] Requesting rooms timed out for hotel ids: %s", tasks) + return result @@ -374,7 +382,7 @@ def request_room_list_by_hotel(hotel_id, checkin_date, checkout_date, currency=C logger.info('[INFO] request_room_list_by_hotel, API url: %s', url) response = make_request_to_bookingapi(url, headers=headers, params=querystring) - if not response.ok: + if not response: logger.info('[INFO] request_room_list_by_hotel, API url: %s, FAILED.', url) return [] @@ -431,6 +439,7 @@ def request_to_search_hotel(dest_id, dest_type, checkin_date, checkout_date, ord if response: return response.json() + return {} diff --git a/botserver-action/codelab/asyncio_wait.py b/botserver-action/codelab/asyncio_wait.py new file mode 100644 index 0000000..34f5781 --- /dev/null +++ b/botserver-action/codelab/asyncio_wait.py @@ -0,0 +1,35 @@ +import asyncio +import time + +fast_result = None +long_result = None + +async def fast(): + global fast_result + await asyncio.sleep(1) + fast_result = 'fast -> done' + return 1 + +async def long(): + global long_result + await asyncio.sleep(5) + long_result = 'long -> done' + return 5 + +async def main(): + corofast = asyncio.create_task(fast()) + corolong = asyncio.create_task(long()) + print('fast_result: ', fast_result) + print('long_result: ', long_result) + + done, pending = await asyncio.wait({corofast, corolong}, return_when=asyncio.FIRST_COMPLETED) + + time.sleep(6) + + if corofast in done: + print(corofast) + + print(fast_result) + print(long_result) + +asyncio.run(main()) diff --git a/botserver-app/addons/channels/chatwoot.py b/botserver-app/addons/channels/chatwoot.py index 62e8c4c..ba3bafd 100644 --- a/botserver-app/addons/channels/chatwoot.py +++ b/botserver-app/addons/channels/chatwoot.py @@ -1,6 +1,10 @@ -import asyncio import inspect import logging +import requests +import json +import asyncio +import aiohttp +import time from sanic import Sanic, Blueprint, response from sanic.request import Request @@ -22,6 +26,63 @@ logger = logging.getLogger(__name__) +def create_handler(message, chatwoot_url, cfg, on_new_message) -> Callable: + + def select_output_channel(channel, chatwoot_url, cfg, **kwargs): + out_channel = None + if channel == 'cwwebsite': + out_channel = CwwebsiteOutput( + chatwoot_url=chatwoot_url, + bot_token=cfg.get("bot_token"), + botagent_account_id=cfg.get("botagent_account_id"), + conversation_id=kwargs.get("conversation_id"), + ) + + return out_channel + + async def process_message() -> None: + logger.info('[DEBUG] (process_message) message: %s', message) + + sender = message.get("sender", {}) + sender_id = sender.get('id', None) + content = message.get("content", None) + conversation = message.get('conversation', {}) + conversation_id = conversation.get('id', None) + + input_channel = cfg.get("sub_channel") + collector = CollectingOutputChannel() + output_channel = select_output_channel( + channel=input_channel, + chatwoot_url=chatwoot_url, + cfg=cfg, + conversation_id=conversation_id, + ) + + if content and conversation_id and output_channel: + logger.info('[DEBUG] on_new_message, content: %s', content) + user_message = UserMessage( + text=content, + input_channel=input_channel, + output_channel=output_channel, + sender_id=sender_id, + ) + await on_new_message(user_message) + + logger.info('[DEBUG] on_new_message -> done') + + return process_message + + +def check_should_proceed_message(message): + message_type = message.get("message_type", None) + conversation = message.get('conversation', {}) + conversation_status = conversation.get('status', None) + sender = message.get("sender", {}) + sender_id = sender.get('id', None) + + return message_type == "incoming" and conversation_status == 'pending' and sender_id + + class ChatwootInput(InputChannel): def name(self) -> Text: @@ -42,28 +103,6 @@ def __init__(self, chatwoot_url, website: Dict[Text, Any]) -> None: self.chatwoot_url = chatwoot_url self.website = website - def _check_should_proceed_message(self, message): - message_type = message.get("message_type", None) - conversation = message.get('conversation', {}) - conversation_status = conversation.get('status', None) - sender = message.get("sender", {}) - sender_id = sender.get('id', None) - - return message_type == "incoming" and conversation_status == 'pending' and sender_id - - def select_output_channel(self, channel, cfg, **kwargs): - out_channel = None - if channel == 'cwwebsite': - out_channel = CwwebsiteOutput( - chatwoot_url=self.chatwoot_url, - bot_token=cfg.get("bot_token"), - botagent_account_id=cfg.get("botagent_account_id"), - conversation_id=kwargs.get("conversation_id"), - ) - - return out_channel - - def blueprint( self, on_new_message: Callable[[UserMessage], Awaitable[None]] ) -> Blueprint: @@ -73,59 +112,61 @@ def blueprint( inspect.getmodule(self).__name__, ) - async def handler(request: Request) -> HTTPResponse: - cfg = getattr(self, request.route.ctx.configuration_name) - message = request.json - # logger.info('[INFO] message: %s', message) - metadata = self.get_metadata(request) - - if self._check_should_proceed_message(message): - sender = message.get("sender", {}) - sender_id = sender.get('id', None) - content = message.get("content", None) - conversation = message.get('conversation', {}) - conversation_id = conversation.get('id', None) - - input_channel = cfg.get("sub_channel") - collector = CollectingOutputChannel() - output_channel = self.select_output_channel( - channel=input_channel, - cfg=cfg, - conversation_id=conversation_id, - ) - - if content and conversation_id and output_channel: - try: - await on_new_message( - UserMessage( - text=content, - input_channel=input_channel, - output_channel=output_channel, - sender_id=sender_id, - metadata=metadata, - ) - ) - - # except CancelledError: - # logger.error(f"Message handling timed out for message: %s", message) - except Exception: - logger.exception(f"An exception occured while handling message: %s", message) - else: - logger.debug("Invalid message") - - logger.debug('[DEBUG] return response, 204') - return response.text("", status=204) - @custom_webhook.route("/", methods=["GET"]) async def health(request: Request) -> HTTPResponse: # TODO: check configuration for website channel here return response.json({"status": "ok"}) + async def handler(_message, _chatwoot_url, _cfg, _on_new_message) -> None: + process_message = create_handler( + message=_message, + chatwoot_url=_chatwoot_url, + cfg=_cfg, + on_new_message=_on_new_message, + ) + + try: + await process_message() + + except CancelledError: + logger.error(f"[ERROR] Message handling timed out for message: %s", _message) + raise Exception('CancelledError') + + except Exception: + logger.error(f"[ERROR] An exception occured while handling message: %s", _message) + + + async def async_trigger(): + await asyncio.sleep(0.1) + return True + + async def cwwebsite(request: Request) -> HTTPResponse: + logger.info('[DEBUG] (webhooks/chatwoot/cwwebsite) message: %s', request.json) + + if check_should_proceed_message(request.json): + message = request.json + chatwoot_url = request.route.ctx.chatwoot_url + cfg = json.loads(request.route.ctx.cfg) + + await asyncio.shield(handler( + _message=message, + _chatwoot_url=chatwoot_url, + _cfg=cfg, + _on_new_message=on_new_message, + )) + + return response.text("", status=204) + + logger.info("[DEBUG] Invalid message, just response immediately") + + return response.text("", status=204) + custom_webhook.add_route( - handler=handler, + handler=cwwebsite, uri="/cwwebsite", methods=["POST"], - ctx_configuration_name='website', + ctx_chatwoot_url=self.chatwoot_url, + ctx_cfg=json.dumps(self.website), ) return custom_webhook diff --git a/botserver-app/addons/channels/cwwebsite_output.py b/botserver-app/addons/channels/cwwebsite_output.py index f8f3b90..dfcc150 100644 --- a/botserver-app/addons/channels/cwwebsite_output.py +++ b/botserver-app/addons/channels/cwwebsite_output.py @@ -1,6 +1,8 @@ import requests import json import logging +import asyncio +import aiohttp from typing import Text, Dict, Any, Optional, Callable, Awaitable, NoReturn, List, Iterable @@ -65,7 +67,10 @@ async def _send_message(self, message: Text) -> None: } # TDOD: check if error - r = requests.post(url, json=data, headers=headers) + timeout = aiohttp.ClientTimeout(total=60) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(url, json=data, headers=headers) as resp: + pass async def send_text_message( self, text: Text, **kwargs: Any) -> None: """Send a message through this channel.""" diff --git a/botserver-app/chatwoot_web_client_loop_test.py b/botserver-app/chatwoot_web_client_loop_test.py new file mode 100644 index 0000000..7be66f4 --- /dev/null +++ b/botserver-app/chatwoot_web_client_loop_test.py @@ -0,0 +1,20 @@ +import http.client +import time + +def test(counter=0): + conn = http.client.HTTPConnection("rasachatbot.sysang") + + payload = "{\n\t\"message_type\": \"incoming\",\n\t\"conversation\": { \"id\": 1, \"status\": \"pending\" },\n\t\"sender\": { \"id\": 1 },\n\t\"content\": \"hi\"\n}" + + headers = { 'Content-Type': "application/json" } + + conn.request("POST", "/webhooks/chatwoot/cwwebsite", payload, headers) + + res = conn.getresponse() + data = res.read() + + print('Counter: ', counter) + +for i in range(3600): + test(i) + time.sleep(1) diff --git a/deployment_note.md b/deployment_note.md index a532f07..de7b271 100644 --- a/deployment_note.md +++ b/deployment_note.md @@ -80,7 +80,6 @@ for tracker_state, label in zip(tracker_state_features, label_ids): > `bot = AgentBot.create!(name: "Rasa Chatbot", outgoing_url: "http://rasa-production:5005/webhooks/chatwoot/cwwebsite")` > `bot.access_token.token` > `AgentBotInbox.create!(inbox: Inbox.find(1), agent_bot: bot)` -> `AgentBotInbox.create!(inbox: Inbox.find(1), agent_bot: bot)` - Super admin Console > https://www.chatwoot.com/docs/self-hosted/monitoring/super-admin-sidekiq/ > `docker exec -it chatwoot_rails_1 bundle exec rails c` diff --git a/docker-compose.yml b/docker-compose.yml index 8cd380f..9149961 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -264,8 +264,8 @@ services: - redis # ports: # - '3000:3000' - # expose: - # - "3000" + expose: + - "3000" environment: - NODE_ENV=production - RAILS_ENV=production @@ -289,7 +289,7 @@ services: restart: on-failure:5 image: "bitnami/postgresql:11.9.0" expose: - - "5432:54322" + - "5432" environment: POSTGRESQL_USERNAME: "${POSTGRES_CWUSER}" POSTGRESQL_PASSWORD: "${POSTGRES_CWPASSWORD}" diff --git a/httpserver-nginx/nginx-config-files/local.chatwoot.nginx.template b/httpserver-nginx/nginx-config-files/local.chatwoot.nginx.template new file mode 100755 index 0000000..3929757 --- /dev/null +++ b/httpserver-nginx/nginx-config-files/local.chatwoot.nginx.template @@ -0,0 +1,35 @@ +upstream docker-chatwoot { + server rails:3000 max_fails=0; +} + +server { + listen 8080; + server_name cs.rasachatbot.sysang; + + # Nginx strips out underscore in headers by default + # Chatwoot relies on underscore in headers for API + # Make sure that the config is set to on. + underscores_in_headers on; + + location / { + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Host $host; + + proxy_pass_header Authorization; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header X-Forwarded-Ssl on; # Optional + + proxy_http_version 1.1; + proxy_set_header Connection “”; + proxy_buffering off; + + client_max_body_size 0; + proxy_read_timeout 36000s; + proxy_redirect off; + + proxy_pass http://docker-chatwoot; + } +}