Skip to content

Commit

Permalink
fix: timeout processing lmao requests
Browse files Browse the repository at this point in the history
  • Loading branch information
F33RNI committed Apr 20, 2024
1 parent 7f41e96 commit c945748
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 92 deletions.
10 changes: 6 additions & 4 deletions lmao_process_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ def _lmao_stop_stream_loop() -> None:
logging.warning(f"Exit from {name} loop requested")
break

request_response = None

try:
# Wait a bit to prevent overloading
# We need to wait at the beginning to enable delay even after exception
Expand Down Expand Up @@ -258,11 +260,9 @@ def _lmao_stop_stream_loop() -> None:
break

# Save conversation ID
logging.info(f"Saving user {request_response.user_id} conversation ID as: name_{conversation_id}")
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)

# Return container
lmao_response_queue.put(request_response)

# Non-blocking get of user_id to clear conversation for
delete_conversation_user_id = None
try:
Expand Down Expand Up @@ -297,10 +297,12 @@ def _lmao_stop_stream_loop() -> None:
logging.error(f"{name} error", exc_info=e)
lmao_exceptions_queue.put(e)

# Read module's status
# Read module's status and return the container
finally:
with lmao_module_status.get_lock():
lmao_module_status.value = module.status
if request_response:
lmao_response_queue.put(request_response)

# Wait for stop handler to finish
if stop_handler_thread and stop_handler_thread.is_alive():
Expand Down
155 changes: 80 additions & 75 deletions lmao_process_loop_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ def _lmao_stop_stream_loop() -> None:
logging.warning(f"Exit from {name} loop requested")
break

release_lock = False
request_response = None

try:
# Wait a bit to prevent overloading
# We need to wait at the beginning to enable delay even after exception
Expand Down Expand Up @@ -435,89 +438,83 @@ def _lmao_stop_stream_loop() -> None:
users_handler_.set_key(request_response.user_id, "suggestions", [])

# Ask and read stream
try:
for line in _request_wrapper(
api_url,
"ask",
{name_lmao: module_request},
cooldown_timer,
request_lock,
proxy,
token,
stream=True,
).iter_lines():
if not line:
continue
try:
response = json.loads(line.decode("utf-8"))
except Exception as e:
logging.warning(f"Unable to parse response line as JSON: {e}")
continue

finished = response.get("finished")
conversation_id = response.get("conversation_id")
request_response.response_text = response.get("response")

images = response.get("images")
if images is not None:
request_response.response_images = images[:]

