-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp_request.py
More file actions
164 lines (142 loc) · 6.78 KB
/
http_request.py
File metadata and controls
164 lines (142 loc) · 6.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import asyncio
import json
import logging
from typing import Any, List, Type
from inspect import iscoroutinefunction
from aiohttp import ClientSession, ClientTimeout, ContentTypeError
from aiohttp.hdrs import METH_HEAD, METH_GET, METH_OPTIONS, METH_POST, METH_PUT, METH_PATCH, METH_DELETE
from code_simplify.models.base import BaseResponse
from code_simplify.models.errors import RequestTimeoutError, InternalError, Error
logger = logging.getLogger(__name__)
async def http_head(path: str, headers: dict = None) -> BaseResponse:
return await _request(METH_HEAD, path, headers=headers)
async def http_get(path: str, headers: dict = None) -> BaseResponse:
return await _request(METH_GET, path, headers=headers)
async def http_options(path: str, headers: dict = None, allow_redirects: bool = True) -> BaseResponse:
return await _request(METH_OPTIONS, path, headers=headers, allow_redirects=allow_redirects)
async def http_post(path: str, data: Any = None, json_data: Any = None, headers: dict = None) -> BaseResponse:
return await _request(METH_POST, path, data, json_data, headers)
async def http_put(path: str, data: Any, json_data: Any, headers: dict = None) -> BaseResponse:
return await _request(METH_PUT, path, data=data, json_data=json_data, headers=headers)
async def http_patch(path: str, data: Any, headers: dict = None) -> BaseResponse:
return await _request(METH_PATCH, path, data=data, headers=headers)
async def http_delete(path: str, data: Any, headers: dict = None) -> BaseResponse:
return await _request(METH_DELETE, path, data, headers)
async def sse_handler(
method: str,
url: str,
data: Any = None,
json_data: Any = None,
headers: dict = None,
timeout: int = 300,
event_process_func=None,
event_func_args: List = None,
data_process_func=None,
data_func_args: List = None,
custom_errors: List[Type[Error]] = None
):
"""
sse proto handler, both sync and async processing func are supported. Processing func for event and data type
doesn't have to be async or sync at the same time. Raise custom errors if provided.
:param method: http_utils method
:param url: url
:param data: data
:param json_data: json
:param headers: headers
:param timeout: total timeout in seconds(include connecting)
:param event_process_func: processing func for event type in sse proto, both async and sync func are supported
:param event_func_args: processing func args for event type in sse proto
:param data_process_func: processing func for data type in sse proto, both async and sync func are supported
:param data_func_args: processing func args for data type in sse proto
:param custom_errors: custom errors for event_process_func or data_process_func will be raised
:return:
"""
try:
async with ClientSession() as session:
async with session.request(
method,
url,
data=data,
json=json_data,
headers=headers,
timeout=ClientTimeout(total=timeout) # timeout: total timeout in seconds(include connecting)
) as response:
while True:
line = await response.content.readline()
if not line:
break
line = line.decode().strip()
if not line:
# ignore empty line
continue
if line.startswith("data:"):
data = line.replace("data: ", "")
if data == '[DONE]':
break
data = json.loads(data)
if data_process_func:
if not data_func_args:
data_func_args = []
if iscoroutinefunction(data_process_func):
await data_process_func(data, *data_func_args)
continue
data_process_func(data, *data_func_args)
elif line.startswith("event:"):
if event_process_func:
if not event_func_args:
event_func_args = []
if iscoroutinefunction(event_process_func):
await event_process_func(
json.loads(line.replace("event: ", "")),
*event_func_args
)
continue
event_process_func(json.loads(
line.replace("event: ", "")), *event_func_args)
await response.release()
except asyncio.TimeoutError as e:
logger.error("request to %s timeout after %s seconds, error: %s", url, timeout, e)
raise RequestTimeoutError(f"request timeout: {e}")
except Exception as e:
logger.error("request to %s failed, error: %s", url, e)
if not custom_errors:
raise InternalError(f"request to {url} failed, error: {e}")
for custom_error_type in custom_errors:
if isinstance(e, custom_error_type):
raise e
raise InternalError(
f"request to {url} failed and no matching custom error found, got error type: {type(e)},"
f" custom error types: [{','.join([str(custom_error) for custom_error in custom_errors])}],"
f" error: {e}")
async def _request(
method: str,
url: str,
data: Any = None,
json_data: Any = None,
headers: dict = None,
timeout: int = 300,
**kwargs
) -> BaseResponse:
try:
async with ClientSession() as session:
async with session.request(
method,
url,
data=data,
json=json_data,
headers=headers,
# timeout: total timeout in seconds(include connecting)
timeout=ClientTimeout(total=timeout),
**kwargs
) as response:
try:
data = await response.json()
except ContentTypeError:
data = await response.read()
return BaseResponse(code=response.status, data=data)
except asyncio.TimeoutError as e:
logger.error(f"request to {url} failed cause timeout after {timeout} seconds, error: {e}")
raise RequestTimeoutError(f"request timeout: {e}")
except Exception as e:
logger.error(f"request to {url} failed, error: {e}")
raise InternalError(f"request to {url} error: {e}")