Skip to content

超时重发的消息会被识别为未知消息类型(即使是text) #820

@W1ndys

Description

@W1ndys
  • 对 Bug 的描述

    • 当前行为:超时重发的消息会被识别为未知消息类型(即使是text)
    • 正确的行为:识别正确
  • 环境

    • 平台:Windows
    • WeRoBot 版本号:werobot==1.13.1
    • Python 版本:Python 3.12.3
  • 复现代码或 repo 链接

# 脱敏后的代码文件
import os
import time
import threading
from datetime import datetime
from werobot import WeRoBot
from core.log_config import setup_logger  # 导入配置函数
from utils.logger import logger  # 导入干净的 logger
from core.config import (
    # TOKEN, # 已脱敏
    # HOST, # 已脱敏
    # PORT, # 已脱敏
    # SERVER, # 已脱敏
    # APP_ID, # 已脱敏
    # APP_SECRET, # 已脱敏
    LOGS_DIR,
    MAINTENANCE_MODE,
)
from utils.feishu import send_feishu_msg
from handlers.menu import MenuHandler

# 模拟从配置文件中读取的脱敏后变量
TOKEN = "YOUR_WECHAT_TOKEN"
APP_ID = "YOUR_APP_ID"
APP_SECRET = "YOUR_APP_SECRET"
HOST = "0.0.0.0"
PORT = 80
SERVER = "waitress"

# 在所有其他代码之前,首先配置日志,日志等级为debug,轮转10MB
# setup_logger("bot", level="DEBUG")
setup_logger("bot", logs_dir=str(LOGS_DIR), level="DEBUG")

from models.database import html_tasks, users
from models.database.qa_database import QADatabase
from models.database.exam_subscription import ExamSubscriptionDatabase
from models.database.score_subscription import ScoreSubscriptionDatabase
from models.database.average_scores import AverageScoresDatabase
from models.database.course_recommendation import CourseRecommendationDatabase
from models.database.message_cache import (
    MessageCacheDatabase,
    STATUS_PENDING,
    STATUS_COMPLETED,
    STATUS_FAILED,
)

# 初始化数据库
html_tasks.init_db()
users.UsersDatabase().init_db()
QADatabase().init_db()
ExamSubscriptionDatabase().init_db()
ScoreSubscriptionDatabase().init_db()

# 初始化消息缓存数据库
message_cache_db = MessageCacheDatabase()
message_cache_db.init_db()

# 消息缓存配置
CACHE_POLL_INTERVAL = 0.5  # 轮询间隔(秒)
CACHE_MAX_WAIT_TIME = 4  # 最大等待时间(秒),微信5秒超时,预留1秒给返回
CACHE_CLEANUP_INTERVAL = 60  # 缓存清理间隔(秒)

# 缓存清理时间记录(用于限制清理频率)
_last_cleanup_time = 0
_cleanup_lock = threading.Lock()

# 初始化平均分数据库,平均分数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
AverageScoresDatabase().init_db()

# 初始化选课推荐数据库,选课推荐数据来源依赖手动上传,检测表是否存在,如果不存在,发送飞书上报警告
CourseRecommendationDatabase().init_db()

# 初始化空教室基准列表
from handlers.jwxt.free_classroom import initialize_baseline_classrooms

initialize_baseline_classrooms()

# 初始化公告
from utils.announcement import ANNOUNCEMENT_SWITCH, ANNOUNCEMENT


# 创建 WeRoBot 核心实例
robot = WeRoBot(token=TOKEN, app_id=APP_ID, app_secret=APP_SECRET)


def _process_message_in_background(message, message_id):
    """
    在后台线程中处理消息并更新缓存

    Args:
        message: WeRoBot消息对象
        message_id: 消息唯一标识符
    """
    try:
        # 执行实际的消息处理
        result = MenuHandler(robot).handle(message)
        if result:
            if ANNOUNCEMENT_SWITCH:
                result = result + "\n" + "━" * 10 + "\n" + "公告:" + ANNOUNCEMENT
        # 更新缓存为已完成状态
        message_cache_db.update_cache_response(message_id, result, STATUS_COMPLETED)
        logger.info(f"消息处理完成: message_id={message_id}")
    except Exception as e:
        error_msg = f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}\n\n发送"曲奇教务"查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。'
        message_cache_db.update_cache_response(message_id, error_msg, STATUS_FAILED)
        logger.error(f"后台处理消息失败: message_id={message_id}, error={e}")
        send_feishu_msg(
            "WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
        )


def _wait_for_cache_result(message_id, max_wait_time=CACHE_MAX_WAIT_TIME):
    """
    轮询等待缓存结果

    Args:
        message_id: 消息唯一标识符
        max_wait_time: 最大等待时间(秒)

    Returns:
        处理结果字符串,如果超时则返回None
    """
    start_time = time.time()
    while time.time() - start_time < max_wait_time:
        # 使用原子操作获取并删除已完成的缓存,避免竞态条件
        response = message_cache_db.get_and_delete_completed_cache(message_id)
        if response is not None:
            return response
        time.sleep(CACHE_POLL_INTERVAL)
    return None


