-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhttp.py
More file actions
472 lines (396 loc) · 19.1 KB
/
http.py
File metadata and controls
472 lines (396 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
"""
HTTP communication management for OpenRouter Client.
This module handles HTTP requests, response handling, rate limiting, and
error handling for all API interactions.
Exported:
- HTTPManager: HTTP request and response manager with rate limiting
"""
import logging
import time
from typing import Dict, Optional, Any, Union, Tuple
import requests
from smartsurge.client import SmartSurgeClient
from .exceptions import APIError, RateLimitExceeded, OpenRouterError
from .types import RequestMethod
class HTTPManager:
    """
    Manages HTTP communications with the OpenRouter API.

    Attributes:
        client (SmartSurgeClient or requests.Session): HTTP client with rate limiting.
        base_url (str): Base URL for API requests, stored without a trailing slash.
        logger (logging.Logger): HTTP communication logger.
    """

    # Header names (compared case-insensitively) whose values are masked in debug logs.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "x-api-key", "api-key", "cookie"}
    )
    # Top-level JSON body keys whose values are masked in debug logs.
    _SENSITIVE_JSON_KEYS = ("api_key", "Authorization")
    # Timeout in seconds used when request() is called without one.
    _DEFAULT_TIMEOUT = 60.0

    def __init__(self,
                 base_url: Optional[str] = None,
                 client: Optional[SmartSurgeClient] = None,
                 **kwargs):
        """
        Initialize the HTTP manager.

        Args:
            base_url (str): Base URL for API requests. If None, uses the URL from pre-configured client.
            client (Optional[SmartSurgeClient]): Pre-configured HTTP client. If None, one is created.
            kwargs: Additional arguments for SmartSurgeClient (only used when a client is created).

        Raises:
            OpenRouterError: If neither base_url nor client is provided.
        """
        if base_url is None and client is None:
            raise OpenRouterError("Either base_url or client must be provided")

        # Set up logger for HTTP operations
        self.logger = logging.getLogger("openrouter_client.http")

        # Store base_url for forming full request URLs
        if base_url is None:
            # client is non-None here. `or ""` guards against the attribute existing
            # but being None, and rstrip keeps request() from building "base//endpoint".
            self.base_url = (getattr(client, "base_url", None) or "").rstrip("/")
        else:
            self.base_url = base_url.rstrip("/")

        if client is not None:
            # Use the provided client as-is
            self.client = client
            self.logger.debug("Using provided HTTP client")
            if base_url is not None:
                # NOTE(review): the client itself is not reconfigured, but base_url is
                # still used by request() to build the full URL.
                self.logger.warning("base_url is ignored when using a pre-configured client")
        else:
            # Create a SmartSurgeClient with appropriate configuration
            self.client = SmartSurgeClient(
                base_url=self.base_url,
                **kwargs
            )
            self.logger.debug("Created SmartSurgeClient with rate limiting")

        # Report the effective URL, which may have come from the client.
        self.logger.info(f"HTTP manager initialized with base_url={self.base_url}")

        self._align_smartsurge_log_levels()

    @staticmethod
    def _align_smartsurge_log_levels() -> None:
        """
        Make the smartsurge loggers follow the root logger's effective level.

        Workaround: SmartSurge doesn't follow the logging hierarchy and explicitly
        sets smartsurge.client to INFO after initialization, overriding user settings.
        """
        root_level = logging.getLogger().getEffectiveLevel()

        # Parent smartsurge logger: only touch it if the user left it unset.
        smartsurge_logger = logging.getLogger('smartsurge')
        if smartsurge_logger.level == logging.NOTSET:
            smartsurge_logger.setLevel(root_level)

        # smartsurge.client: only override levels at or below INFO, so an explicit
        # WARNING+ setting by the user is kept.
        smartsurge_client_logger = logging.getLogger('smartsurge.client')
        if smartsurge_client_logger.level <= logging.INFO:
            smartsurge_client_logger.setLevel(root_level)

    @staticmethod
    def _validate_request_args(method: Any,
                               endpoint: Any,
                               headers: Any,
                               params: Any,
                               json: Any) -> None:
        """Raise TypeError if any request() argument has the wrong type."""
        if not isinstance(method, RequestMethod):
            raise TypeError(f"'method' must be a RequestMethod enum, not {type(method).__name__}")
        if not isinstance(endpoint, str):
            raise TypeError(f"'endpoint' must be a string, not {type(endpoint).__name__}")
        if headers is not None and not isinstance(headers, dict):
            raise TypeError(f"'headers' must be a dictionary, not {type(headers).__name__}")
        if params is not None and not isinstance(params, dict):
            raise TypeError(f"'params' must be a dictionary, not {type(params).__name__}")
        if json is not None and not isinstance(json, dict):
            raise TypeError(f"'json' must be a dictionary, not {type(json).__name__}")

    @classmethod
    def _redact_headers(cls, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of headers with credential-bearing values replaced by '***'."""
        return {
            name: ('***' if str(name).lower() in cls._SENSITIVE_HEADERS else value)
            for name, value in headers.items()
        }

    @classmethod
    def _redact_json(cls, body: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Return a shallow copy of body with sensitive keys masked, or None if body is empty."""
        if not body:
            return None
        redacted = dict(body)  # shallow copy so the caller's payload is never modified
        for key in cls._SENSITIVE_JSON_KEYS:
            if key in redacted:
                redacted[key] = '***'
        return redacted

    @staticmethod
    def _parse_client_error(response: requests.Response) -> Tuple[str, Dict[str, Any]]:
        """
        Extract (message, detail) from a 4xx response body.

        Understands OpenRouter's {"error": {...}} envelope, a bare {"error": "text"},
        and plain JSON objects. Otherwise it falls back to the raw response text.
        """
        status = response.status_code
        default_message = f"API Error: {status}"

        try:
            payload = response.json()
        except Exception:  # body is not JSON; client libraries raise varying decode errors
            payload = None

        if isinstance(payload, dict):
            error_info = payload['error'] if 'error' in payload else payload
            if isinstance(error_info, dict):
                return error_info.get('message', default_message), error_info
            if isinstance(error_info, str) and error_info.strip():
                return error_info, {'message': error_info}

        # If JSON parsing fails or has no usable shape, use the raw response text
        text = response.text
        if text and text.strip():
            message = f"API Error {status}: {text}"
            return message, {'message': message}
        return default_message, {}

    def _raise_for_error_status(self, response: requests.Response) -> None:
        """
        Raise the matching OpenRouter exception for an error response.

        Raises:
            RateLimitExceeded: For HTTP 429.
            APIError: For any other 4xx or 5xx status.
        """
        status = response.status_code
        if status == 429:
            # Rate limit exceeded
            raise RateLimitExceeded(
                message="Rate limit exceeded",
                retry_after=response.headers.get('Retry-After'),
                response=response
            )
        if 400 <= status < 500:
            # Client error
            error_message, error_detail = self._parse_client_error(response)
            self.logger.error(f"API Error {status}: {error_message}")
            raise APIError(
                message=error_message,
                code=error_detail.get('code', status),
                param=error_detail.get('param'),
                type=error_detail.get('type'),
                status_code=status,
                response=response
            )
        if 500 <= status < 600:
            # Server error
            raise APIError(
                message=f"Server error: {status}",
                status_code=status,
                response=response
            )

    def request(self,
                method: RequestMethod,
                endpoint: str,
                headers: Optional[Dict[str, str]] = None,
                params: Optional[Dict[str, Any]] = None,
                json: Optional[Dict[str, Any]] = None,
                data: Optional[Union[Dict[str, Any], str, bytes]] = None,
                files: Optional[Dict[str, Any]] = None,
                stream: bool = False,
                timeout: Optional[Union[float, Tuple[float, float]]] = None) -> requests.Response:
        """
        Make an HTTP request to the OpenRouter API.

        Args:
            method (RequestMethod): HTTP method to use.
            endpoint (str): API endpoint to call (will be combined with base_url).
            headers (Optional[Dict[str, str]]): HTTP headers to include.
            params (Optional[Dict[str, Any]]): URL query parameters.
            json (Optional[Dict[str, Any]]): JSON body to send.
            data (Optional[Union[Dict[str, Any], str, bytes]]): Form data or raw data to send.
            files (Optional[Dict[str, Any]]): Files to upload.
            stream (bool): Whether to stream the response. Defaults to False.
            timeout (Optional[Union[float, Tuple[float, float]]]): Request timeout override.
                Defaults to 60 seconds when None.

        Returns:
            Response: API response. 3xx responses are retried once with
            allow_redirects=True, and the final response is status-checked.

        Raises:
            TypeError: If an argument has the wrong type.
            RateLimitExceeded: When the API answers 429.
            APIError: For other 4xx/5xx responses, and for requests.RequestException
                network errors, which are chained via ``from``.
        """
        self._validate_request_args(method, endpoint, headers, params, json)

        # Form the full URL by combining base_url and endpoint
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        if headers is None:
            headers = {}

        # Generate a request ID for logging and correlation
        request_id = f"req_{int(time.time() * 1000)}_{id(self)}"

        # Only build the redacted copies when they will actually be logged. Headers
        # carry the API key (Authorization), so they must never be logged raw.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                f"Request {request_id}: {method.value} {url} | "
                f"Headers: {self._redact_headers(headers)} | Params: {params} | "
                f"JSON: {self._redact_json(json)} | Stream: {stream}"
            )

        send_kwargs: Dict[str, Any] = dict(
            method=method.value,
            endpoint=url,
            headers=headers,
            params=params,
            json=json,
            data=data,
            files=files,
            stream=stream,
            timeout=timeout if timeout is not None else self._DEFAULT_TIMEOUT,
        )

        # Record the start time for performance tracking
        start_time = time.time()
        try:
            response = self.client.request(**send_kwargs)
            self.logger.debug(
                f"Response {request_id}: Status {response.status_code} | "
                f"Duration: {time.time() - start_time:.2f}s"
            )

            if 300 <= response.status_code < 400:
                # Handle redirects by retrying with allow_redirects=True
                self.logger.info(
                    f"Received {response.status_code} redirect, retrying with allow_redirects=True"
                )
                # Release the connection held by the discarded response (matters when stream=True).
                response.close()
                try:
                    response = self.client.request(**send_kwargs, allow_redirects=True)
                except Exception as e:
                    self.logger.error(f"Error when following redirect: {str(e)}")
                    raise
                self.logger.debug(
                    f"Response {request_id} (after redirect): Status {response.status_code} | "
                    f"Duration: {time.time() - start_time:.2f}s"
                )

            # Check the final response, including one reached by following a redirect.
            self._raise_for_error_status(response)
            return response

        except (RateLimitExceeded, APIError):
            # Re-raise our custom exceptions untouched
            raise
        except requests.RequestException as e:
            # Convert requests exceptions to APIError
            self.logger.error(f"Request error: {str(e)}")
            raise APIError(
                message=f"Request failed: {str(e)}"
            ) from e

    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a GET request to the OpenRouter API.

        Args:
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request().

        Returns:
            Response: API response.
        """
        return self.request(method=RequestMethod.GET, endpoint=endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a POST request to the OpenRouter API.

        Args:
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request().

        Returns:
            Response: API response.
        """
        return self.request(method=RequestMethod.POST, endpoint=endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a PUT request to the OpenRouter API.

        Args:
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request().

        Returns:
            Response: API response.
        """
        return self.request(method=RequestMethod.PUT, endpoint=endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a DELETE request to the OpenRouter API.

        Args:
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request().

        Returns:
            Response: API response.
        """
        return self.request(method=RequestMethod.DELETE, endpoint=endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a PATCH request to the OpenRouter API.

        Args:
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request().

        Returns:
            Response: API response.
        """
        return self.request(method=RequestMethod.PATCH, endpoint=endpoint, **kwargs)

    def stream_request(self, method: RequestMethod, endpoint: str, **kwargs) -> requests.Response:
        """
        Make a streaming request to the OpenRouter API.

        Args:
            method (RequestMethod): HTTP method to use.
            endpoint (str): API endpoint to call.
            **kwargs: Additional parameters to pass to request(); any 'stream' value is overridden.

        Returns:
            Response: Streaming API response.
        """
        # Force stream=True in kwargs to ensure streaming behavior
        kwargs['stream'] = True
        return self.request(method=method, endpoint=endpoint, **kwargs)

    def set_rate_limit(self,
                       endpoint: str,
                       method: Union[str, RequestMethod],
                       max_requests: int,
                       time_period: float,
                       cooldown: Optional[float] = None) -> None:
        """
        Set the rate limit for a specific API endpoint and method.

        This method passes through to SmartSurgeClient.set_rate_limit to dynamically
        adjust rate limiting parameters for a specific endpoint/method combination.

        Args:
            endpoint (str): The API endpoint to set rate limit for.
            method (Union[str, RequestMethod]): HTTP method (GET, POST, etc.).
            max_requests (int): Maximum number of requests allowed per time period.
            time_period (float): Time period in seconds for the rate limit.
            cooldown (Optional[float]): Cooldown period in seconds after hitting the limit.

        Raises:
            AttributeError: If the client doesn't support set_rate_limit.
            Exception: Whatever the client's set_rate_limit raises is logged and re-raised.
        """
        if not hasattr(self.client, 'set_rate_limit'):
            raise AttributeError(
                "The HTTP client does not support dynamic rate limit configuration. "
                "Ensure you're using SmartSurgeClient."
            )

        # Convert RequestMethod enum to string if needed
        if isinstance(method, RequestMethod):
            method = method.value

        self.logger.info(
            f"Setting rate limit for {method} {endpoint}: "
            f"max_requests={max_requests}, time_period={time_period}s, cooldown={cooldown}s"
        )
        try:
            # Pass through to SmartSurgeClient
            self.client.set_rate_limit(
                endpoint=endpoint,
                method=method,
                max_requests=max_requests,
                time_period=time_period,
                cooldown=cooldown
            )
            self.logger.debug(f"Rate limit successfully updated for {method} {endpoint}")
        except Exception as e:
            self.logger.error(f"Failed to set rate limit: {str(e)}")
            raise

    def set_global_rate_limit(self,
                              max_requests: int,
                              time_period: float,
                              cooldown: Optional[float] = None) -> None:
        """
        Set a global rate limit for all common OpenRouter API endpoints.

        This is a convenience method that applies the same rate limit to all
        standard OpenRouter endpoints. For fine-grained control, use set_rate_limit().
        Failures for individual endpoints are logged as warnings and do not stop
        the remaining endpoints from being configured.

        Args:
            max_requests (int): Maximum number of requests allowed per time period.
            time_period (float): Time period in seconds for the rate limit.
            cooldown (Optional[float]): Cooldown period in seconds after hitting the limit.
        """
        # Common OpenRouter endpoints
        common_endpoints = [
            ('/chat/completions', 'POST'),
            ('/completions', 'POST'),
            ('/models', 'GET'),
            ('/credits', 'GET'),
            ('/generation', 'GET'),
            ('/auth/key', 'GET'),
            ('/auth/keys', 'POST'),
            ('/keys', 'POST'),
        ]

        self.logger.info(
            f"Setting global rate limit: max_requests={max_requests}, "
            f"time_period={time_period}s, cooldown={cooldown}s"
        )
        for endpoint, method in common_endpoints:
            try:
                self.set_rate_limit(
                    endpoint=endpoint,
                    method=method,
                    max_requests=max_requests,
                    time_period=time_period,
                    cooldown=cooldown
                )
            except Exception as e:
                # Best-effort: keep configuring the remaining endpoints
                self.logger.warning(
                    f"Failed to set rate limit for {method} {endpoint}: {str(e)}"
                )

    def close(self) -> None:
        """
        Close the HTTP manager and release resources.

        Calls the client's close() when it has one.
        """
        if hasattr(self.client, 'close'):
            self.client.close()
        self.logger.debug("HTTP manager closed and resources released")

    def __enter__(self) -> "HTTPManager":
        """Allow `with HTTPManager(...) as http:` usage; returns self."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the manager on leaving the `with` block; exceptions are not suppressed."""
        self.close()