-
Notifications
You must be signed in to change notification settings - Fork 997
Open
Description
-
对 Bug 的描述
- 当前行为:超时重发的消息会被识别为未知消息类型(即使是text)
- 正确的行为:识别正确
-
环境
- 平台:Windows
- WeRoBot 版本号:werobot==1.13.1
- Python 版本:Python 3.12.3
-
复现代码或 repo 链接
# 脱敏后的代码文件
import os
import time
import threading
from datetime import datetime
from werobot import WeRoBot
from core.log_config import setup_logger # 导入配置函数
from utils.logger import logger # 导入干净的 logger
from core.config import (
# TOKEN, # 已脱敏
# HOST, # 已脱敏
# PORT, # 已脱敏
# SERVER, # 已脱敏
# APP_ID, # 已脱敏
# APP_SECRET, # 已脱敏
LOGS_DIR,
MAINTENANCE_MODE,
)
from utils.feishu import send_feishu_msg
from handlers.menu import MenuHandler
# 模拟从配置文件中读取的脱敏后变量
TOKEN = "YOUR_WECHAT_TOKEN"
APP_ID = "YOUR_APP_ID"
APP_SECRET = "YOUR_APP_SECRET"
HOST = "0.0.0.0"
PORT = 80
SERVER = "waitress"
# 在所有其他代码之前,首先配置日志,日志等级为debug,轮转10MB
# setup_logger("bot", level="DEBUG")
setup_logger("bot", logs_dir=str(LOGS_DIR), level="DEBUG")
from models.database import html_tasks, users
from models.database.qa_database import QADatabase
from models.database.exam_subscription import ExamSubscriptionDatabase
from models.database.score_subscription import ScoreSubscriptionDatabase
from models.database.average_scores import AverageScoresDatabase
from models.database.course_recommendation import CourseRecommendationDatabase
from models.database.message_cache import (
MessageCacheDatabase,
STATUS_PENDING,
STATUS_COMPLETED,
STATUS_FAILED,
)
# 初始化数据库
html_tasks.init_db()
users.UsersDatabase().init_db()
QADatabase().init_db()
ExamSubscriptionDatabase().init_db()
ScoreSubscriptionDatabase().init_db()
# 初始化消息缓存数据库
message_cache_db = MessageCacheDatabase()
message_cache_db.init_db()
# 消息缓存配置
CACHE_POLL_INTERVAL = 0.5 # 轮询间隔(秒)
CACHE_MAX_WAIT_TIME = 4 # 最大等待时间(秒),微信5秒超时,预留1秒给返回
CACHE_CLEANUP_INTERVAL = 60 # 缓存清理间隔(秒)
# 缓存清理时间记录(用于限制清理频率)
_last_cleanup_time = 0
_cleanup_lock = threading.Lock()
# 初始化平均分数据库,平均分数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
AverageScoresDatabase().init_db()
# 初始化选课推荐数据库,选课推荐数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
CourseRecommendationDatabase().init_db()
# 初始化空教室基准列表
from handlers.jwxt.free_classroom import initialize_baseline_classrooms
initialize_baseline_classrooms()
# 初始化公告
from utils.announcement import ANNOUNCEMENT_SWITCH, ANNOUNCEMENT
# 创建 WeRoBot 核心实例
robot = WeRoBot(token=TOKEN, app_id=APP_ID, app_secret=APP_SECRET)
def _process_message_in_background(message, message_id):
"""
在后台线程中处理消息并更新缓存
Args:
message: WeRoBot消息对象
message_id: 消息唯一标识符
"""
try:
# 执行实际的消息处理
result = MenuHandler(robot).handle(message)
if result:
if ANNOUNCEMENT_SWITCH:
result = result + "\n" + "━" * 10 + "\n" + "公告:" + ANNOUNCEMENT
# 更新缓存为已完成状态
message_cache_db.update_cache_response(message_id, result, STATUS_COMPLETED)
logger.info(f"消息处理完成: message_id={message_id}")
except Exception as e:
error_msg = f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}\n\n发送"曲奇教务"查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。'
message_cache_db.update_cache_response(message_id, error_msg, STATUS_FAILED)
logger.error(f"后台处理消息失败: message_id={message_id}, error={e}")
send_feishu_msg(
"WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
)
def _wait_for_cache_result(message_id, max_wait_time=CACHE_MAX_WAIT_TIME):
"""
轮询等待缓存结果
Args:
message_id: 消息唯一标识符
max_wait_time: 最大等待时间(秒)
Returns:
处理结果字符串,如果超时则返回None
"""
start_time = time.time()
while time.time() - start_time < max_wait_time:
# 使用原子操作获取并删除已完成的缓存,避免竞态条件
response = message_cache_db.get_and_delete_completed_cache(message_id)
if response is not None:
return response
time.sleep(CACHE_POLL_INTERVAL)
return None
# 注册消息处理器,将所有文本消息都交由分发器处理
@robot.text
def handle_text_message(message):
"""
处理所有文本消息
使用消息缓存机制将容错时间扩大到15秒:
1. 首次收到消息时,创建PENDING缓存并启动后台处理
2. 重试请求到来时,检测处理是否完成
3. 完成则返回结果并删除记录,未完成则返回空字符串触发微信重试
注意:返回空字符串会触发微信服务器重试机制
"""
try:
message_id = str(message.message_id)
user_id = message.source
logger.info(
f"收到消息: message_id={message_id}, user_id={user_id}, content='{message.content}'"
)
# 维护模式直接返回,无需缓存
if MAINTENANCE_MODE:
return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
# 定期清理过期缓存(限制清理频率,避免性能影响)
global _last_cleanup_time
current_time = time.time()
with _cleanup_lock:
if current_time - _last_cleanup_time >= CACHE_CLEANUP_INTERVAL:
_last_cleanup_time = current_time
try:
message_cache_db.cleanup_expired_cache()
except Exception as cleanup_error:
logger.warning(
f"清理过期缓存失败(不影响正常处理): {cleanup_error}"
)
# 检查是否有现有缓存
cache = message_cache_db.get_message_cache(message_id)
if cache:
# 缓存存在
if cache["status"] in (STATUS_COMPLETED, STATUS_FAILED):
# 处理已完成,使用原子操作获取结果并删除缓存
response = message_cache_db.get_and_delete_completed_cache(message_id)
if response is not None:
logger.info(f"从缓存返回结果: message_id={message_id}")
return response
# 如果原子操作返回None,可能是被其他请求处理了,重新轮询
result = _wait_for_cache_result(message_id)
if result:
return result
# 超时,返回空字符串触发微信重试
logger.warning(f"获取缓存结果超时: message_id={message_id}")
return ""
else:
# 处理中,轮询等待结果
logger.info(f"消息处理中,开始轮询等待: message_id={message_id}")
result = _wait_for_cache_result(message_id)
if result:
return result
# 超时,返回空字符串触发微信重试
logger.warning(f"等待消息处理超时: message_id={message_id}")
return ""
else:
# 首次收到该消息,创建缓存并启动后台处理
if message_cache_db.create_pending_cache(message_id, user_id):
# 成功创建缓存,启动后台处理线程
thread = threading.Thread(
target=_process_message_in_background,
args=(message, message_id),
daemon=True,
)
thread.start()
logger.info(f"启动后台处理线程: message_id={message_id}")
# 等待处理结果
result = _wait_for_cache_result(message_id)
if result:
return result
# 超时,返回空字符串触发微信重试
logger.warning(f"首次处理消息超时: message_id={message_id}")
return ""
else:
# 创建缓存失败(可能是并发创建),尝试等待结果
logger.info(f"缓存创建失败,尝试等待结果: message_id={message_id}")
result = _wait_for_cache_result(message_id)
if result:
return result
# 超时,返回空字符串触发微信重试
return ""
except Exception as e:
logger.error(f"处理消息时发生未知错误: {e}")
send_feishu_msg(
"WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
)
return f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}'
@robot.image
def handle_image_message(message):
"""处理所有图片消息"""
logger.info(f"收到图片消息: user_id={message.source}, img='{message.img}'")
if MAINTENANCE_MODE:
return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
return "图片消息已收到,但我还不能处理图片内容。"
@robot.subscribe
def subscribe_handler(message):
return "欢迎关注曲奇教务,发送“曲奇教务”查看可用曲奇教务命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"
@robot.handler
def default_handler(message):
"""处理所有未被其他处理器处理的消息"""
logger.info(
f"收到未知类型消息: user_id={message.source}, type='{message.type}', 原始内容='{message.raw}'"
)
if MAINTENANCE_MODE:
return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
return (
"无效的命令,请发送“曲奇教务”查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"
)
if __name__ == "__main__":
send_feishu_msg(
"WeRoBot 服务已启动", f"启动时间:{datetime.now()}\n运行路径:{os.getcwd()}"
)
try:
if os.name == "nt":
# 在 Windows 上,通常使用 waitress 作为生产服务器
robot.run(host=HOST, port=PORT, server="waitress")
else:
# 在 Linux/macOS 上,可以使用默认的 werkzeug 或其他如 gunicorn
robot.run(host=HOST, port=PORT, server=SERVER)
except KeyboardInterrupt:
logger.info("手动停止WeRoBot 服务")
-
复现步骤
-
其他信息
Metadata
Metadata
Assignees
Labels
No labels