Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .base_client import BaseClient
from .deepseek_client import DeepSeekClient
from .deepseek_client import DeepSeekClient, DropContent
from .claude_client import ClaudeClient

__all__ = ['BaseClient', 'DeepSeekClient', 'ClaudeClient']
__all__ = ['BaseClient', 'DeepSeekClient', 'ClaudeClient', 'DropContent']
9 changes: 9 additions & 0 deletions app/clients/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ def __init__(
self.api_url = api_url
self.timeout = timeout or self.DEFAULT_TIMEOUT
self.proxy = proxy
self.active_http_response = None

def reset_state(self) -> None:
if not self.active_http_response.closed:
logger.info("关闭 deepseek 连接")
self.active_http_response.close()
del self.active_http_response

async def _make_request(
self, headers: dict, data: dict, timeout: Optional[aiohttp.ClientTimeout] = None
Expand Down Expand Up @@ -81,6 +88,8 @@ async def _make_request(
timeout=request_timeout,
proxy=proxy_url
) as response:
self.active_http_response = response

# 检查响应状态
if not response.ok:
error_text = await response.text()
Expand Down
3 changes: 3 additions & 0 deletions app/clients/deepseek_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,6 @@ async def stream_chat(
logger.error(f"JSON 解析错误: {e}")
except Exception as e:
logger.error(f"处理 chunk 时发生错误: {e}")

class DropContent(Exception):
pass
18 changes: 10 additions & 8 deletions app/deepclaude/deepclaude.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from app.clients import ClaudeClient, DeepSeekClient
from app.utils.logger import logger
from ..clients.deepseek_client import DeepSeekClient, DropContent


class DeepClaude:
Expand Down Expand Up @@ -82,10 +83,9 @@ async def chat_completions_with_stream(
# 队列,用于传递 DeepSeek 推理内容给 Claude
claude_queue = asyncio.Queue()

# 用于存储 DeepSeek 的推理累积内容
reasoning_content = []

async def process_deepseek():
# 用于存储 DeepSeek 的推理累积内容
reasoning_content = []
logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}")
try:
async for content_type, content in self.deepseek_client.stream_chat(
Expand Down Expand Up @@ -113,15 +113,17 @@ async def process_deepseek():
f"data: {json.dumps(response)}\n\n".encode("utf-8")
)
elif content_type == "content":
# 当收到 content 类型时,将完整的推理内容发送到 claude_queue,并结束 DeepSeek 流处理
logger.info(
f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}"
)
await claude_queue.put("".join(reasoning_content))
break
raise DropContent("DeepSeek 思考推理部分完成")

except DropContent as e:
logger.info({e})
self.deepseek_client.reset_state()

except Exception as e:
logger.error(f"处理 DeepSeek 流时发生错误: {e}")
await claude_queue.put("")

# 用 None 标记 DeepSeek 任务结束
logger.info("DeepSeek 任务处理完成,标记结束")
await output_queue.put(None)
Expand Down
21 changes: 13 additions & 8 deletions app/openai_composite/openai_composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.clients import DeepSeekClient
from app.clients.openai_compatible_client import OpenAICompatibleClient
from app.utils.logger import logger
from ..clients.deepseek_client import DeepSeekClient, DropContent


class OpenAICompatibleComposite:
Expand Down Expand Up @@ -77,10 +78,10 @@ async def chat_completions_with_stream(
# 队列,用于传递 DeepSeek 推理内容
reasoning_queue = asyncio.Queue()

# 用于存储 DeepSeek 的推理累积内容
reasoning_content = []

async def process_deepseek():
logger.info(f"1. 第一阶段推理请求:{deepseek_model}\ndata:{messages}\n")
# 用于存储 DeepSeek 的推理累积内容
reasoning_content = []
logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}")
try:
async for content_type, content in self.deepseek_client.stream_chat(
Expand Down Expand Up @@ -108,15 +109,17 @@ async def process_deepseek():
f"data: {json.dumps(response)}\n\n".encode("utf-8")
)
elif content_type == "content":
# 当收到 content 类型时,将完整的推理内容发送到 reasoning_queue
logger.info(
f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}"
)
await reasoning_queue.put("".join(reasoning_content))
break
raise DropContent("DeepSeek 思考推理部分完成")

except DropContent as e:
logger.info({e})
self.deepseek_client.reset_state()

except Exception as e:
logger.error(f"处理 DeepSeek 流时发生错误: {e}")
await reasoning_queue.put("")

# 标记 DeepSeek 任务结束
logger.info("DeepSeek 任务处理完成,标记结束")
await output_queue.put(None)
Expand All @@ -138,6 +141,8 @@ async def process_openai():
Here's my another model's reasoning process:\n{reasoning}\n\n
Based on this reasoning, provide your response directly to me:"""

logger.info(f"2. 第二阶段推理请求:{target_model}\ndata:{combined_content}\n")

# 检查过滤后的消息列表是否为空
if not openai_messages:
raise ValueError("消息列表为空,无法处理请求")
Expand Down