Skip to content

Commit

Permalink
Fix timeout issue that terminate sending messages to chatwoot
Browse files Browse the repository at this point in the history
  • Loading branch information
sysang committed Aug 31, 2022
1 parent 2eb60d1 commit 3f9ce03
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 87 deletions.
6 changes: 3 additions & 3 deletions .cw.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
SECRET_KEY_BASE='4ZzW8MpYUkw2hWf4ZzW8MpYUkw2hWf4ZzW8MpYUkw2hWf'

# Replace with the URL you are planning to use for your app
FRONTEND_URL=http://0.0.0.0:3000
FRONTEND_URL='https://cs.dsysang.site'

# If the variable is set, all non-authenticated pages would fallback to the default locale.
# Whenever a new account is created, the default language will be DEFAULT_LOCALE instead of en
Expand Down Expand Up @@ -41,10 +41,10 @@ REDIS_SENTINEL_MASTER_NAME=
# You can leave POSTGRES_DATABASE blank. The default name of
# the database in the production environment is chatwoot_production
# POSTGRES_DATABASE=
POSTGRES_HOST=postgres
POSTGRES_HOST=cwdb
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD='qwer1234'
RAILS_ENV=development
RAILS_ENV=production
RAILS_MAX_THREADS=5

# The email from which all outgoing emails are sent
Expand Down
9 changes: 9 additions & 0 deletions botfrontend/botfrontend/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@
},
}

CHAT_SERVER = {
'socketUrl': 'ws://cs.rasachatbot.sysang/cable',
'protocol': 'sockcw',
'protocolOptions': {
'inboxIdentifier': "t1hbqYVrTWQHwge9tvtFnByz",
'chatwootAPIUrl': "https://cs.dsysang.site/public/api/v1/",
}
}

try:
from .local import *
except ImportError:
Expand Down
9 changes: 5 additions & 4 deletions botfrontend/chatroom/templates/chatroom/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
(e.async = !0),
(e.onload = () => {
window.WebChat.default({
//socketUrl:"http://rasachatbot.sysang",
customData:{language:"en"},
socketUrl:"wss://cs.dsysang.site/cable",
protocol: 'sockcw',
socketUrl:"{{ socketUrl }}",
protocol: "{{ protocol }}",
withRules: true,
protocolOptions:{
inboxIdentifier: "t1hbqYVrTWQHwge9tvtFnByz",
chatwootAPIUrl: "https://cs.dsysang.site/public/api/v1/",
inboxIdentifier: "{{ propsInboxIdentifier }}",
chatwootAPIUrl: "{{ propsChatwootAPIUrl }}",
},
}, null);
}),
Expand Down
7 changes: 6 additions & 1 deletion botfrontend/chatroom/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@


def index(request):
response = TemplateResponse(request, 'chatroom/index.html', {'socketUrl': settings.BASE_DOMAIN_URL})
response = TemplateResponse(request, 'chatroom/index.html', {
'socketUrl': settings.CHAT_SERVER['socketUrl'],
'protocol': settings.CHAT_SERVER['protocol'],
'propsInboxIdentifier': settings.CHAT_SERVER['protocolOptions']['inboxIdentifier'],
'propsChatwootAPIUrl': settings.CHAT_SERVER['protocolOptions']['chatwootAPIUrl'],
})

return response

Expand Down
21 changes: 15 additions & 6 deletions botserver-action/booking_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cachecontrol.caches.redis_cache import RedisCache
from cachecontrol.heuristics import ExpiresAfter
from functools import reduce
from asyncio import CancelledError

from .utils import parse_date_range
from .utils import SortbyDictionary
Expand All @@ -36,7 +37,8 @@
LOCALE = 'en-gb'
UNITS = 'metric'
PRICE_MARGIN = 0.07
REQUESTS_CACHE_MINS = 60
# REQUESTS_CACHE_MINS = 60
REQUESTS_CACHE_MINS = 1

# comply to https://booking-com.p.rapidapi.com/v1/hotels/locations api's dest_type field
DEST_TYPE_HOTEL = 'hotel'
Expand Down Expand Up @@ -316,7 +318,7 @@ def extract_bed_type(room):

