给这个项目配一个网页让他能在手机上方便操作,甚至加入多人多任务同时使用或是自动排队使用的模式 #288
Closed
2061360308
started this conversation in
Ideas
Replies: 3 comments 6 replies
-
1,支持 不会 |
Beta Was this translation helpful? Give feedback.
6 replies
-
改成这个样子能接受吗@Cm1316 @Samueli924 对了这个写完后应该可以支持协程多进程并发的,到时候新的CIL界面想着还可以开一个多线程,然后在后台协程刷(就算是命令行体验感应该也不差)具体行不行还不知道 ,^_^
from __future__ import annotations
import asyncio
# 解决Python 的类型注解时两个类相互引用问题,(在 Python 3.7 及以上版本可用)来解决这个问题。
# 这个导入会让 Python解释器把所有的类型注解当作字符串处理,而不是立即解析它们。这样就可以在类定义之前引用这个类。
import json
import random
import re
import time
from hashlib import md5
from pprint import pprint
from typing import List, Optional
from urllib.parse import urlencode
import requests
from Crypto.Cipher import AES
from base64 import b64encode
from natsort import natsorted
from requests.utils import dict_from_cookiejar # 将 http.cookiejar.CookieJar 或 http.cookiejar.Cookie 对象转换为一个字典。
import secrets
class LoginStatusException(Exception):
def __init__(self, message=None):
if message is None:
message = "登录状态异常, 这可能是因为未进行登录而试图访问登录后才可用的信息引起的,请确保你已正确登录!"
super().__init__(message)
class ChaoxingBase:
"""学习通数据类型的基类
Attribute:
- parent: 这些数据所属的账户对象
"""
parent: Account
def __init__(self, parent):
self.__parent = parent
self.attributes = []
@property
def parent(self):
return self.parent
def __getattr__(self, name):
if name in self.attributes:
return object.__getattribute__(self, '_' + self.__class__.__name__ + '__' + name)
else:
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
class Attachment(ChaoxingBase):
"""学习通课程知识点的附件
Attribute:
- knowledge:
- fid:
- cpi:
- param_raw:
- name:
- type:
"""
knowledge: Knowledge
fid: str
cpi: str
param_raw: dict
name: str
type: str
object_id: str
def __init__(self, knowledge, fid, cpi, param_json):
super().__init__(knowledge.parent)
self.__knowledge = knowledge
self.attributes = ['knowledge', 'fid', 'cpi', 'name', 'type', 'object_id', 'otherInfo', 'param_raw']
self.__param_raw = param_json
self.__fid = fid
self.__cpi = cpi
self.__name = param_json['property']['name']
self.__type = param_json['property']['type']
self.__object_id = param_json['property']['objectid']
self.__other_info = param_json['otherInfo']
class Video(Attachment):
"""学习通课程知识点的视频附件
Attribute:
-
"""
is_passed: bool
job_id: bool
video_raw: dict
d_type: str
length: int
duration: int
d_token: str
def __init__(self, knowledge, fid, cpi, param_json):
super().__init__(knowledge, fid, cpi, param_json)
self.attributes.extend(['is_passed', 'job_id', 'video_raw', 'd_type', 'length', 'duration', 'd_token'])
# 获取视频的专属信息
self.__is_passed = param_json.get("isPassed")
self.__job_id = param_json["jobid"]
self.__video_raw: dict | None = None
self.__d_type = 'Video'
if 'audio' in param_json['property']['module']:
self.__d_type = 'Audio'
# 或许视频的详细信息
self.__length: int | None = None
self.__duration: int | None = None
self.__d_token: str | None = None
self.get_video()
def get_video(self):
self.__video_raw = {}
url = 'https://mooc1-api.chaoxing.com/ananas/status/{}'.format(self.object_id)
params = {
'k': self.fid,
'flag': 'normal',
'_dc': int(round(time.time() * 1000))
}
result = self.parent.session.get(url, params=params)
try:
result = result.json()
self.__video_raw = result
except:
print("出现JSONDecoder异常")
self.__video_raw = None
self.__length = result["length"]
self.__duration = result['duration']
self.__d_token = result['dtoken']
class Knowledge(ChaoxingBase):
"""学习通课程的知识点
Attribute:
- chapter
- id
- name
- description
- num
- video
- attachment
- param_raw
"""
chapter: Chapter
id: int
name: str
description: str
num: int
video: List[Optional[Video]] | None
attachment: List[Optional[Attachment]] | None
param_raw: dict
attachment_raw: dict
def __init__(self, chapter, knowledge_id, param_json):
super().__init__(chapter.parent)
self.__chapter = chapter # 所属章节
self.attributes = ['chapter', 'id', 'name', 'description', 'num', 'video',
'attachment', 'param_raw', 'attachment_raw']
self.__param_raw = param_json
self.__id = knowledge_id
self.__name = param_json["title"]
self.__description = param_json["description"]
self.__num = param_json["num"]
self.__video: List[Optional[Video]] | None = None
self.__attachment: List[Optional[Attachment]] | None = None
def __get_attachment(self):
self.__attachment = []
self.__video = []
url = 'https://mooc1-api.chaoxing.com/knowledge/cards'
params = {
'clazzid': self.chapter.course.key,
'courseid': self.chapter.course.id,
'knowledgeid': self.chapter.id,
'num': self.num,
'isPhone': 1,
'control': True,
}
html_text = self.session.get(url, params=params).text
result = {}
# Python 3.8 引入的赋值表达式(也被称为 "海象操作符"), 正则返回的 Match 对象 或 None 被赋值给res
# 然后判断res不为None的情况下使用json解析并返回
if res := re.search(
r'window\.AttachmentSetting =({.*"hiddenConfig":false,.*"attachments":.*})',
html_text):
result = json.loads(res[1])
self.__attachment_raw = result # 原始数据
fid = result['defaults']['fid']
cpi = result['defaults']['cpi']
for attachment in result['attachments']:
self.__attachment.append(Attachment(self, fid, cpi, attachment))
if attachment.get('type') == 'video':
self.__video.append(Video(self, fid, cpi, attachment))
@property
def attachment(self):
if self.__attachment is None:
self.__get_attachment()
return self.__attachment
@property
def video(self):
if self.__attachment is None:
self.__get_attachment()
return self.__video
class Chapter(ChaoxingBase):
"""学习通课程的章节
Attribute:
- course:
- id:
- name:
- label:
- layer:
- status:
- param_raw:
- knowledge_raw:
"""
course: Course
id: int
name: str
label: str
layer: str
status: str
param_raw: dict
knowledge_raw: dict
def __init__(self, course, chapter_id, param_json):
super().__init__(course.parent)
# 属性列表
self.attributes = ['course', 'id', 'name', 'label', 'layer', 'status', 'param_raw', 'knowledge_raw']
self.__course = course # 所属课程
self.__param_raw = param_json # 原始数据
self.__id: int = chapter_id
self.__name: str = param_json["name"]
self.__label: str = param_json["label"]
self.__layer: str = param_json["layer"]
self.__status: str = param_json["status"]
self.__knowledge: List[Optional[Knowledge]] | None = None
def __get_knowledge(self):
"""
获取知识点相关信息
:param chapter_id: 知识点id
:param course_key: 所属课程key
:return:
"""
self.__knowledge = []
def get_enc_time():
m_time = str(int(time.time() * 1000))
m_token = '4faa8662c59590c6f43ae9fe5b002b42'
m_encrypt_str = 'token=' + m_token + '&_time=' + m_time + '&DESKey=Z(AfY@XS'
m_inf_enc = md5(m_encrypt_str.encode('utf-8')).hexdigest()
return m_time, m_inf_enc
url = 'https://mooc1-api.chaoxing.com/gas/knowledge'
enc = get_enc_time()
params = {
'id': self.id,
'courseid': self.course.key,
'fields':
'id,parentnodeid,indexorder,label,layer,name,begintime,createtime,lastmodifytime,status,jobUnfinishedCount,clickcount,openlock,card.fields(id,knowledgeid,title,knowledgeTitile,description,cardorder).contentcard(all)',
'view': 'json',
'token': "4faa8662c59590c6f43ae9fe5b002b42",
'_time': enc[0],
'inf_enc': enc[1]
}
result = self.session.get(url, params=params).json()
result = result["data"][0]
# 后续请求需要一个num, 这里补充一下
for num in range(len(result["card"]["data"])):
result["card"]["data"][num]["num"] = num
self.__knowledge_raw = result
for knowledge in result["card"]["data"]:
self.__knowledge.append(Knowledge(self, knowledge['id'], knowledge))
@property
def knowledge(self):
if self.__knowledge is None:
self.__get_knowledge()
return self.__knowledge
class Course(ChaoxingBase):
"""学习通课程
学习通课程内容包装的类
Attribute:
- key:
- id:
- name:
- cover:
- is_start:
- teacher_name:
- school_name:
- student_count:
- chapter:
- param_raw:
- chapter_raw:
"""
key: str
id: int
name: str
cover: str
is_start: bool
teacher_name: str
school_name: str
student_count: int
chapter: List[Optional[Chapter]] | None
param_raw: dict
chapter_raw: dict
def __init__(self, parent, key, param_json):
super().__init__(parent)
# 属性列表
self.attributes = ['key', 'id', 'name', 'cover', 'is_start', 'teacher_name', 'school_name',
'student_count', 'param_raw', 'chapter_raw']
self.__key: str = key # 课程key
self.__param_raw: dict = param_json # course的原始数据
self.__student_count: int = param_json["content"]["studentcount"]
self.__is_start: bool = param_json["content"]["isstart"]
self.__teacher_name: str = param_json["content"]["course"]["data"][0]["teacherfactor"]
self.__school_name: str = param_json["content"]["course"]["data"][0]["schools"]
self.__cover: str = param_json["content"]["course"]["data"][0]["imageurl"]
self.__name: str = param_json["content"]["course"]["data"][0]["name"]
self.__id: int = param_json["content"]["course"]["data"][0]["id"]
self.__chapter: List[Optional[Chapter]] | None = None # 章节
def __get_chapter(self):
self.__chapter = [] # 无论是否获取成功都是list, 未获取前状态是none
def sort_chapter_by_label(chapter_list):
"""
根据章节的label(章节前的序号,如2.3)进行自然排序(label是个字符串,需要自然排序)
1. 创建一个空字典 data。
2. 遍历 chapter_list 列表中的每个 chapter 字典。
3. 对于每个 chapter字典,将 'label' 键的值作为 data 字典的键,将整个 chapter字典 作为对应的值。
4. 使用 natsorted 函数对 data 字典的键进行自然排序,得到一个排序后的键列表 keys_sorted。
5. 使用列表推导式,按照 keys_sorted 中的顺序,从 data 字典中取出对应的 chapter字典,得到一个排序后的 chapter_list 列表。
:param chapter_list: [{"label":, ... },{"label":, ... }, ...]
:return:
"""
data = {}
for chapter in chapter_list:
data[chapter['label']] = chapter
keys_sorted = natsorted(data.keys())
return [data[label] for label in keys_sorted]
url = 'https://mooc1-api.chaoxing.com/gas/clazz'
params = {
'id': self.key,
'fields':
'id,bbsid,classscore,isstart,allowdownload,chatid,name,state,isthirdaq,isfiled,information,discuss,'
'visiblescore,begindate,coursesetting.fields(id,courseid,hiddencoursecover,hiddenwrongset,'
'coursefacecheck),course.fields(id,name,infocontent,objectid,app,bulletformat,mappingcourseid,imageurl,'
'teacherfactor,knowledge.fields(id,name,indexOrder,parentnodeid,status,layer,label,begintime,endtime,'
'attachment.fields(id,type,objectid,extension).type(video)))',
'view': 'json'
}
result = self.parent.session.get(url, params=params).json()
self.__chapter_raw = result # chapter的原始数据
chapter_list = sort_chapter_by_label(result["knowledge"]["data"]) # 对"knowledge" 元素 按章节序号排序序号
for chapter in chapter_list:
self.__chapter.append(Chapter(self, chapter["id"], chapter))
@property
def chapter(self):
if self.__chapter is None:
self.__get_chapter()
return self.__chapter
class ChaoxingApi:
"""api
使用这个类管理账号和视频任务
"""
def __init__(self):
self.__accounts: List[Optional[Account]] = {} # 账户
self.__task = [] # 总任务列表
self.__current_task = [] # 当前任务列表
self.max_works = 2 # 同时工作的最大数目, 默认同时给俩个人刷
def add_account(self, phone, password):
def generate_account_id():
while True:
account_id = random.randint(100, 999)
if account_id not in list(self.__accounts.keys()):
return account_id
account = Account(self)
if account.login(phone, password):
self.__accounts[generate_account_id()] = account
return True
else:
return False
@property
def accounts(self):
"""
所有登录的账户
:return: dict
"""
return self.__accounts
@staticmethod
async def pass_video(task: AccountTask):
def get_enc(clazzId, jobid, objectId, playingTime, duration, userid):
# https://github.com/ZhyMC/chaoxing-xuexitong-autoflush/blob/445c8d8a8cc63472dd90cdf2a6ab28542c56d93b/logger.js
return md5(
f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]"
.encode()).hexdigest()
while True:
video = task.currentVideo
account = task.account
url = 'https://mooc1-api.chaoxing.com/multimedia/log/a/{}/{}'.format(
video.cpi, video.d_token)
params = {
'otherInfo':
video.other_info,
'playingTime':
str(task.playing_time),
'duration':
str(video.duration),
# 'akid': None,
'jobid':
video.job_id,
'clipTime':
'0_{}'.format(video.duration),
'clazzId':
str(video.knowledge.chapter.course.key),
'objectId':
video.object_id,
'userid':
video.parent.uid,
'isdrag':
'0',
'enc':
get_enc(video.knowledge.chapter.course.key, video.job_id, video.object_id, task.playing_time,
video.duration,
video.parent.uid),
'rt':
'0.9', # 'rt': '1.0', ??
# 'dtype': 'Video', 音频文件为Audio
'dtype':
video.d_type,
'view':
'pc',
'_t':
str(int(round(time.time() * 1000)))
}
params_str = urlencode(params)
# print:(url+params)
tmp_response = account.session.get(url, params=params)
try:
result = tmp_response.json()
if result.get('isPassed'):
task.change_state(True)
task.playing_time += 1 # Todo 变速
await asyncio.sleep(1)
except Exception:
result = {
'error': {
'status_code': tmp_response.status_code,
'text': tmp_response.text
}
}
task.change_state(False, result)
def task_quit(self, account_id):
"""
任务退出
:return:
"""
if account_id in self.__current_task:
self.__current_task.remove(account_id)
# Todo 处理没有任务的情况
# 从任务列表中拿出第一个放入当前任务列表
new_task_id = self.__task[0]
self.__current_task.append(new_task_id)
self.__task.pop(0)
# 开始刷这个视频
self.pass_video(self.accounts[new_task_id].api)
class AccountTask:
"""账户的任务管理类
"""
# 账户,api实例
__parent: Account
__api: ChaoxingApi
# 记录当前状态
task_course: List[Optional[Course]] | None = None
current_course_index = 0
current_course: Course | None = None
current_chapter: Chapter | None = None
current_chapter_index = 0
current_knowledge: Knowledge | None = None
current_knowledge_index = None
# 当前播放视频对象和播放进度
__currentVideo: Video | None = None
__current_video_index = 0
__playing_time = 0
def __init__(self, parent, api):
self.__parent = parent
self.__api = api
self.task = AccountTask(self, self.__api)
@property
def parent(self):
return self.__parent
@property
def api(self):
return self.__api
@property
def currentVideo(self):
return self.__currentVideo
@property
def account(self):
return self.__parent
def change_state(self, state, message: dict = {}):
if state:
self.next()
else:
# Todo 错误记录,重试
self.next()
@property
def playing_time(self):
return self.__playing_time
@playing_time.setter
def playing_time(self, value):
self.__playing_time = value
def next(self):
self.playing_time = 0 # 播放进度归零
# Todo 太繁琐了,有机会改进一下函数
# 判断当前Knowledge视频是否播放完毕
self.__current_video_index += 1
if self.__current_video_index < len(self.current_knowledge.video):
self.__currentVideo = self.current_knowledge.video[self.__current_video_index]
return
# 当前章节Knowledge是否播放完毕
self.current_knowledge_index += 1
if self.current_knowledge_index < len(self.current_chapter.knowledge):
self.current_knowledge = self.current_chapter.knowledge[self.current_knowledge_index]
self.__current_video_index = 0
self.__currentVideo = self.current_knowledge.video[self.__current_video_index]
return
# 当前课程章节是否播放完毕
self.current_chapter_index += 1
if self.current_chapter_index < len(self.current_course.chapter):
self.current_chapter = self.current_course.chapter[self.current_chapter_index]
self.current_knowledge_index = 0
self.current_knowledge = self.current_chapter.knowledge[self.current_knowledge_index]
self.__current_video_index = 0
self.__currentVideo = self.current_knowledge.video[self.__current_video_index]
return
# 当前课程是否播放完毕
self.current_course_index += 1
if self.current_course_index < len(self.task_course):
self.current_course = self.task_course[self.current_course_index]
self.current_chapter_index = 0
self.current_chapter = self.current_course.chapter[self.current_chapter_index]
self.current_knowledge_index = 0
self.current_knowledge = self.current_chapter.knowledge[self.current_knowledge_index]
self.__current_video_index = 0
self.__currentVideo = self.current_knowledge.video[self.__current_video_index]
return
# 账户播放完毕,通知api将该账户移出任务列表
self.api.
class Account:
uid: str # cookies['_uid']
cookies: dict # cookies
__isLogin: bool = False # 登录状态
__courses: dict | None = None # 课程信息 {id:{具体参数信息}, ...}
__currentCourses: int | None = None # 当前课程
__task: list = [] # 任务列表
speed: int = 1 # 播放速度
def __init__(self, api):
self.api = api
def __init_explorer(self):
"""
初始化会话
:return: None
"""
self.session = requests.session()
self.session.headers = {
'User-Agent':
f'Dalvik/2.1.0 (Linux; U; Android {random.randint(9, 12)}; MI{random.randint(10, 12)} Build/SKQ1.210216.001) (device:MI{random.randint(10, 12)}) Language/zh_CN com.chaoxing.mobile/ChaoXingStudy_3_5.1.4_android_phone_614_74 (@Kalimdor)_{secrets.token_hex(16)}',
'X-Requested-With': 'com.chaoxing.mobile'
}
def login(self, phone, password):
"""
登录, success: True, fail: False
:return: bool
"""
"""
Explain:
pkcs7padding 函数用于对明文进行 PKCS7 填充。PKCS7 是一种常用的填充方式,
主要用于将数据块填充到特定的字节长度。在这个函数中,首先计算出需要添加的填充大小,
然后创建一个由相应数量的填充字符组成的字符串,最后将这个填充字符串添加到原始文本的末尾。
encryptByAES 函数用于进行 AES 加密。AES 是一种常用的对称加密算法,
需要一个密钥和一个初始化向量(IV)。在这个函数中,首先定义了密钥和 IV,
然后创建了一个新的 AES 对象。接着,使用 pkcs7padding 函数处理明文,
然后使用 AES 对象对处理后的明文进行加密。最后,将加密后的字节重新编码为 UTF-8 字符串,并返回。
这段代码的主要目的是对输入的消息进行 AES 加密,并返回加密后的结果
"""
def pkcs7padding(text):
"""
明文使用PKCS7填充
计算出需要添加的填充大小,然后创建一个由相应数量的填充字符组成的字符串,最后将这个填充字符串添加到原始文本的末尾。
这个函数只是实现了 PKCS7 填充,而没有进行任何加密操作
"""
bs = 16
length = len(text)
bytes_length = len(text.encode('utf-8'))
padding_size = length if (bytes_length == length) else bytes_length
padding = bs - padding_size % bs
padding_text = chr(padding) * padding
# coding = chr(padding) # Todo 没看懂干嘛的,好像别的地方没用到就先注释了
return text + padding_text
def encryptByAES(message):
"""
AES加密函数。它将接收一个消息作为输入,并返回加密的结果
:param message: str
:return: str(utf-8)
"""
keyword = "u2oh6Vu^HWe4_AES"
key = keyword.encode('utf-8')
iv = keyword.encode('utf-8')
cipher = AES.new(key, AES.MODE_CBC, iv)
# 处理明文
content_padding = pkcs7padding(message)
# 加密
encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8'))
# 重新编码
result = str(b64encode(encrypt_bytes), encoding='utf-8')
return result
self.__init_explorer()
pprint(phone)
pprint(password)
url = "https://passport2.chaoxing.com/fanyalogin"
data = {
"fid": "-1",
"uname": encryptByAES(phone),
"password": encryptByAES(password),
"t": "true",
"refer": "http%3A%2F%2Fi.chaoxing.com",
"forbidotherlogin": "0",
"validate": "",
"doubleFactorLogin": "0",
"independentId": "0"
}
resp = self.session.post(url, data=data)
pprint(resp.json())
if resp.json()["status"]:
self.uid = resp.cookies['_uid']
self.cookies = dict_from_cookiejar(resp.cookies) # 的到cookie字典
self.__isLogin = True
return True
else:
return False
def __get_courses(self):
"""
获取所有课程
:return: None
"""
self.__courses = {} # 无论成功与否先赋值,表示已获取过(None是未获取过)
url = 'https://mooc1-api.chaoxing.com/mycourse/backclazzdata?view=json&mcode='
courses = self.session.get(url).json()
if courses["result"] == 1: # 假如返回值为1
__temp = courses["channelList"]
for course in __temp: # 删除所有的自建课程
if "course" not in course['content']:
__temp.remove(course)
courses = __temp
# pprint(courses)
else:
return
for course_item in courses:
key = course_item["key"]
self.__courses[key] = Course(self, key, course_item)
@property
def task(self):
return self.__task
def add_task(self, course_key):
"""
添加一个课程到任务列表
:param course_key: course列表中的给出的key
:return: bool
"""
course_key = str(course_key) # 一律转为字符串,避免类型出错
if course_key in self.courses.keys():
self.__task.append(course_key)
return True
else:
return False
@property
def courses(self):
"""
所有课程
:return: list
"""
if self.__courses is None: # 没有获取过课程
if self.is_login: # 判断登录状态
self.__get_courses()
return self.__courses
else:
raise LoginStatusException
else:
return self.__courses
@property
def is_login(self):
"""
登录状态, Logged in : True
:return: bool
"""
return self.__isLogin |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
大家觉得我用fastapi简单写一点接口,然后再用vue写一个适配的前端怎么样?
在开始前我有以下疑问:
欢迎大家畅所欲言,给我一些想法
Beta Was this translation helpful? Give feedback.
All reactions