Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 129 additions & 64 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,99 +41,164 @@ async def save_upload_file(upload_file: UploadFile) -> str:
return temp_file.name # 返回临时文件路径


# Delimiter characters (CJK full-width + ASCII punctuation) that mark phrase
# boundaries when splitting recognized text into subtitle phrases. Characters
# in this set are never assigned a timestamp (see build_char2ts_index).
PUNCS = set(",。!?,.!?;;、")


def build_char2ts_index(text: str, timestamps):
    """
    Map each pronounceable character position in *text* to a timestamp index.

    Characters in PUNCS and whitespace are skipped; the remaining characters
    are paired with timestamps in order, assuming *timestamps* is given in
    token/character order. If the counts differ, pairing stops at the shorter
    of the two sequences.

    Returns:
        dict mapping character index in *text* -> index into *timestamps*.
    """
    speakable_positions = (
        idx for idx, ch in enumerate(text)
        if ch not in PUNCS and not ch.isspace()
    )
    # zip() truncates to the shorter input, matching min(len, len) pairing.
    return dict(zip(speakable_positions, range(len(timestamps))))


def split_phrases_with_pos(text: str):
    """
    Split *text* at punctuation (PUNCS), keeping each delimiter attached to
    the phrase it closes.

    Returns:
        list of (phrase_text, char_start, char_end) tuples, where char_start
        and char_end are inclusive indices into the original string. Any
        trailing run with no closing punctuation is emitted as a final phrase.
    """
    result = []
    phrase_start = 0
    for pos, ch in enumerate(text):
        if ch not in PUNCS:
            continue
        # Include the punctuation character itself in the phrase text.
        result.append((text[phrase_start:pos + 1], phrase_start, pos))
        phrase_start = pos + 1

    if phrase_start < len(text):
        result.append((text[phrase_start:], phrase_start, len(text) - 1))

    return result


def phrases_to_time_segments(text: str, timestamps):
    """
    Map each punctuation-delimited phrase of *text* to a contiguous time span.

    Timestamp tokens are consumed strictly left-to-right and never reused:
    each phrase may only claim timestamp indices greater than the highest
    index used by the previous phrase, so consecutive subtitles cannot share
    the same token.

    Args:
        text: full recognized text, in timestamp/token order.
        timestamps: sequence of [start_ms, end_ms] pairs, one per token.

    Returns:
        list of (phrase_text, start_ms, end_ms) tuples. Phrases containing
        only punctuation, or whose tokens were already consumed by an earlier
        phrase, are dropped.
    """
    char2ts = build_char2ts_index(text, timestamps)
    phrases = split_phrases_with_pos(text)

    segments = []
    last_ts_end = -1  # highest timestamp index consumed so far

    for phrase, c_start, c_end in phrases:
        # Only accept timestamp indices beyond last_ts_end to avoid reuse.
        ts_indices = [
            char2ts[i]
            for i in range(c_start, c_end + 1)
            if i in char2ts and char2ts[i] > last_ts_end
        ]
        if not ts_indices:
            # Phrase is all punctuation, or its tokens were consumed earlier.
            continue

        ts_start = min(ts_indices)
        ts_end = max(ts_indices)

        start_time = timestamps[ts_start][0]
        end_time = timestamps[ts_end][1]

        segments.append((phrase, start_time, end_time))
        last_ts_end = ts_end

    return segments

phrase_len = len(phrase)
# 计算短句在整个文本中的比例
start_ratio = char_index / total_chars
end_ratio = (char_index + phrase_len) / total_chars

start_idx = min(int(start_ratio * len(timestamps)), len(timestamps) - 1)
end_idx = min(int(end_ratio * len(timestamps)), len(timestamps) - 1)
def funasr_to_srt(funasr_result):
    """
    Convert a funasr recognition result into an SRT subtitle string.

    Pipeline:
      1. Map punctuation-delimited phrases to time spans using word-level
         timestamps (tokens consumed in order, never reused).
      2. Merge short phrases into subtitle segments bounded by a maximum
         character count.
      3. Repair the timeline so segments are strictly ordered,
         non-overlapping, and have a minimum duration.
      4. Render SRT, stripping trailing punctuation from each segment.

    Args:
        funasr_result: list whose first element is a dict with keys
            'text' (str) and 'timestamp' (list of [start_ms, end_ms] pairs).

    Returns:
        SRT-formatted string; "" when the result is empty/falsy.
    """
    data = funasr_result
    if not data:
        return ""

    text = data[0]['text']
    timestamps = data[0]['timestamp']

    # 1. Phrase-level time segments (token order, no token reuse).
    phrase_segments = phrases_to_time_segments(text, timestamps)
    # phrase_segments: [(phrase_text, start_ms, end_ms), ...]

    # 2. Merge phrases into subtitle segments by character budget.
    max_chars_per_line = 20  # maximum characters per subtitle; tune as needed

    text_segments = []
    current_text = ""
    current_start = None
    current_end = None

    for phrase, start_ms, end_ms in phrase_segments:
        cleaned_phrase = phrase.strip()
        if not cleaned_phrase:
            continue

        if not current_text:
            # Current subtitle is empty: start a new one.
            current_text = cleaned_phrase
            current_start = start_ms
            current_end = end_ms
            continue

        # Try appending this phrase to the current subtitle.
        combined_text = current_text + cleaned_phrase
        if len(combined_text) > max_chars_per_line:
            # Over budget: close the current subtitle, then start a new one.
            text_segments.append((current_text, current_start, current_end))
            current_text = cleaned_phrase
            current_start = start_ms
            current_end = end_ms
        else:
            # Fits: merge into the current subtitle.
            current_text = combined_text
            current_end = max(current_end, end_ms)

    # Flush the last open subtitle, if any.
    if current_text:
        text_segments.append((current_text, current_start, current_end))

    # 3. Timeline repair: strictly monotonic, non-overlapping, min duration.
    MIN_GAP = 0    # minimum gap between adjacent subtitles (ms); 0 disables
    MIN_DUR = 300  # minimum display time per subtitle (ms)

    fixed_segments = []
    prev_end = 0
    for seg_text, start, end in text_segments:
        if start < prev_end + MIN_GAP:
            start = prev_end + MIN_GAP
        if end <= start:
            end = start + MIN_DUR
        fixed_segments.append((seg_text, start, end))
        prev_end = end

    # 4. Render SRT. Number cues AFTER dropping punctuation-only segments so
    #    indices stay consecutive (enumerate over the unfiltered list would
    #    skip numbers whenever a segment is discarded).
    srt_lines = []
    cue_no = 0
    for seg_text, start, end in fixed_segments:
        # Strip trailing punctuation from the segment text.
        cleaned_text = re.sub(r'[,。!?,.!?;;、]+$', '', seg_text).strip()
        if not cleaned_text:
            continue

        cue_no += 1
        srt_lines.append(str(cue_no))
        srt_lines.append(f"{format_timestamp(start)} --> {format_timestamp(end)}")
        srt_lines.append(cleaned_text)
        srt_lines.append("")  # blank line separates cues

    return "\n".join(srt_lines)


def format_timestamp(milliseconds):
Expand Down
Loading