# Format and add attributions
attributions = response.get("attributions")
if attributions is not None and len(attributions) != 0:
response_link_format = messages_.get_message(
"response_link_format", user_id=request_response.user_id
release_lock = True
for line in _request_wrapper(
api_url,
"ask",
{name_lmao: module_request},
cooldown_timer,
request_lock,
proxy,
token,
stream=True,
).iter_lines():
if not line:
continue
try:
response = json.loads(line.decode("utf-8"))
except Exception as e:
logging.warning(f"Unable to parse response line as JSON: {e}")
continue

finished = response.get("finished")
conversation_id = response.get("conversation_id")
request_response.response_text = response.get("response")

images = response.get("images")
if images is not None:
request_response.response_images = images[:]

# Format and add attributions
attributions = response.get("attributions")
if attributions is not None and len(attributions) != 0:
response_link_format = messages_.get_message(
"response_link_format", user_id=request_response.user_id
)
request_response.response_text += "\n"
for i, attribution in enumerate(attributions):
request_response.response_text += response_link_format.format(
source_name=str(i + 1), link=attribution.get("url", "")
)
request_response.response_text += "\n"
for i, attribution in enumerate(attributions):
request_response.response_text += response_link_format.format(
source_name=str(i + 1), link=attribution.get("url", "")
)

# Suggestions must be stored as tuples with unique ID for reply-markup
if finished:
suggestions = response.get("suggestions")
if suggestions is not None:
request_response.response_suggestions = []
for suggestion in suggestions:
if not suggestion or len(suggestion) < 1:
continue
id_ = "".join(
random.choices(
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8
)
# Suggestions must be stored as tuples with unique ID for reply-markup
if finished:
suggestions = response.get("suggestions")
if suggestions is not None:
request_response.response_suggestions = []
for suggestion in suggestions:
if not suggestion or len(suggestion) < 1:
continue
id_ = "".join(
random.choices(
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8
)
request_response.response_suggestions.append((id_, suggestion))
users_handler_.set_key(
request_response.user_id,
"suggestions",
request_response.response_suggestions,
)
request_response.response_suggestions.append((id_, suggestion))
users_handler_.set_key(
request_response.user_id,
"suggestions",
request_response.response_suggestions,
)

# Check if exit was requested
with lmao_process_running.get_lock():
lmao_process_running_value = lmao_process_running.value
if not lmao_process_running_value:
finished = True

# Send response to the user
async_helper(
send_message_async(config.get("telegram"), messages_, request_response, end=finished)
)

# Exit from stream reader
if not lmao_process_running_value:
break
# Check if exit was requested
with lmao_process_running.get_lock():
lmao_process_running_value = lmao_process_running.value
if not lmao_process_running_value:
finished = True

# Save conversation ID
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)
# Send response to the user
async_helper(
send_message_async(config.get("telegram"), messages_, request_response, end=finished)
)

# Return container
lmao_response_queue.put(request_response)
# Exit from stream reader
if not lmao_process_running_value:
break

# Release lock after stream stop
finally:
request_lock.release()
# Save conversation ID
logging.info(f"Saving user {request_response.user_id} conversation ID as: name_{conversation_id}")
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)

# Non-blocking get of user_id to clear conversation for
delete_conversation_user_id = None
Expand Down Expand Up @@ -580,6 +577,14 @@ def _lmao_stop_stream_loop() -> None:
logging.error(f"{name} error", exc_info=e)
lmao_exceptions_queue.put(e)

# Release lock if needed and return the container
finally:
if release_lock:
request_lock.release()
release_lock = False
if request_response:
lmao_response_queue.put(request_response)

# Read module status
try:
_check_status(status_dict, api_url, cooldown_timer, request_lock, proxy, token)
Expand Down
23 changes: 11 additions & 12 deletions module_wrapper_global.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ def process_request(self, request_response: request_response_container.RequestRe
self._lmao_request_queue.put(request_response)

# Wait until it's processed or failed
logging.info(f"Waiting for {self.name} request to be processed")
time.sleep(1)
logging.info(f"Waiting for {self.name} request to be processed (waiting for container)")
response_ = None
while True:
# Check process
with self._lmao_process_running.get_lock():
Expand All @@ -307,21 +307,18 @@ def process_request(self, request_response: request_response_container.RequestRe
if lmao_exception is not None:
raise lmao_exception

# Check status
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
if module_status == STATUS_IDLE:
# Try to get container back
try:
response_ = self._lmao_response_queue.get(block=False)
except queue.Empty:
pass
if response_:
logging.info(f"Received container back from {self.name} process")
break

time.sleep(LMAO_LOOP_DELAY)

# Update container
# TODO: Optimize this
response_ = None
try:
response_ = self._lmao_response_queue.get(block=True, timeout=1)
except queue.Empty:
logging.warning(f"Cannot get container back from {self.name} process")
if response_:
request_response.response_text = response_.response_text
for response_image in response_.response_images:
Expand All @@ -335,6 +332,8 @@ def process_request(self, request_response: request_response_container.RequestRe
request_response.error = response_.error
request_response.response_next_chunk_start_index = response_.response_next_chunk_start_index
request_response.response_sent_len = response_.response_sent_len
else:
logging.warning(f"Unable to get container back from {self.name} process")

##########
# Gemini #
Expand Down
2 changes: 1 addition & 1 deletion request_response_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
PROCESSING_STATE_ABORT = 7

# State to string
PROCESSING_STATE_NAMES = ["Waiting", "Starting", "Active", "Done", "Timed out", "Canceling", "Canceling"]
PROCESSING_STATE_NAMES = ["Waiting", "Starting", "Active", "Done", "Timed out", "Canceling", "Canceling", "Abort"]


class RequestResponseContainer:
Expand Down

0 comments on commit c945748

Please sign in to comment.