Skip to content

Commit

Permalink
尝试修复 Event loop is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
duolabmeng6 committed Nov 12, 2024
1 parent 9a174a4 commit 68a0200
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 52 deletions.
4 changes: 3 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
host="0.0.0.0",
port=8000,
reload=True,
# workers=1,
reload_dirs=["./"],
reload_includes=["*.py", "api.yaml"],
ws="none",
)
113 changes: 62 additions & 51 deletions app/provider/httpxHelp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,6 @@
from app.db.logDB import CacheManager
cacheManager = CacheManager()

client = httpx.AsyncClient(
headers={
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "curl/7.68.0",
},
timeout=httpx.Timeout(connect=120.0, read=600, write=120.0, pool=120.0),
verify=False,
proxies={
# "http://": "http://127.0.0.1:8888",
# "https://": "http://127.0.0.1:8888",
},
)

async def raise_for_status(sendReady, response: httpx.Response):
if response.status_code == 200:
return
Expand All @@ -45,31 +31,39 @@ async def raise_for_status(sendReady, response: httpx.Response):
raise HTTPException(status_code=500, detail=error_data)

async def get_api_data(sendReady) -> AsyncGenerator[str, None]:
try:
if sendReady["stream"]:
async with client.stream("POST", sendReady["url"], headers=sendReady["headers"],
json=sendReady["body"]) as response:
async with httpx.AsyncClient(
headers={
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "curl/7.68.0",
},
timeout=httpx.Timeout(connect=120.0, read=600, write=120.0, pool=120.0),
verify=False,
) as client:
try:
if sendReady["stream"]:
async with client.stream("POST", sendReady["url"], headers=sendReady["headers"],
json=sendReady["body"]) as response:
await raise_for_status(sendReady, response)
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('data:'):
yield line
else:
response = await client.post(sendReady["url"], headers=sendReady["headers"], json=sendReady["body"])
await raise_for_status(sendReady, response)
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('data:'):
yield line
else:
response = await client.post(sendReady["url"], headers=sendReady["headers"], json=sendReady["body"])
await raise_for_status(sendReady, response)
response_text = response.content.decode("utf-8")
yield response_text
except httpx.RequestError as e:
logger.error(f"网络请求错误: {e} {sendReady}")
# e.request.url.host
raise HTTPException(status_code=503, detail={"error": "网络请求错误", "detail": str(e)})
except Exception as e:
logger.error(f"未知错误: {e} {sendReady}")
raise HTTPException(status_code=500, detail={"error": "上游服务器出现未知错误", "detail": str(e)})
response_text = response.content.decode("utf-8")
yield response_text
except httpx.RequestError as e:
logger.error(f"网络请求错误: {e} {sendReady}")
raise HTTPException(status_code=503, detail={"error": "网络请求错误", "detail": str(e)})
except Exception as e:
logger.error(f"未知错误: {e} {sendReady}")
raise HTTPException(status_code=500, detail={"error": "上游服务器出现未知错误", "detail": str(e)})

async def get_api_data_cache(sendReady) -> AsyncGenerator[str, None]:
cache_md5 = hashlib.md5(json.dumps(sendReady['body']).encode('utf-8')).hexdigest()
Expand All @@ -91,20 +85,37 @@ async def get_api_data_cache(sendReady) -> AsyncGenerator[str, None]:
cacheData = ""
try:
if sendReady["stream"]:
async with client.stream("POST", sendReady["url"], headers=sendReady["headers"],
json=sendReady["body"]) as response:
await raise_for_status(sendReady, response)
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('data:'):
cacheData += line + "\r\n"
yield line
async with httpx.AsyncClient(
headers={
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "curl/7.68.0",
},
timeout=httpx.Timeout(connect=120.0, read=600, write=120.0, pool=120.0),
verify=False,
) as client:
async with client.stream("POST", sendReady["url"], headers=sendReady["headers"],
json=sendReady["body"]) as response:
await raise_for_status(sendReady, response)
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('data:'):
cacheData += line + "\r\n"
yield line
else:
response = await client.post(sendReady["url"], headers=sendReady["headers"], json=sendReady["body"])
response = await httpx.AsyncClient(
headers={
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "curl/7.68.0",
},
timeout=httpx.Timeout(connect=120.0, read=600, write=120.0, pool=120.0),
verify=False,
).post(sendReady["url"], headers=sendReady["headers"], json=sendReady["body"])
await raise_for_status(sendReady, response)
response_text = response.content.decode("utf-8")
cacheData = response_text
Expand Down

0 comments on commit 68a0200

Please sign in to comment.