Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion saxo_openapi/saxo_openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import json
import requests
import logging
import time
from threading import Lock
from .exceptions import OpenAPIError


Expand Down Expand Up @@ -44,6 +46,37 @@ def mk_endpoint(endpoint, env, ep_type):
_endpoint)


class RateLimiter:
def __init__(self):
self.session_remaining = 120 # Default session limit
self.session_reset = 0 # Time when the session limit resets
self.lock = Lock()
self.LOW_REQUESTS_THRESHOLD = 10 # Threshold for adding extra delay

def update_limits(self, headers):
"""Update rate limits based on response headers"""
with self.lock:
if 'X-RateLimit-Session-Remaining' in headers:
self.session_remaining = int(headers['X-RateLimit-Session-Remaining'])
else:
self.session_remaining = 120
if 'X-RateLimit-Session-Reset' in headers:
self.session_reset = int(headers['X-RateLimit-Session-Reset'])
else:
self.session_reset = 0

def wait_if_needed(self):
"""Wait if we're close to hitting rate limits"""
with self.lock:
if self.session_remaining <= 1: # Leave a buffer of 1 request
wait_time = self.session_reset
logger.info(f"Rate limit near threshold. Waiting {wait_time} seconds")
time.sleep(wait_time)
elif self.session_remaining <= self.LOW_REQUESTS_THRESHOLD: # Low threshold
logger.info(f"Rate limit below {self.LOW_REQUESTS_THRESHOLD} ({self.session_remaining} remaining). Adding 1 second delay")
time.sleep(1)


class API(object):
r"""API - class to handle APIRequests objects to access API endpoints."""

Expand Down Expand Up @@ -101,6 +134,7 @@ def __init__(self,
self.client = requests.Session()
self.client.stream = False
self._request_params = request_params if request_params else {}
self.rate_limiter = RateLimiter()

# personal token authentication
if self.access_token:
Expand All @@ -126,15 +160,31 @@ def __request(self, method, url, request_args, headers=None, stream=False):
func = getattr(self.client, method)
headers = headers if headers else {}
response = None

# Check rate limits before making the request
self.rate_limiter.wait_if_needed()

logger.info("performing (%s) request %s", method, url)
try:
response = func(url, stream=stream, headers=headers,
**request_args)

# Update rate limits from response headers
self.rate_limiter.update_limits(response.headers)

except requests.RequestException as err:
logger.error("request %s failed [%s]", url, err)
raise err

# Handle error responses
# Handle rate limit errors specifically
if response.status_code == 429:
reset_time = int(response.headers.get('X-RateLimit-Session-Reset', 60))
logger.warning(f"Rate limit exceeded. Waiting {reset_time} seconds")
time.sleep(reset_time)
# Retry the request once after waiting
response = func(url, stream=stream, headers=headers, **request_args)

# Handle other error responses
if response.status_code >= 400:
logger.error("request %s failed [%d,%s]",
url,
Expand Down