# 注册消息处理器,将所有文本消息都交由分发器处理
@robot.text
def handle_text_message(message):
    """
    处理所有文本消息

    使用消息缓存机制将容错时间扩大到15秒:
    1. 首次收到消息时,创建PENDING缓存并启动后台处理
    2. 重试请求到来时,检测处理是否完成
    3. 完成则返回结果并删除记录,未完成则返回空字符串触发微信重试

    注意:返回空字符串会触发微信服务器重试机制
    """
    try:
        message_id = str(message.message_id)
        user_id = message.source

        logger.info(
            f"收到消息: message_id={message_id}, user_id={user_id}, content='{message.content}'"
        )

        # 维护模式直接返回,无需缓存
        if MAINTENANCE_MODE:
            return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"

        # 定期清理过期缓存(限制清理频率,避免性能影响)
        global _last_cleanup_time
        current_time = time.time()
        with _cleanup_lock:
            if current_time - _last_cleanup_time >= CACHE_CLEANUP_INTERVAL:
                _last_cleanup_time = current_time
                try:
                    message_cache_db.cleanup_expired_cache()
                except Exception as cleanup_error:
                    logger.warning(
                        f"清理过期缓存失败(不影响正常处理): {cleanup_error}"
                    )

        # 检查是否有现有缓存
        cache = message_cache_db.get_message_cache(message_id)

        if cache:
            # 缓存存在
            if cache["status"] in (STATUS_COMPLETED, STATUS_FAILED):
                # 处理已完成,使用原子操作获取结果并删除缓存
                response = message_cache_db.get_and_delete_completed_cache(message_id)
                if response is not None:
                    logger.info(f"从缓存返回结果: message_id={message_id}")
                    return response
                # 如果原子操作返回None,可能是被其他请求处理了,重新轮询
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"获取缓存结果超时: message_id={message_id}")
                return ""
            else:
                # 处理中,轮询等待结果
                logger.info(f"消息处理中,开始轮询等待: message_id={message_id}")
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"等待消息处理超时: message_id={message_id}")
                return ""
        else:
            # 首次收到该消息,创建缓存并启动后台处理
            if message_cache_db.create_pending_cache(message_id, user_id):
                # 成功创建缓存,启动后台处理线程
                thread = threading.Thread(
                    target=_process_message_in_background,
                    args=(message, message_id),
                    daemon=True,
                )
                thread.start()
                logger.info(f"启动后台处理线程: message_id={message_id}")

                # 等待处理结果
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                logger.warning(f"首次处理消息超时: message_id={message_id}")
                return ""
            else:
                # 创建缓存失败(可能是并发创建),尝试等待结果
                logger.info(f"缓存创建失败,尝试等待结果: message_id={message_id}")
                result = _wait_for_cache_result(message_id)
                if result:
                    return result
                # 超时,返回空字符串触发微信重试
                return ""

    except Exception as e:
        logger.error(f"处理消息时发生未知错误: {e}")
        send_feishu_msg(
            "WeRoBot 服务错误", f"时间:{datetime.now()} \n处理消息时发生未知错误: {e}"
        )
        return f'系统暂时出现了一点问题,请稍后再试或发送"联系作者"联系作者获取帮助,提交时请附上错误信息:{str(e)}'


@robot.image
def handle_image_message(message):
    """处理所有图片消息"""
    logger.info(f"收到图片消息: user_id={message.source}, img='{message.img}'")
    if MAINTENANCE_MODE:
        return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
    return "图片消息已收到,但我还不能处理图片内容。"


@robot.subscribe
def subscribe_handler(message):
    return "欢迎关注曲奇教务,发送“曲奇教务”查看可用曲奇教务命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"


@robot.handler
def default_handler(message):
    """处理所有未被其他处理器处理的消息"""
    logger.info(
        f"收到未知类型消息: user_id={message.source}, type='{message.type}', 原始内容='{message.raw}'"
    )
    if MAINTENANCE_MODE:
        return "系统正在维护中,暂时无法处理您的请求,请稍后重试,建议添加官方交流QQ群 XXXXXXXXXX 获取更多信息。"
    return (
        "无效的命令,请发送“曲奇教务”查看可用命令,或者联系作者QQ XXXXXXXXXX 获取帮助。"
    )


if __name__ == "__main__":
    send_feishu_msg(
        "WeRoBot 服务已启动", f"启动时间:{datetime.now()}\n运行路径:{os.getcwd()}"
    )

    try:
        if os.name == "nt":
            # 在 Windows 上,通常使用 waitress 作为生产服务器
            robot.run(host=HOST, port=PORT, server="waitress")
        else:
            # 在 Linux/macOS 上,可以使用默认的 werkzeug 或其他如 gunicorn
            robot.run(host=HOST, port=PORT, server=SERVER)
    except KeyboardInterrupt:
        logger.info("手动停止WeRoBot 服务")
  • 复现步骤

  • 其他信息

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions