-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
678 additions
and
314 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import datetime | ||
from typing import TYPE_CHECKING | ||
|
||
from core.plugin import Plugin, job | ||
from plugins.tools.cloud_game import CloudGameHelper | ||
from plugins.tools.sign import SignJobType | ||
from utils.log import logger | ||
|
||
if TYPE_CHECKING: | ||
from telegram.ext import ContextTypes | ||
|
||
|
||
class SignJob(Plugin): | ||
def __init__(self, sign_system: CloudGameHelper): | ||
self.sign_system = sign_system | ||
|
||
# @job.run_daily(time=datetime.time(hour=0, minute=1, second=0), name="CloudGameSignJob") | ||
async def sign(self, context: "ContextTypes.DEFAULT_TYPE"): | ||
logger.info("正在执行云游戏自动签到") | ||
await self.sign_system.do_sign_job(context, job_type=SignJobType.START) | ||
logger.success("执行云游戏自动签到完成") | ||
await self.re_sign(context) | ||
|
||
async def re_sign(self, context: "ContextTypes.DEFAULT_TYPE"): | ||
logger.info("正在执行云游戏自动重签") | ||
await self.sign_system.do_sign_job(context, job_type=SignJobType.REDO) | ||
logger.success("执行云游戏自动重签完成") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import time | ||
from typing import TYPE_CHECKING | ||
|
||
from simnet import Region | ||
from telegram.constants import ChatAction | ||
from telegram.ext import filters | ||
|
||
from core.config import config | ||
from core.plugin import Plugin, handler | ||
from core.services.task.models import Task as SignUser, TaskStatusEnum | ||
from core.services.users.services import UserAdminService | ||
from gram_core.services.task.services import TaskCardServices | ||
from plugins.tools.cloud_game import CloudGameHelper | ||
from plugins.tools.genshin import PlayerNotFoundError, CookiesNotFoundError | ||
from utils.log import logger | ||
|
||
if TYPE_CHECKING: | ||
from telegram import Update | ||
from telegram.ext import ContextTypes | ||
|
||
|
||
class CloudGameSign(Plugin): | ||
"""云游戏每日签到""" | ||
|
||
CHECK_SERVER, COMMAND_RESULT = range(10400, 10402) | ||
|
||
def __init__( | ||
self, | ||
cloud_game_helper: CloudGameHelper, | ||
sign_service: TaskCardServices, | ||
user_admin_service: UserAdminService, | ||
): | ||
self.user_admin_service = user_admin_service | ||
self.sign_service = sign_service | ||
self.cloud_game_helper = cloud_game_helper | ||
|
||
async def _process_auto_sign(self, user_id: int, player_id: int, offset: int, chat_id: int, method: str) -> str: | ||
try: | ||
async with self.cloud_game_helper.client(user_id, player_id=player_id, offset=offset) as c: | ||
if c.region is Region.OVERSEAS: | ||
return "云游戏相关功能仅支持国服" | ||
await c.get_cloud_game_notifications() | ||
player_id = c.player_id | ||
except (PlayerNotFoundError, CookiesNotFoundError): | ||
return config.notice.user_not_found | ||
user: SignUser = await self.sign_service.get_by_user_id(user_id, player_id) | ||
if user: | ||
if method == "关闭": | ||
await self.sign_service.remove(user) | ||
return f"UID {player_id} 关闭云游戏自动签到成功" | ||
if method == "开启": | ||
if user.chat_id == chat_id: | ||
return f"UID {player_id} 云游戏自动签到已经开启过了" | ||
user.chat_id = chat_id | ||
user.status = TaskStatusEnum.STATUS_SUCCESS | ||
await self.sign_service.update(user) | ||
return f"UID {player_id} 修改云游戏自动签到通知对话成功" | ||
elif method == "关闭": | ||
return f"UID {player_id} 您还没有开启云游戏自动签到" | ||
elif method == "开启": | ||
user = self.sign_service.create(user_id, player_id, chat_id, TaskStatusEnum.STATUS_SUCCESS) | ||
await self.sign_service.add(user) | ||
return f"UID {player_id} 开启云游戏自动签到成功" | ||
|
||
@handler.command(command="cloud_game_sign", cookie=True, block=False) | ||
@handler.message(filters=filters.Regex("^云游戏每日签到(.*)"), cookie=True, block=False) | ||
@handler.command(command="start", filters=filters.Regex("cloud_game_sign$"), block=False) | ||
async def command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: | ||
user_id = await self.get_real_user_id(update) | ||
uid, offset = self.get_real_uid_or_offset(update) | ||
message = update.effective_message | ||
# args = self.get_args(context) | ||
# if len(args) >= 1: | ||
# msg = None | ||
# if args[0] == "开启自动签到": | ||
# if await self.user_admin_service.is_admin(user_id): | ||
# msg = await self._process_auto_sign(user_id, uid, offset, message.chat_id, "开启") | ||
# else: | ||
# msg = await self._process_auto_sign(user_id, uid, offset, user_id, "开启") | ||
# elif args[0] == "关闭自动签到": | ||
# msg = await self._process_auto_sign(user_id, uid, offset, message.chat_id, "关闭") | ||
# if msg: | ||
# self.log_user(update, logger.info, "云游戏自动签到命令请求 || 参数 %s", args[0]) | ||
# reply_message = await message.reply_text(msg) | ||
# if filters.ChatType.GROUPS.filter(message): | ||
# self.add_delete_message_job(reply_message, delay=30) | ||
# self.add_delete_message_job(message, delay=30) | ||
# return | ||
self.log_user(update, logger.info, "云游戏每日签到命令请求") | ||
if filters.ChatType.GROUPS.filter(message): | ||
self.add_delete_message_job(message) | ||
async with self.cloud_game_helper.client(user_id, player_id=uid, offset=offset) as client: | ||
if client.region is Region.OVERSEAS: | ||
reply_message = await message.reply_text("云游戏相关功能仅支持国服") | ||
if filters.ChatType.GROUPS.filter(reply_message): | ||
self.add_delete_message_job(message) | ||
self.add_delete_message_job(reply_message) | ||
return | ||
await message.reply_chat_action(ChatAction.TYPING) | ||
sign_text = await self.cloud_game_helper.start_sign(client) | ||
reply_message = await message.reply_text(sign_text) | ||
if filters.ChatType.GROUPS.filter(reply_message): | ||
self.add_delete_message_job(reply_message) | ||
|
||
@handler.command(command="cloud_game_wallet", cookie=True, block=False) | ||
async def command_start_wallet(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: | ||
user_id = await self.get_real_user_id(update) | ||
uid, offset = self.get_real_uid_or_offset(update) | ||
message = update.effective_message | ||
self.log_user(update, logger.info, "云游戏查询钱包命令请求") | ||
async with self.cloud_game_helper.client(user_id, player_id=uid, offset=offset) as client: | ||
if client.region is Region.OVERSEAS: | ||
reply_message = await message.reply_text("云游戏相关功能仅支持国服") | ||
if filters.ChatType.GROUPS.filter(reply_message): | ||
self.add_delete_message_job(message) | ||
self.add_delete_message_job(reply_message) | ||
return | ||
await message.reply_chat_action(ChatAction.TYPING) | ||
wallet = await self.cloud_game_helper.get_wallet(client) | ||
today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) | ||
free_time = wallet.free_time.free_time | ||
pay_time = int(wallet.coin.coin_num / wallet.coin.exchange) | ||
total_time = wallet.total_time | ||
card = wallet.play_card.msg | ||
text = ( | ||
"#### 云游戏钱包 ####\n" | ||
f"时间:{today} (UTC+8)\n" | ||
f"免费时长:{free_time} 分钟\n" | ||
f"付费时长:{pay_time} 分钟\n" | ||
f"畅玩卡:{card}\n" | ||
f"总时长:{total_time} 分钟" | ||
) | ||
await message.reply_text(text) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import time | ||
from contextlib import asynccontextmanager | ||
from typing import Optional, List, TYPE_CHECKING | ||
|
||
from simnet.errors import BadRequest as SimnetBadRequest, InvalidCookies, AlreadyClaimed, TimedOut as SimnetTimedOut | ||
from simnet.models.cloud_game.base import CloudGameWallet | ||
from sqlalchemy.orm.exc import StaleDataError | ||
from telegram.constants import ParseMode | ||
from telegram.error import BadRequest, Forbidden | ||
|
||
from gram_core.basemodel import RegionEnum | ||
from gram_core.plugin import Plugin | ||
from gram_core.services.task.models import TaskStatusEnum | ||
from gram_core.services.task.services import TaskCardServices | ||
from plugins.tools.genshin import GenshinHelper, CookiesUpdateRequestError, PlayerNotFoundError, CookiesNotFoundError | ||
from plugins.tools.sign import SignJobType | ||
from utils.log import logger | ||
|
||
if TYPE_CHECKING: | ||
from simnet import GenshinClient | ||
from telegram.ext import ContextTypes | ||
|
||
|
||
class CloudGameHelper(Plugin): | ||
def __init__(self, genshin_helper: GenshinHelper, sign_service: TaskCardServices): | ||
self.genshin_helper = genshin_helper | ||
self.sign_service = sign_service | ||
|
||
@asynccontextmanager | ||
async def client( # skipcq: PY-R1000 # | ||
self, user_id: int, region: Optional[RegionEnum] = None, player_id: int = None, offset: int = 0 | ||
) -> "GenshinClient": | ||
async with self.genshin_helper.genshin(user_id, region, player_id, offset) as client: | ||
client: "GenshinClient" | ||
try: | ||
yield client | ||
except InvalidCookies as exc: | ||
stoken = client.cookies.get("stoken") | ||
combo_token = None | ||
if stoken is not None: | ||
try: | ||
game_token = await client.get_game_token_by_stoken() | ||
logger.success("用户 %s 获取 game_token 成功", user_id) | ||
login_cookie = await client.login_game_by_game_token(game_token) | ||
logger.success("用户 %s 刷新 combo_token 成功", user_id) | ||
combo_token = login_cookie.combo_token | ||
except Exception as _exc: | ||
logger.error("用户 %s 刷新 combo_token 失败", user_id, exc_info=_exc) | ||
if combo_token: | ||
raise CookiesUpdateRequestError({client.cloud_game_combo_token_key: combo_token}) | ||
raise exc | ||
|
||
@staticmethod | ||
async def clear_notification(client: "GenshinClient"): | ||
no = await client.get_cloud_game_notifications() | ||
data_list = no.get("list") | ||
if not data_list: | ||
return | ||
for data in data_list: | ||
n_id = data.get("id") | ||
if not n_id: | ||
continue | ||
await client.ask_cloud_game_notifications(n_id) | ||
|
||
@staticmethod | ||
async def get_wallet(client: "GenshinClient") -> CloudGameWallet: | ||
await client.check_cloud_game_token() | ||
return await client.get_cloud_game_wallet() | ||
|
||
async def start_sign( | ||
self, | ||
client: "GenshinClient", | ||
is_raise: bool = False, | ||
title: Optional[str] = "云游戏签到结果", | ||
) -> str: | ||
try: | ||
wallet = await self.get_wallet(client) | ||
except SimnetBadRequest as error: | ||
logger.warning("UID[%s] 获取云游戏钱包信息失败,API返回信息为 %s", client.player_id, str(error)) | ||
if is_raise: | ||
raise error | ||
return f"获取云游戏钱包信息失败,API返回信息为 {str(error)}" | ||
try: | ||
await self.clear_notification(client) | ||
except Exception: | ||
logger.warning("UID[%s] 清空云游戏通知失败", client.player_id) | ||
send_free_time = wallet.free_time.send_freetime | ||
have_free_time = wallet.free_time.free_time | ||
if send_free_time: | ||
logger.success("UID[%s] 云游戏签到成功", client.player_id) | ||
result = f"获得 {send_free_time} 分钟免费时长" | ||
else: | ||
logger.info("UID[%s] 云游戏已经签到", client.player_id) | ||
result = "今天旅行者云游戏已经签到过了~" | ||
if is_raise: | ||
raise AlreadyClaimed | ||
today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) | ||
message = ( | ||
f"#### {title} ####\n" | ||
f"时间:{today} (UTC+8)\n" | ||
f"Account: {client.account_id}\n" | ||
f"免费时长:{have_free_time} 分钟\n" | ||
f"签到结果: {result}" | ||
) | ||
return message | ||
|
||
async def do_sign_job(self, context: "ContextTypes.DEFAULT_TYPE", job_type: SignJobType): # skipcq: PY-R1000 # | ||
include_status: List[TaskStatusEnum] = [ | ||
TaskStatusEnum.STATUS_SUCCESS, | ||
TaskStatusEnum.ALREADY_CLAIMED, | ||
TaskStatusEnum.TIMEOUT_ERROR, | ||
TaskStatusEnum.NEED_CHALLENGE, | ||
TaskStatusEnum.GENSHIN_EXCEPTION, | ||
] | ||
if job_type == SignJobType.START: | ||
title = "自动签到" | ||
elif job_type == SignJobType.REDO: | ||
title = "自动重新签到" | ||
include_status.remove(TaskStatusEnum.STATUS_SUCCESS) | ||
else: | ||
raise ValueError | ||
sign_list = await self.sign_service.get_all() | ||
for sign_db in sign_list: | ||
if sign_db.status not in include_status: | ||
continue | ||
user_id = sign_db.user_id | ||
try: | ||
async with self.client(user_id) as client: | ||
text = await self.start_sign(client, is_raise=True, title=title) | ||
except InvalidCookies: | ||
text = "云游戏自动签到执行失败,Cookie无效" | ||
sign_db.status = TaskStatusEnum.INVALID_COOKIES | ||
except AlreadyClaimed: | ||
text = "今天旅行者云游戏已经签到过了~" | ||
sign_db.status = TaskStatusEnum.ALREADY_CLAIMED | ||
except SimnetBadRequest as exc: | ||
text = f"云游戏自动签到执行失败,API返回信息为 {str(exc)}" | ||
sign_db.status = TaskStatusEnum.GENSHIN_EXCEPTION | ||
except SimnetTimedOut: | ||
text = "云游戏签到失败了呜呜呜 ~ 服务器连接超时 服务器熟啦 ~ " | ||
sign_db.status = TaskStatusEnum.TIMEOUT_ERROR | ||
except PlayerNotFoundError: | ||
logger.info("用户 user_id[%s] 玩家不存在 关闭并移除云游戏自动签到", user_id) | ||
await self.sign_service.remove(sign_db) | ||
continue | ||
except CookiesNotFoundError: | ||
logger.info("用户 user_id[%s] cookie 不存在 关闭并移除云游戏自动签到", user_id) | ||
await self.sign_service.remove(sign_db) | ||
continue | ||
except Exception as exc: | ||
logger.error("执行云游戏自动签到时发生错误 user_id[%s]", user_id, exc_info=exc) | ||
text = "签到失败了呜呜呜 ~ 执行云游戏自动签到时发生错误" | ||
else: | ||
sign_db.status = TaskStatusEnum.STATUS_SUCCESS | ||
if sign_db.chat_id < 0: | ||
text = f'<a href="tg://user?id={sign_db.user_id}">NOTICE {sign_db.user_id}</a>\n\n{text}' | ||
try: | ||
await context.bot.send_message(sign_db.chat_id, text, parse_mode=ParseMode.HTML) | ||
except BadRequest as exc: | ||
logger.error("执行云游戏自动签到时发生错误 user_id[%s] Message[%s]", user_id, exc.message) | ||
sign_db.status = TaskStatusEnum.BAD_REQUEST | ||
except Forbidden as exc: | ||
logger.error("执行云游戏自动签到时发生错误 user_id[%s] message[%s]", user_id, exc.message) | ||
sign_db.status = TaskStatusEnum.FORBIDDEN | ||
except Exception as exc: | ||
logger.error("执行云游戏自动签到时发生错误 user_id[%s]", user_id, exc_info=exc) | ||
continue | ||
else: | ||
if sign_db.status not in include_status: | ||
sign_db.status = TaskStatusEnum.STATUS_SUCCESS | ||
try: | ||
await self.sign_service.update(sign_db) | ||
except StaleDataError: | ||
logger.warning("用户 user_id[%s] 云游戏自动签到数据过期,跳过更新数据", user_id) |
Oops, something went wrong.