def make_asyncio_schedule_to_get_room_list(hotel_id_list):
total_num = len(hotel_id_list)
reqnum_limit = 3
reqnum_limit = 4
total_page = math.ceil(total_num / reqnum_limit)
schedule = []
for idx in range(total_page):
Expand All @@ -337,9 +339,15 @@ async def get_room_list(hotel_id_list, checkin_date, checkout_date, currency=CUR
futures = []
for hotel_id in tasks:
futures.append(loop.run_in_executor( None, request_room_list_by_hotel, hotel_id, checkin_date, checkout_date, currency))
for response in await asyncio.gather(*futures):
for item in response:
result.append(item)
try:
responses = await asyncio.gather(*futures)
for response in responses:
for item in response:
result.append(item)

except CancelledError:
logger.error(f"[ERROR] Requesting rooms timed out for hotel ids: %s", tasks)


return result

Expand Down Expand Up @@ -374,7 +382,7 @@ def request_room_list_by_hotel(hotel_id, checkin_date, checkout_date, currency=C
logger.info('[INFO] request_room_list_by_hotel, API url: %s', url)
response = make_request_to_bookingapi(url, headers=headers, params=querystring)

if not response.ok:
if not response:
logger.info('[INFO] request_room_list_by_hotel, API url: %s, FAILED.', url)
return []

Expand Down Expand Up @@ -431,6 +439,7 @@ def request_to_search_hotel(dest_id, dest_type, checkin_date, checkout_date, ord

if response:
return response.json()

return {}


Expand Down
35 changes: 35 additions & 0 deletions botserver-action/codelab/asyncio_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import time

fast_result = None
long_result = None

async def fast():
global fast_result
await asyncio.sleep(1)
fast_result = 'fast -> done'
return 1

async def long():
global long_result
await asyncio.sleep(5)
long_result = 'long -> done'
return 5

async def main():
corofast = asyncio.create_task(fast())
corolong = asyncio.create_task(long())
print('fast_result: ', fast_result)
print('long_result: ', long_result)

done, pending = await asyncio.wait({corofast, corolong}, return_when=asyncio.FIRST_COMPLETED)

time.sleep(6)

if corofast in done:
print(corofast)

print(fast_result)
print(long_result)

asyncio.run(main())
177 changes: 109 additions & 68 deletions botserver-app/addons/channels/chatwoot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import inspect
import logging
import requests
import json
import asyncio
import aiohttp
import time

from sanic import Sanic, Blueprint, response
from sanic.request import Request
Expand All @@ -22,6 +26,63 @@

logger = logging.getLogger(__name__)

def create_handler(message, chatwoot_url, cfg, on_new_message) -> Callable:

def select_output_channel(channel, chatwoot_url, cfg, **kwargs):
out_channel = None
if channel == 'cwwebsite':
out_channel = CwwebsiteOutput(
chatwoot_url=chatwoot_url,
bot_token=cfg.get("bot_token"),
botagent_account_id=cfg.get("botagent_account_id"),
conversation_id=kwargs.get("conversation_id"),
)

return out_channel

async def process_message() -> None:
logger.info('[DEBUG] (process_message) message: %s', message)

sender = message.get("sender", {})
sender_id = sender.get('id', None)
content = message.get("content", None)
conversation = message.get('conversation', {})
conversation_id = conversation.get('id', None)

input_channel = cfg.get("sub_channel")
collector = CollectingOutputChannel()
output_channel = select_output_channel(
channel=input_channel,
chatwoot_url=chatwoot_url,
cfg=cfg,
conversation_id=conversation_id,
)

if content and conversation_id and output_channel:
logger.info('[DEBUG] on_new_message, content: %s', content)
user_message = UserMessage(
text=content,
input_channel=input_channel,
output_channel=output_channel,
sender_id=sender_id,
)
await on_new_message(user_message)

logger.info('[DEBUG] on_new_message -> done')

return process_message


def check_should_proceed_message(message):
message_type = message.get("message_type", None)
conversation = message.get('conversation', {})
conversation_status = conversation.get('status', None)
sender = message.get("sender", {})
sender_id = sender.get('id', None)

return message_type == "incoming" and conversation_status == 'pending' and sender_id


class ChatwootInput(InputChannel):

def name(self) -> Text:
Expand All @@ -42,28 +103,6 @@ def __init__(self, chatwoot_url, website: Dict[Text, Any]) -> None:
self.chatwoot_url = chatwoot_url
self.website = website

def _check_should_proceed_message(self, message):
message_type = message.get("message_type", None)
conversation = message.get('conversation', {})
conversation_status = conversation.get('status', None)
sender = message.get("sender", {})
sender_id = sender.get('id', None)

return message_type == "incoming" and conversation_status == 'pending' and sender_id

def select_output_channel(self, channel, cfg, **kwargs):
out_channel = None
if channel == 'cwwebsite':
out_channel = CwwebsiteOutput(
chatwoot_url=self.chatwoot_url,
bot_token=cfg.get("bot_token"),
botagent_account_id=cfg.get("botagent_account_id"),
conversation_id=kwargs.get("conversation_id"),
)

return out_channel


def blueprint(
self, on_new_message: Callable[[UserMessage], Awaitable[None]]
) -> Blueprint:
Expand All @@ -73,59 +112,61 @@ def blueprint(
inspect.getmodule(self).__name__,
)

async def handler(request: Request) -> HTTPResponse:
cfg = getattr(self, request.route.ctx.configuration_name)
message = request.json
# logger.info('[INFO] message: %s', message)
metadata = self.get_metadata(request)

if self._check_should_proceed_message(message):
sender = message.get("sender", {})
sender_id = sender.get('id', None)
content = message.get("content", None)
conversation = message.get('conversation', {})
conversation_id = conversation.get('id', None)

input_channel = cfg.get("sub_channel")
collector = CollectingOutputChannel()
output_channel = self.select_output_channel(
channel=input_channel,
cfg=cfg,
conversation_id=conversation_id,
)

if content and conversation_id and output_channel:
try:
await on_new_message(
UserMessage(
text=content,
input_channel=input_channel,
output_channel=output_channel,
sender_id=sender_id,
metadata=metadata,
)
)

# except CancelledError:
# logger.error(f"Message handling timed out for message: %s", message)
except Exception:
logger.exception(f"An exception occured while handling message: %s", message)
else:
logger.debug("Invalid message")

logger.debug('[DEBUG] return response, 204')
return response.text("", status=204)

@custom_webhook.route("/", methods=["GET"])
async def health(request: Request) -> HTTPResponse:
# TODO: check configuration for website channel here
return response.json({"status": "ok"})

async def handler(_message, _chatwoot_url, _cfg, _on_new_message) -> None:
process_message = create_handler(
message=_message,
chatwoot_url=_chatwoot_url,
cfg=_cfg,
on_new_message=_on_new_message,
)

try:
await process_message()

except CancelledError:
logger.error(f"[ERROR] Message handling timed out for message: %s", _message)
raise Exception('CancelledError')

except Exception:
logger.error(f"[ERROR] An exception occured while handling message: %s", _message)


async def async_trigger():
await asyncio.sleep(0.1)
return True

async def cwwebsite(request: Request) -> HTTPResponse:
logger.info('[DEBUG] (webhooks/chatwoot/cwwebsite) message: %s', request.json)

if check_should_proceed_message(request.json):
message = request.json
chatwoot_url = request.route.ctx.chatwoot_url
cfg = json.loads(request.route.ctx.cfg)

await asyncio.shield(handler(
_message=message,
_chatwoot_url=chatwoot_url,
_cfg=cfg,
_on_new_message=on_new_message,
))

return response.text("", status=204)

logger.info("[DEBUG] Invalid message, just response immediately")

return response.text("", status=204)

custom_webhook.add_route(
handler=handler,
handler=cwwebsite,
uri="/cwwebsite",
methods=["POST"],
ctx_configuration_name='website',
ctx_chatwoot_url=self.chatwoot_url,
ctx_cfg=json.dumps(self.website),
)

return custom_webhook
Expand Down
Loading

0 comments on commit 3f9ce03

Please sign in to comment.