Skip to content
This repository was archived by the owner on Dec 25, 2023. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 66 additions & 28 deletions videoHelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import requests
import re
import json
import threading

# 以下的csrftoken和sessionid需要改成自己登录后的cookie中对应的字段!!!!而且脚本需在登录雨课堂状态下使用
# 登录上雨课堂,然后按F12-->选Application-->找到雨课堂的cookies,寻找csrftoken、sessionid、university_id字段,并复制到下面两行即可
Expand Down Expand Up @@ -42,11 +43,22 @@
def one_video_watcher(video_id, video_name, cid, user_id, classroomid, skuid):
video_id = str(video_id)
classroomid = str(classroomid)
url = url_root + "video-log/heartbeat/"
get_url = url_root + "video-log/get_video_watch_progress/?cid=" + str(
cid) + "&user_id=" + user_id + "&classroom_id=" + classroomid + "&video_type=video&vtype=rate&video_id=" + str(
video_id) + "&snapshot=1&term=latest&uv_id=" + university_id + ""
progress = requests.get(url=get_url, headers=headers)
url = f'{url_root}video-log/heartbeat/'
params = {
'cid': str(cid),
'user_id': user_id,
'classroom_id': classroomid,
'video_type': 'video',
'vtype': 'rate',
'video_id': str(video_id),
'snapshot': '1',
'term': 'latest',
'uv_id': university_id
}
params_str = '&'.join([f'{k}={v}' for k, v in params.items()])

get_info_url = f'{url_root}video-log/get_video_watch_progress/?{params_str}'
progress = requests.get(url=get_info_url, headers=headers)
if_completed = '0'
try:
if_completed = re.search(r'"completed":(.+?),', progress.text).group(1)
Expand Down Expand Up @@ -116,13 +128,13 @@ def one_video_watcher(video_id, video_name, cid, user_id, classroomid, skuid):
except:
pass
try:
progress = requests.get(url=get_url, headers=headers)
progress = requests.get(url=get_info_url, headers=headers)
res_rate = json.loads(progress.text)
tmp_rate = res_rate["data"][video_id]["rate"]
if tmp_rate is None:
return 0
val = str(tmp_rate)
print("学习进度为:\t" + str(float(val) * 100) + "%/100%")
print(video_name+"学习进度为:\t" + str(float(val) * 100) + "%/100%")
time.sleep(2)
except Exception as e:
print(e.__str__())
Expand All @@ -132,9 +144,15 @@ def one_video_watcher(video_id, video_name, cid, user_id, classroomid, skuid):


def get_videos_ids(course_name, classroom_id, course_sign):
get_homework_ids = url_root + "mooc-api/v1/lms/learn/course/chapter?cid=" + str(
classroom_id) + "&term=latest&uv_id=" + university_id + "&sign=" + course_sign
homework_ids_response = requests.get(url=get_homework_ids, headers=headers)
params = {
'cid': str(classroom_id),
'term': 'latest',
'uv_id': university_id,
'sign': course_sign
}
params_str = '&'.join([f'{k}={v}' for k, v in params.items()])
get_homework_ids_url = f'{url_root}mooc-api/v1/lms/learn/course/chapter?{params_str}'
homework_ids_response = requests.get(url=get_homework_ids_url, headers=headers)
homework_json = json.loads(homework_ids_response.text)
homework_dic = {}
try:
Expand All @@ -155,8 +173,26 @@ def get_videos_ids(course_name, classroom_id, course_sign):
raise Exception("fail while getting homework_ids!!! please re-run this program!")


def watch_videos(videos_id_name_dic, course_id, user_id, classroom_id, sku_id):
for one_video in videos_id_name_dic.items():
one_video_watcher(one_video[0], one_video[1], course_id, user_id, classroom_id, sku_id)


