-
Notifications
You must be signed in to change notification settings - Fork 359
/
Copy pathvideo_download_thread.py
218 lines (187 loc) · 7.68 KB
/
video_download_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import os
import re
from pathlib import Path
import requests
import yt_dlp
from PyQt5.QtCore import QThread, pyqtSignal
from app.config import APPDATA_PATH
from app.core.entities import VideoInfo
from app.core.utils.logger import setup_logger
logger = setup_logger("video_download_thread")
class VideoDownloadThread(QThread):
"""视频下载线程类"""
finished = pyqtSignal(
str
) # 发送下载完成的信号(视频路径, 字幕路径, 缩略图路径, 视频信息)
progress = pyqtSignal(int, str) # 发送下载进度的信号
error = pyqtSignal(str) # 发送错误信息的信号
def __init__(self, url: str, work_dir: str):
super().__init__()
self.url = url
self.work_dir = work_dir
def run(self):
try:
video_file_path, subtitle_file_path, thumbnail_file_path, info_dict = (
self.download()
)
self.finished.emit(video_file_path)
except Exception as e:
logger.exception("下载视频失败: %s", str(e))
self.error.emit(str(e))
def progress_hook(self, d):
"""下载进度回调函数"""
if d["status"] == "downloading":
percent = d["_percent_str"]
speed = d["_speed_str"]
# 提取百分比和速度的纯文本
clean_percent = (
percent.replace("\x1b[0;94m", "")
.replace("\x1b[0m", "")
.strip()
.replace("%", "")
)
clean_speed = speed.replace("\x1b[0;32m", "").replace("\x1b[0m", "").strip()
self.progress.emit(
int(float(clean_percent)),
f"下载进度: {clean_percent}% 速度: {clean_speed}",
)
def sanitize_filename(self, name: str, replacement: str = "_") -> str:
"""清理文件名中不允许的字符"""
# 定义不允许的字符
forbidden_chars = r'<>:"/\\|?*'
# 替换不允许的字符
sanitized = re.sub(f"[{re.escape(forbidden_chars)}]", replacement, name)
# 移除控制字符
sanitized = re.sub(r"[\0-\31]", "", sanitized)
# 去除文件名末尾的空格和点
sanitized = sanitized.rstrip(" .")
# 限制文件名长度
max_length = 255
if len(sanitized) > max_length:
base, ext = os.path.splitext(sanitized)
base_max_length = max_length - len(ext)
sanitized = base[:base_max_length] + ext
# 处理Windows保留名称
windows_reserved_names = {
"CON",
"PRN",
"AUX",
"NUL",
"COM1",
"COM2",
"COM3",
"COM4",
"COM5",
"COM6",
"COM7",
"COM8",
"COM9",
"LPT1",
"LPT2",
"LPT3",
"LPT4",
"LPT5",
"LPT6",
"LPT7",
"LPT8",
"LPT9",
}
name_without_ext = os.path.splitext(sanitized)[0].upper()
if name_without_ext in windows_reserved_names:
sanitized = f"{sanitized}_"
# 如果文件名为空,返回默认名称
if not sanitized:
sanitized = "default_filename"
return sanitized
def download(self, need_subtitle: bool = True, need_thumbnail: bool = False):
"""下载视频"""
logger.info("开始下载视频: %s", self.url)
# 初始化 ydl 选项
initial_ydl_opts = {
"outtmpl": {
"default": "%(title)s.%(ext)s",
"subtitle": "【下载字幕】.%(ext)s",
"thumbnail": "thumbnail",
},
"format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best", # 优先下载mp4格式
"progress_hooks": [self.progress_hook], # 下载进度钩子
"quiet": True, # 禁用日志输出
"no_warnings": True, # 禁用警告信息
"noprogress": True,
"writeautomaticsub": need_subtitle, # 下载自动生成的字幕
"writethumbnail": need_thumbnail, # 下载缩略图
"thumbnail_format": "jpg", # 指定缩略图的格式
}
# 检查 cookies 文件
cookiefile_path = APPDATA_PATH / "cookies.txt"
if cookiefile_path.exists():
logger.info(f"使用cookiefile: {cookiefile_path}")
initial_ydl_opts["cookiefile"] = str(cookiefile_path)
with yt_dlp.YoutubeDL(initial_ydl_opts) as ydl:
# 提取视频信息(不下载)
info_dict = ydl.extract_info(self.url, download=False)
# 设置动态下载文件夹为视频标题
video_title = self.sanitize_filename(info_dict.get("title", "MyVideo"))
video_work_dir = Path(self.work_dir) / self.sanitize_filename(video_title)
subtitle_language = info_dict.get("language", None)
if subtitle_language:
subtitle_language = subtitle_language.lower().split("-")[0]
try:
subtitle_download_link = None
for l in info_dict["automatic_captions"]:
if l.startswith(subtitle_language):
subtitle_download_link = info_dict["automatic_captions"][l][-1][
"url"
]
break
except Exception:
subtitle_download_link = None
# 设置 yt-dlp 下载选项
ydl_opts = {
"paths": {
"home": str(video_work_dir),
"subtitle": str(video_work_dir / "subtitle"),
"thumbnail": str(video_work_dir),
},
}
# 更新 yt-dlp 的配置
ydl.params.update(ydl_opts)
# 使用 process_info 进行下载
ydl.process_info(info_dict)
# 获取视频文件路径
video_file_path = Path(ydl.prepare_filename(info_dict))
if video_file_path.exists():
video_file_path = str(video_file_path)
else:
video_file_path = None
# 获取字幕文件路径
subtitle_file_path = None
for file in video_work_dir.glob("**/【下载字幕】*"):
file_path = str(file)
if subtitle_language and subtitle_language not in file_path:
logger.info(
"字幕语言错误,重新下载字幕: %s", subtitle_download_link
)
os.remove(file_path)
if subtitle_download_link:
response = requests.get(subtitle_download_link)
file_path = (
video_work_dir
/ "subtitle"
/ f"【下载字幕】{subtitle_language}.vtt"
)
if res := response.text:
with open(file_path, "w", encoding="utf-8") as f:
f.write(res)
subtitle_file_path = file_path
else:
subtitle_file_path = file_path
break
# 获取缩略图文件路径
thumbnail_file_path = None
for file in video_work_dir.glob("**/thumbnail*"):
thumbnail_file_path = str(file)
break
logger.info(f"视频下载完成: {video_file_path}")
logger.info(f"字幕文件路径: {subtitle_file_path}")
return video_file_path, subtitle_file_path, thumbnail_file_path, info_dict