2 changes: 1 addition & 1 deletion .gitignore
@@ -173,7 +173,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/

# Abstra
# Abstra is an AI-powered process automation framework.
6 changes: 4 additions & 2 deletions README.md
@@ -14,10 +14,10 @@ GenAI is a Flask-based chatbot API service, compatible with OpenAI's
### Starting the service

```bash
uv run main.py --token <token> [--port 5000]
uv run main.py --token <token> --imgtoken <imgtoken> [--port 5000]
```

See below for how to obtain <token>; the port defaults to 5000. The service starts locally on `0.0.0.0:5000`.
See below for how to obtain <token> and <imgtoken>; the port defaults to 5000. The service starts locally on `0.0.0.0:5000`.
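
For illustration, a minimal client call against the running service might look like the sketch below. It assumes the service exposes the usual OpenAI-compatible `/v1/chat/completions` route on the default port and does not actually validate the API key:

```python
# Minimal sketch (assumptions: standard /v1 path, API key ignored by the server).
from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:5000/v1", api_key="not-used")

response = client.chat.completions.create(
    model="GPT-4.1",  # any model name from the service's model list
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)
```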

## Features and usage

@@ -31,6 +31,8 @@ uv run main.py --token <token> [--port 5000]
1. First go to the [GenAI dialogue platform](https://genai.shanghaitech.edu.cn/dialogue)
2. Open the browser developer tools, send any message, and capture the request named `chat`
3. Copy the `x-access-token` field from the request headers; this is `<token>`
4. Then upload an image and capture the request named `upload`
5. Copy the `Token` field from the request headers; this is `<imgtoken>`

![Image description](images/chrome.png)

176 changes: 152 additions & 24 deletions main.py
@@ -1,12 +1,14 @@
from flask import Flask, request, jsonify, stream_with_context, Response
from flask_cors import CORS
import requests
import base64
import io
import json
import uuid
import sqlite3
import hashlib
from datetime import datetime
import re
import argparse
from icecream import ic

app = Flask(__name__)
CORS(app)
@@ -17,10 +19,14 @@
help='GenAI API Access Token')
parser.add_argument('--port', type=int, default=5000,
help='Flask server port (default: 5000)')
parser.add_argument('--imgtoken', type=str,
help='GenAI image upload token')
args = parser.parse_args()

# GenAI API configuration
GENAI_URL = "https://genai.shanghaitech.edu.cn/htk/chat/start/chat"
UPLOAD_URL = "https://genaipic.shanghaitech.edu.cn/sys/common/upload"
STATIC_BASE_URL = "https://genaipic.shanghaitech.edu.cn/sys/common/static/"
GENAI_HEADERS = {
"Accept": "*/*, text/event-stream",
"Cache-Control": "no-cache",
@@ -33,23 +39,140 @@
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"X-Access-Token": args.token,
"Token": args.imgtoken,
"sec-ch-ua": '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
}

# Database initialization
DB_FILE = "image_cache.db"

def init_db():
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS images
(hash TEXT PRIMARY KEY, url TEXT)''')
conn.commit()
conn.close()

init_db()

def get_url_from_db(base64_str):
"""根据Base64内容的哈希获取缓存URL"""
img_hash = hashlib.md5(base64_str.encode('utf-8')).hexdigest()
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT url FROM images WHERE hash=?", (img_hash,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None

def save_url_to_db(base64_str, url):
"""保存Base64哈希和URL的映射"""
img_hash = hashlib.md5(base64_str.encode('utf-8')).hexdigest()
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("INSERT OR REPLACE INTO images (hash, url) VALUES (?, ?)", (img_hash, url))
conn.commit()
conn.close()

def upload_image(base64_str):
"""上传图片"""
try:
original_b64 = base64_str
if "base64," in base64_str:
base64_str = base64_str.split("base64,")[1]

# Check the cache
cached_url = get_url_from_db(base64_str)
if cached_url:
print(f"DEBUG: Image found in cache: {cached_url}")
return cached_url

# Upload
img_data = base64.b64decode(base64_str)

common_headers = {
"User-Agent": GENAI_HEADERS["User-Agent"],
"Accept": "*/*",
"Referer": "https://genai.shanghaitech.edu.cn/",
"Origin": "https://genai.shanghaitech.edu.cn",
}

files = {'file': ('image.png', io.BytesIO(img_data), 'image/png')}
data = {'biz': 'temp', 'uploadType': 'local'}
post_headers = dict(common_headers, **{"token": args.imgtoken}) # make sure imgtoken is used here

res = requests.post(
UPLOAD_URL,
files=files, data=data, headers=post_headers, verify=False
)

res_json = res.json()
if res_json.get("success") and res_json.get("result"):
full_url = STATIC_BASE_URL + res_json["result"]["url"]

# Write to the cache
save_url_to_db(base64_str, full_url)
return full_url

except Exception as e:
print(f"Upload Critical Error: {e}")

return None


def convert_messages_to_genai_format(messages):
"""将OpenAI格式的消息转换为GenAI格式"""
# 提取最后一条用户消息作为 chatInfo
"""转换格式"""
if not messages:
return [], "", ""

current_msg = messages[-1]
history_messages = messages[:-1]

chat_info = ""
for msg in reversed(messages):
if msg.get("role") == "user":
chat_info = msg.get("content", "")
break

return chat_info
image_url = ""

# Convert base64 images in the history to uploaded URLs
processed_history = []
for msg in history_messages:
new_msg = msg.copy()
content = msg.get("content")

if isinstance(content, list):
new_content = []
for item in content:
if item.get("type") == "image_url":
raw_url = item.get("image_url", {}).get("url", "")
if raw_url.startswith("data:"):
url = upload_image(raw_url)
if url:
item["image_url"]["url"] = url
new_content.append(item)
new_msg["content"] = new_content

processed_history.append(new_msg)

# Current message
if current_msg.get("role") == "user":
content = current_msg.get("content")
if isinstance(content, str):
chat_info = content
elif isinstance(content, list):
text_parts = []
for item in content:
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "image_url":
raw_url = item.get("image_url", {}).get("url", "")
url = upload_image(raw_url)
if url:
image_url = url
chat_info = " ".join(text_parts)

return processed_history, chat_info, image_url
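
# Illustrative sketch only, not part of this diff: the OpenAI-style message
# shape that convert_messages_to_genai_format expects. Text parts of the last
# user turn are joined into chat_info, a "data:" image is uploaded and comes
# back as image_url, and the earlier turns become the processed history.
#
# example_messages = [
#     {"role": "user", "content": "Describe this picture."},
#     {"role": "assistant", "content": "Sure, please attach it."},
#     {"role": "user", "content": [
#         {"type": "text", "text": "Here it is:"},
#         {"type": "image_url", "image_url": {"url": "data:image/png;base64,...."}},
#     ]},
# ]
# history, chat_info, image_url = convert_messages_to_genai_format(example_messages)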


def extract_content_from_genai(response_data):
"""从GenAI API响应中提取内容"""
@@ -64,17 +187,16 @@ def extract_content_from_genai(response_data):
pass
return None

def stream_genai_response(chat_info, messages, model, max_tokens):
def stream_genai_response(chat_info, messages, model, max_tokens, image_url = None):
"""流式调用GenAI API并转换为OpenAI格式"""

# Determine rootAiType
azure_models = {"GPT-5", "o4-mini", "GPT-4.1", "o3", "GPT-4.1-mini"}
azure_models = {"GPT-5.2", "GPT-5-Pro", "GPT-5", "o4-mini", "GPT-4.1", "o3", "GPT-4.1-mini"}
root_ai_type = "azure" if model in azure_models else "xinference"

# Build the GenAI request payload
genai_data = {
# "chatInfo": chat_info,
"chatInfo": "",
"chatInfo": chat_info,
"messages": messages,
"type": "3",
"stream": True,
@@ -84,6 +206,9 @@ def stream_genai_response(chat_info, messages, model, max_tokens):
"rootAiType": root_ai_type,
"maxToken": max_tokens or 30000
}

if image_url:
genai_data["imageUrl"] = image_url

try:

@@ -204,19 +329,20 @@ def chat_completions():
max_tokens = req_data.get('max_tokens', 30000)

# Convert the message format
chat_info = convert_messages_to_genai_format(messages)
messages, chat_info, image_url = convert_messages_to_genai_format(messages)

if not chat_info:
if not chat_info and not image_url:
return jsonify({'error': 'No user message found'}), 400

# Streaming response
if stream:
return Response(
stream_with_context(stream_genai_response(
chat_info,
messages,
chat_info,
messages,
model,
max_tokens
max_tokens,
image_url
)),
mimetype='text/event-stream',
headers={
@@ -229,7 +355,7 @@
# Non-streaming response (collect all content before returning)
else:
complete_content = ""
for line in stream_genai_response(chat_info, messages, model, max_tokens):
for line in stream_genai_response(chat_info, messages, model, max_tokens, image_url):
if line.startswith('data: '):
try:
data = json.loads(line[6:])
@@ -273,6 +399,8 @@ def list_models():
available_models = [
"deepseek-v3:671b",
"deepseek-r1:671b",
"GPT-5.2",
"GPT-5-Pro",
"GPT-5",
"o4-mini",
"GPT-4.1",
@@ -281,7 +409,7 @@
"qwen-instruct",
"qwen-think"
]

models = []
for model_id in available_models:
models.append({
@@ -290,7 +418,7 @@
"owned_by": "genai",
"permission": []
})

return jsonify({"object": "list", "data": models})

@app.route('/health', methods=['GET'])