def multiple_watch_video(videos_id_name_dic, course_id, user_id, classroom_id, sku_id, num_workers=4):
parts = list(videos_id_name_dic.items())
threads = []
for i in range(num_workers):
part = parts[i::num_workers]
thread = threading.Thread(target=watch_videos, args=({t[0]: t[1] for t in part[:]}, course_id, user_id, classroom_id, sku_id))
thread.start()
threads.append(thread)

for thread in threads:
thread.join()


if __name__ == "__main__":
your_courses = []
courses = []

# 首先要获取用户的个人ID,即user_id,该值在查询用户的视频进度时需要使用
user_id_url = url_root + "edu_admin/check_user_session/"
Expand All @@ -168,12 +204,12 @@ def get_videos_ids(course_name, classroom_id, course_sign):
raise Exception("也许是网路问题,获取不了user_id,请试着重新运行!!! please re-run this program!")

# 然后要获取教室id
get_classroom_id = url_root + "mooc-api/v1/lms/user/user-courses/?status=1&page=1&no_page=1&term=latest&uv_id=" + university_id + ""
submit_url = url_root + "mooc-api/v1/lms/exercise/problem_apply/?term=latest&uv_id=" + university_id + ""
get_classroom_id = f'{url_root}mooc-api/v1/lms/user/user-courses/?status=1&page=1&no_page=1&term=latest&uv_id={university_id}'
submit_url = f'{url_root}mooc-api/v1/lms/exercise/problem_apply/?term=latest&uv_id={university_id}'
classroom_id_response = requests.get(url=get_classroom_id, headers=headers)
try:
for ins in json.loads(classroom_id_response.text)["data"]["product_list"]:
your_courses.append({
courses.append({
"course_name": ins["course_name"],
"classroom_id": ins["classroom_id"],
"course_sign": ins["course_sign"],
Expand All @@ -185,32 +221,34 @@ def get_videos_ids(course_name, classroom_id, course_sign):
raise Exception("fail while getting classroom_id!!! please re-run this program!")

# 显示用户提示
for index, value in enumerate(your_courses):
for index, value in enumerate(courses):
print("编号:" + str(index + 1) + " 课名:" + str(value["course_name"]))

flag = True
while(flag):
number = input("你想刷哪门课呢?请输入编号。输入0表示全部课程都刷一遍\n")
# 输入不合法则重新输入
if not (number.isdigit()) or int(number) > len(your_courses):
if not (number.isdigit()) or int(number) > len(courses):
print("输入不合法!")
continue
elif int(number) == 0:
flag = False # 输入合法则不需要循环
# 0 表示全部刷一遍
for ins in your_courses:
homework_dic = get_videos_ids(ins["course_name"], ins["classroom_id"], ins["course_sign"])
for one_video in homework_dic.items():
one_video_watcher(one_video[0], one_video[1], ins["course_id"], user_id, ins["classroom_id"],
ins["sku_id"])
for ins in courses:
videos_id_name_dic = get_videos_ids(ins["course_name"], ins["classroom_id"], ins["course_sign"])
course_id = ins["course_id"]
classroom_id = ins["classroom_id"]
sku_id = ins["sku_id"]
multiple_watch_video(videos_id_name_dic, course_id, user_id, classroom_id, sku_id)
else:
flag = False # 输入合法则不需要循环
# 指定序号的课程刷一遍
number = int(number) - 1
homework_dic = get_videos_ids(your_courses[number]["course_name"], your_courses[number]["classroom_id"],
your_courses[number]["course_sign"])
for one_video in homework_dic.items():
one_video_watcher(one_video[0], one_video[1], your_courses[number]["course_id"], user_id,
your_courses[number]["classroom_id"],
your_courses[number]["sku_id"])
print("搞定啦")
videos_id_name_dic = get_videos_ids(courses[number]["course_name"], courses[number]["classroom_id"],
courses[number]["course_sign"])
ins = courses[number]
course_id = ins["course_id"]
classroom_id = ins["classroom_id"]
sku_id = ins["sku_id"]
multiple_watch_video(videos_id_name_dic, course_id, user_id, classroom_id, sku_id)
print("搞定啦")