Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dash-fastapi-backend/debug.db
Binary file not shown.
22 changes: 22 additions & 0 deletions dash-fastapi-backend/debug_create_conversation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
import httpx

async def main():
async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
conversation_data = {
"conversation_title": "测试会话",
"conversation_type": "chat",
"user_id": 1
}

response = await client.post(
"/conversation",
json=conversation_data,
headers={"Authorization": "Bearer test-skip-login-token"}
)

print(f"Status Code: {response.status_code}")
print(f"Response: {response.json()}")

if __name__ == "__main__":
asyncio.run(main())
91 changes: 91 additions & 0 deletions dash-fastapi-backend/debug_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from config.database import Base
from module_admin.entity.do.user_do import SysUser
from module_conversation.entity.do.conversation_do import Conversation, Message, MessageContent
from app import app

# 模拟Redis
class MockRedis:
def __init__(self):
self.data = {}

async def get(self, key):
if key == "token:test_token":
return b'{"user_id": 1, "user_name": "test_user", "nick_name": "\u6d4b\u8bd5\u7528\u6237"}'
return None

async def setex(self, key, time, value):
self.data[key] = value

async def delete(self, key):
if key in self.data:
del self.data[key]

# 替换Redis依赖
app.state.redis = MockRedis()

# 创建测试数据库
async_engine = create_async_engine(
"sqlite+aiosqlite:///./test_debug.db",
connect_args={"check_same_thread": False},
)

TestingSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
class_=AsyncSession
)

async def setup():
# 创建所有表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)

# 创建测试用户
db = TestingSessionLocal()
test_user = SysUser(
user_name="test_user",
nick_name="测试用户",
password="test_password",
status="0"
)
db.add(test_user)
await db.commit()
await db.refresh(test_user)
await db.close()

print(f"Created test user with id: {test_user.user_id}")
return test_user.user_id

async def test_create_conversation():
await setup()

async with AsyncClient(app=app, base_url="http://test") as client:
# 使用测试环境跳过登录的token
token = "test-skip-login-token"

# 创建会话
response = await client.post(
"/conversation",
json={
"title": "测试会话",
"model": "gpt-3.5-turbo",
"temperature": 0.7
},
headers={"Authorization": f"Bearer {token}"}
)

print(f"Response status: {response.status_code}")
print(f"Response data: {response.json()}")

if __name__ == "__main__":
asyncio.run(test_create_conversation())
Loading