-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.py
More file actions
275 lines (233 loc) · 11.7 KB
/
process.py
File metadata and controls
275 lines (233 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import asyncio
import json
import os
import httpx
LANGUAGE_MAPPING = {
# Existing
"Albanian": "sq", "Arabic": "ar", "Armenian": "hy", "Azerbaijani": "az",
"Basque": "eu", "Belarusian": "be", "Bengali": "bn", "Bulgarian": "bg",
"Chinese": "zh", "Croatian": "hr", "Dutch": "nl", "Estonian": "et",
"Finnish": "fi", "French": "fr", "Georgian": "ka", "German": "de",
"Greek": "el", "Hebrew": "he", "Hindi": "hi", "Hungarian": "hu",
"Indonesian": "id", "Italian": "it", "Japanese": "ja", "Kazakh": "kk",
"Korean": "ko", "Lithuanian": "lt", "Malay": "ms", "Malayalam": "ml",
"Nepali": "ne", "North Macedonian": "mk", "Persian": "fa", "Polish": "pl",
"Portuguese": "pt", "Russian": "ru", "Serbian": "sr", "Spanish": "es",
"Swahili": "sw", "Tagalog": "tl", "Tamil": "ta", "Telugu": "te",
"Thai": "th", "Turkish": "tr", "Ukrainian": "uk", "Urdu": "ur",
"Uzbek": "uz", "Vietnamese": "vi", "English": "en",
"Afrikaans": "af", "Amharic": "am", "Icelandic": "is",
"Javanese": "jv", "Khmer": "km", "Kyrgyz": "ky",
"Latvian": "lv", "Luxembourgish": "lb", "Norwegian": "no",
"Pashto": "ps", "Punjabi": "pa", "Romanian": "ro", "Slovenian": "sl",
"Somali": "so", "Tajik": "tg","Xhosa": "xh", "Zulu": "zu"
}
LANG_NAME_TO_CODE = {k.lower(): v for k, v in LANGUAGE_MAPPING.items()}
REASONING_PREFIXES = {
"en": "Ok, let me think step by step.",
"bn": "ঠিক আছে, আমি ধাপে ধাপে চিন্তা করি।",
"de": "Ok, lass mich Schritt für Schritt nachdenken.",
"es": "Ok, déjame pensar paso a paso.",
"fr": "D'accord, laissez-moi essayer de résoudre ce problème étape par étape.",
"ja": "よし、順を追って考えてみよう。",
"ru": "Хорошо, давай подумаем шаг за шагом.",
"sw": "Sawa, hebu nifikirie hatua kwa hatua.",
"te": "సరే, నేను అంచెలంచెలుగా ఆలోచిస్తాను.",
"th": "เอาล่ะ ให้ฉันคิดไปทีละขั้นตอน",
"zh": "好吧,让我一步步思考。",
# Nordic / Baltic
"fi": "Okei, anna minun miettiä askel askeleelta.",
"no": "Ok, la meg tenke trinn for trinn.",
"is": "Jæja, leyfðu mér að hugsa þetta skref fyrir skref.",
"lv": "Labi, ļaujiet man domāt soli pa solim.",
"et": "Olgu, las ma mõtlen samm-sammult.",
# Romance / Basque / Luxembourgish
"eu": "Ados, utzidazu pausoz pauso pentsatzen.",
"lb": "Ok, loosst mech Schrëtt fir Schrëtt denken.",
"ro": "Bine, hai să mă gândesc pas cu pas.",
"it": "Ok, lasciami pensare passo dopo passo.",
# Slavic / Eastern European
"hu": "Rendben, hadd gondolkodjam lépésről lépésre.",
"bg": "Добре, нека помисля стъпка по стъпка.",
"sl": "V redu, naj razmislim korak za korakom.",
# Indian Subcontinent
"hi": "ठीक है, मुझे चरण-दर-चरण सोचने दें।",
"pa": "ਠੀਕ ਹੈ, ਮੈਨੂੰ ਕਦਮ-ਦਰ-ਕਦਮ ਸੋਚਣ ਦਿਓ।",
"ps": "ښه، اجازه راکړئ چې ګام په ګام فکر وکړم.",
"ml": "ശരി, എനിക്ക് ഘട്ടം ഘട്ടമായി ചിന്തിക്കാം.",
"ta": "சரி, நான் படிப்படியாக சிந்திக்கிறேன்.",
# Middle East / Central Asia
"fa": "باشه، بذار گام به گام فکر کنم.",
"ar": "حسنا، دعني أفكر خطوة بخطوة.",
"he": "אוקיי, תן לי לחשוב צעד אחר צעד.",
"kk": "Жарайды, маған кезең-кезеңімен ойлануға рұқсат етіңіз.",
"tg": "Хуб, биёед зина ба зина фикр кунам.",
"ky": "Макул, келгиле, кадам сайын ойлонуп көрөйүн.",
# SE Asia / East Asia
"km": "យល់ព្រម សូមឱ្យខ្ញុំគិតមួយជំហានម្តង ៗ ។",
"jv": "Ok, ayo mikir siji-siji.",
"id": "Oke, biarkan saya berpikir selangkah demi selangkah.",
"tl": "Sige, hayaan mong mag-isip ako nang hakbang-hakbang.",
# African
"am": "እሺ፣ እስቲ ደረጃ በደረጃ ላስብ።",
"so": "Haye, aan tallaabo-tallaabo u fikiro.",
"zu": "Kulungile, ake ngicabange isinyathelo ngesinyathelo.",
"af": "Okei, laat ek stap vir stap dink.",
"xh": "Kulungile, makhe ndicinge inyathelo ngenyathelo."
}
async def fetch_response(model, prompt, reasoning_effort, target_lang=None, max_retries=5):
"""
Make an async API request to OpenRouter for translation.
Implements exponential backoff retry logic for handling timeouts and failures.
Args:
session: HTTP session (unused, kept for interface compatibility).
model (str): Model identifier (e.g., 'qwen/qwen3-235b-a22b-2507').
prompt (str): The text to translate.
src_lang (str): Source language name.
target_lang (str): Target language name.
effort (str|None): Reasoning effort level ('high', 'medium', 'low') or None.
max_retries (int): Maximum number of retry attempts (default: 5).
Returns:
api_response_json
Raises:
httpx.ReadTimeout: If all retry attempts fail.
"""
url = "https://openrouter.ai/api/v1/chat/completions"
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise RuntimeError(
"OPENROUTER_API_KEY is not set. Export it in your shell or load it from a local .env file "
"before running the translation or judge pipeline."
)
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
messages = []
lang_code = "en" # Default
prefix_text = REASONING_PREFIXES["en"]
reasoning = reasoning_effort not in ['none', None, 'cot']
# if reasoning and target_lang:
# lang_code = LANG_NAME_TO_CODE.get(target_lang.lower(), "en")
# prefix_text = REASONING_PREFIXES.get(lang_code, REASONING_PREFIXES["en"])
# system_content = (
# f"You are a helpful assistant. You should reason and analyze the question "
# f"in {target_lang} and wrap your thought process in <think>...</think> tags."
# )
# messages.append({"role": "system", "content": system_content})
messages.append({"role": "user", "content": prompt})
# if reasoning and target_lang:
# messages.append({"role": "assistant", "content": f"<think>\n{prefix_text}"})
payload = {
"model": model['name'],
"temperature": model['temperature'],
"top_p": model['top_p'],
"max_tokens": 32768,
"messages": messages,
'provider': {
'order': ['nvidia', 'z-ai', 'mistral', 'deepinfra', 'google-vertex', 'moonshotai'],
'allow_fallbacks': True
}
}
payload["reasoning"] = {
"effort": reasoning_effort if reasoning_effort is not None else "none",
"exclude": False,
}
timeout = httpx.Timeout(600.0, connect=60.0) # 10 min read timeout, 1 min connect timeout
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, headers=headers, data=json.dumps(payload))
if response.status_code in {429, 500, 502, 503, 504}:
if attempt < max_retries - 1:
retry_after = response.headers.get("Retry-After")
try:
wait_time = float(retry_after)
except (TypeError, ValueError):
# Use a longer backoff for upstream provider throttling.
wait_time = min(60, 5 * (2 ** attempt))
print(
f"Retryable API Error ({response.status_code}) for prompt "
f"'{prompt[:50]}...', retrying in {wait_time}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(wait_time)
continue
print(f"API Error ({response.status_code}): {response.text}")
response.raise_for_status()
if response.status_code >= 400:
print(f"API Error ({response.status_code}): {response.text}")
result = response.json()
return result
except httpx.ReadTimeout:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff: 1, 2, 4 seconds
print(f"Timeout for prompt '{prompt[:50]}...', retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
await asyncio.sleep(wait_time)
else:
print(f"Failed after {max_retries} attempts for prompt: '{prompt[:50]}...'")
raise
except Exception as exc:
print(
f"Request failed for prompt '{prompt[:50]}...': "
f"{type(exc).__name__}: {exc}"
)
raise
async def _return_none(p):
"""Helper coroutine that returns a None result for a prompt."""
return None
async def batch_send(model, prompts, reasoning_effort, target_lang=None):
"""
Translate a batch of prompts from source to target language.
Args:
model (dict): Model configuration with 'name' and 'effort' keys.
prompts (list[str]): List of texts to translate.
reasoning_effort (str|None): Reasoning effort level or None (unused, effort comes from model dict).
Returns:
list[tuple]: List of (original_prompt, api_response) tuples.
Raises:
Exception: If any translation fails after retries.
"""
model_name = model["name"]
is_kimi = model_name == "moonshotai/kimi-k2"
is_gemma4 = model_name == "google/gemma-4-31b-it"
if is_kimi:
batch_size = 40
pause_seconds = 65
elif is_gemma4:
# Gemma 4 providers on OpenRouter rate-limit aggressively.
batch_size = 20
pause_seconds = 2
else:
batch_size = len(prompts)
pause_seconds = 0
results = [None] * len(prompts)
for start in range(0, len(prompts), batch_size):
batch = prompts[start:start + batch_size]
tasks = [
fetch_response(model, p, reasoning_effort, target_lang=target_lang)
if p is not None else _return_none(p)
for p in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for offset, result in enumerate(batch_results):
i = start + offset
if isinstance(result, Exception):
print(f"Error processing prompt {i}: {type(result).__name__}: {result}")
results[i] = None
elif result is None:
if prompts[i] is None:
print(
f"Error processing prompt {i}: input prompt is None "
f"(an earlier stage likely failed for this row)"
)
else:
print(f"Error processing prompt {i}: received None response from provider")
results[i] = None
else:
results[i] = result
if pause_seconds and start + batch_size < len(prompts):
provider_label = "Kimi" if is_kimi else "Gemma 4"
print(f"{provider_label} throttle: sleeping {pause_seconds}s before next batch...")
await asyncio.sleep(pause_seconds)
return results