Skip to content

Commit 99780de

Browse files
committed
Cleanup.
Use better rate limiting because Bitmex throttles aggressively.This needs to be tested so that when using many threads we can successfully get all the data the user wants. Fix type error in get_candles. Fix checks for pairs supported by exchanges.
1 parent 62ae76e commit 99780de

File tree

1 file changed

+64
-85
lines changed

1 file changed

+64
-85
lines changed

app.py

Lines changed: 64 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ class Config:
4646
# we can determine which ones shouldn't be supported i.e. populate
4747
# this list with exchanges that fail tests.
4848
_unsupported_exchanges = []
49-
_supported_exchanges: dict[str, ccxt.Exchange] = {}
49+
supported_exchanges: dict[str, ccxt.Exchange] = {}
5050

5151
def get_logger():
5252

5353
import logging
5454
logger = logging.getLogger(__name__)
55-
if _settings.debug: logger.setLevel(logging.DEBUG)
55+
if settings.debug: logger.setLevel(logging.DEBUG)
5656

5757
formatter = logging.Formatter(
5858
'[%(asctime)s] %(levelname)s (thread %(thread)d):\t%(module)s.%(funcName)s: %(message)s')
@@ -72,36 +72,36 @@ def get_logger():
7272

7373
return logger
7474

75-
_settings = Settings()
75+
settings = Settings()
7676

7777
# TODO(nochiel) Move this to the settings class.
7878
from enum import Enum
79-
CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in _settings.currencies])
79+
CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in settings.currencies])
8080

8181
logger = get_logger()
8282
assert logger
8383

84-
app = FastAPI(debug = _settings.debug)
84+
app = FastAPI(debug = settings.debug)
8585

86-
logger.debug(f'Using currencies: {_settings.currencies}')
87-
if not _settings.exchanges:
86+
logger.debug(f'Using currencies: {settings.currencies}')
87+
if not settings.exchanges:
8888
logger.info('using all exchanges.')
89-
_settings.exchanges = list(ccxt.exchanges)
89+
settings.exchanges = list(ccxt.exchanges)
9090

91-
assert _settings.exchanges
91+
assert settings.exchanges
9292

9393
logger.info('Initialising supported exchanges.')
94-
for e in _settings.exchanges:
94+
for e in settings.exchanges:
9595
if e in ccxt.exchanges and e not in _unsupported_exchanges:
96-
_supported_exchanges[e] = ccxt.__dict__[e]()
96+
supported_exchanges[e] = ccxt.__dict__[e]()
9797
try:
98-
if app.debug is False: _supported_exchanges[e].load_markets()
98+
if app.debug is False: supported_exchanges[e].load_markets()
9999
except Exception as e:
100100
logger.debug(f'Error loading markets for {e}.')
101101

102-
assert _supported_exchanges
102+
assert supported_exchanges
103103

104-
ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in _supported_exchanges])
104+
ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in supported_exchanges])
105105

106106
# Exchange data is sometimes returned as epoch milliseconds.
107107
def is_ms(timestamp): return timestamp % 1e3 == 0
@@ -134,12 +134,21 @@ def get_supported_pair_for(currency: CurrencyName, exchange: ccxt.Exchange) -> s
134134
result = ''
135135

136136
exchange.load_markets()
137-
market_ids = {f'BTC{currency.value}', f'XBT{currency.value}', f'BTC{currency.value}'.lower(), f'XBT{currency.value}'.lower()}
138-
market_ids_found = list((market_ids & exchange.markets_by_id.keys()))
137+
market_ids = {
138+
f'BTC{currency.value}',
139+
f'BTC/{currency.value}',
140+
f'XBT{currency.value}',
141+
f'XBT/{currency.value}',
142+
f'BTC{currency.value}'.lower(),
143+
f'XBT{currency.value}'.lower()}
144+
market_ids_found = list(market_ids & exchange.markets_by_id.keys())
145+
if not market_ids_found:
146+
market_ids_found = list(market_ids & set([market['symbol'] for market in exchange.markets_by_id.values()]))
139147
if market_ids_found:
148+
logger.debug(f'market_ids_found: {market_ids_found}')
140149
market_id = market_ids_found[0]
141-
market = exchange.markets_by_id[market_id]
142-
if market:
150+
market = exchange.markets_by_id[exchange.market_id(market_id)]
151+
if market:
143152
result = market['symbol']
144153
logger.debug(f'Found market {market}, with symbol {result}')
145154

@@ -164,7 +173,6 @@ def request_single(exchange: ccxt.Exchange, currency: CurrencyName) -> Candle |
164173
latest_candle = None
165174
dt = None
166175

167-
168176
if exchange.has['fetchOHLCV']:
169177
logger.debug('fetchOHLCV')
170178

@@ -261,8 +269,8 @@ def status(): return "server is running"
261269
@app.get('/api/configure')
262270
def get_configuration():
263271
return {
264-
'currencies': _settings.currencies,
265-
'exchanges': _settings.exchanges,
272+
'currencies': settings.currencies,
273+
'exchanges': settings.exchanges,
266274
}
267275

268276
def calculate_average_price(candles: list[Candle]) -> Candle:
@@ -302,14 +310,14 @@ async def get_exchanges():
302310

303311
def get_supported_currencies(exchange: ccxt.Exchange) -> list[str] :
304312

305-
required = set(_settings.currencies)
313+
required = set(settings.currencies)
306314
given = set(exchange.currencies.keys())
307315

308316
return list(required & given)
309317

310318
result: list[ExchangeDetails] = []
311319

312-
assert _supported_exchanges
320+
assert supported_exchanges
313321

314322
def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
315323

@@ -320,7 +328,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
320328

321329
currencies = []
322330
if exchange.currencies:
323-
currencies = [c for c in _settings.currencies
331+
currencies = [c for c in settings.currencies
324332
if c in exchange.currencies]
325333

326334
details = ExchangeDetails(
@@ -334,7 +342,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
334342
return result
335343

336344
tasks = [asyncio.to_thread(get_exchange_details, exchange)
337-
for exchange in _supported_exchanges.values()]
345+
for exchange in supported_exchanges.values()]
338346
details = await asyncio.gather(*tasks)
339347

340348
result = list(details)
@@ -356,7 +364,7 @@ async def now_average(currency: CurrencyName):
356364

357365
logger.debug(f'currency: {currency}')
358366

359-
def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Exchange, Candle | None]:
367+
def get_candle(exchange: ccxt.Exchange, currency: CurrencyName = currency) -> tuple[ccxt.Exchange, Candle | None]:
360368
assert exchange
361369
assert currency
362370

@@ -365,16 +373,16 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex
365373
if currency.value in exchange.currencies:
366374
try:
367375
candle = None
368-
candle = request_single(exchange, currency.value)
376+
candle = request_single(exchange, currency)
369377
if candle:
370378
result = exchange, candle
371379
except Exception as e:
372380
logger.error(f'error requesting data from exchange: {e}')
373381

374382
return result
375383

376-
tasks = [asyncio.to_thread(get_candle, exchange, currency)
377-
for exchange in _supported_exchanges.values()]
384+
tasks = [asyncio.to_thread(get_candle, exchange)
385+
for exchange in supported_exchanges.values()]
378386
task_results = await asyncio.gather(*tasks)
379387
logger.debug(f'task results: {task_results}')
380388

@@ -395,7 +403,7 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex
395403
status_code = HTTPStatus.INTERNAL_SERVER_ERROR,
396404
detail = 'Spotbit could get any candle data from the configured exchanges.')
397405

398-
exchanges_used = [exchange.name for exchange in _supported_exchanges.values()
406+
exchanges_used = [exchange.name for exchange in supported_exchanges.values()
399407
if exchange.name not in failed_exchanges]
400408

401409
result = PriceResponse(
@@ -414,14 +422,14 @@ def now(currency: CurrencyName, exchange: ExchangeName):
414422
currency: the symbol for the base currency to use e.g. USD, GBP, UST.
415423
'''
416424

417-
if exchange.value not in _supported_exchanges:
425+
if exchange.value not in supported_exchanges:
418426
raise HTTPException(
419427
status_code = HTTPStatus.INTERNAL_SERVER_ERROR,
420428
detail = f'Spotbit is not configured to use {exchange.value} exchange.')
421429

422430
result = None
423431

424-
ccxt_exchange = _supported_exchanges[exchange.value]
432+
ccxt_exchange = supported_exchanges[exchange.value]
425433
assert ccxt_exchange
426434
ccxt_exchange.load_markets()
427435

@@ -465,31 +473,31 @@ def get_history(*,
465473

466474
_since = round(since.timestamp() * 1e3)
467475

468-
params = {}
476+
params = dict(end = None)
469477
if exchange == "bitfinex":
470-
params = {'end' : round(end.timestamp() * 1e3)}
478+
# TODO(nochiel) Is this needed?
479+
# params['end'] = round(end.timestamp() * 1e3)
480+
...
471481

472482
candles = None
473-
try:
474-
wait = exchange.rateLimit * 1e-3
475-
while wait:
476-
try:
477-
candles = exchange.fetchOHLCV(
478-
symbol = pair,
479-
limit = limit,
480-
timeframe = timeframe,
481-
since = _since,
482-
params = params)
483-
484-
wait = 0
483+
rate_limit = exchange.rateLimit * 1e-3
484+
wait = rate_limit
485+
while wait:
486+
try:
487+
candles = exchange.fetchOHLCV(
488+
symbol = pair,
489+
limit = limit,
490+
timeframe = timeframe,
491+
since = _since,
492+
params = params)
485493

486-
except ccxt.errors.RateLimitExceeded as e:
487-
logger.debug(f'{e}. Rate limit for {exchange} is {exchange.rateLimit}')
488-
time.sleep(wait)
489-
wait *= 2
494+
wait = 0
490495

491-
except Exception as e:
492-
logger.error(f'{exchange} candle request error: {e}')
496+
except (ccxt.errors.RateLimitExceeded, ccxt.errors.DDoSProtection) as e:
497+
logger.error(f'{exchange} candle request error: {e}')
498+
time.sleep(wait)
499+
wait *= rateLimit
500+
if wait > 60: wait = 0
493501

494502
if candles:
495503
result = [Candle(
@@ -518,7 +526,7 @@ async def get_candles_in_range(
518526
start, end(required): datetime formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS" or unix timestamp.
519527
'''
520528

521-
ccxt_exchange = _supported_exchanges[exchange.value]
529+
ccxt_exchange = supported_exchanges[exchange.value]
522530
ccxt_exchange.load_markets()
523531
assert ccxt_exchange.currencies
524532
assert ccxt_exchange.markets
@@ -625,12 +633,13 @@ async def get_candles_at_dates(
625633
Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS".
626634
'''
627635

628-
if exchange.value not in _supported_exchanges:
636+
result: list[Candle] = []
637+
if exchange.value not in supported_exchanges:
629638
raise HTTPException(
630639
detail = ServerErrors.EXCHANGE_NOT_SUPPORTED,
631640
status_code = HTTPStatus.INTERNAL_SERVER_ERROR)
632641

633-
ccxt_exchange = _supported_exchanges[exchange.value]
642+
ccxt_exchange = supported_exchanges[exchange.value]
634643
ccxt_exchange.load_markets()
635644

636645
pair = get_supported_pair_for(currency, ccxt_exchange)
@@ -671,36 +680,6 @@ async def get_candles_at_dates(
671680

672681
return result
673682

674-
def tests():
675-
# Placeholder
676-
# Expected: validation errors or server errors or valid responses.
677-
678-
import requests
679-
response = requests.get('http://[::1]:5000/api/now/FOOBAR')
680-
response = requests.get('http://[::1]:5000/api/now/usd')
681-
response = requests.get('http://[::1]:5000/api/now/USD')
682-
response = requests.get('http://[::1]:5000/api/now/JPY')
683-
684-
response = requests.get('http://[::1]:5000/api/now/USD/Bitstamp')
685-
response = requests.get('http://[::1]:5000/api/now/USD/bitstamp')
686-
response = requests.get('http://[::1]:5000/api/now/usdt/bitstamp')
687-
688-
response = requests.get(
689-
'http://[::1]:5000/api/history/USD/bitstamp?start=2019-01-01T0000&end=1522641600'
690-
)
691-
692-
response = requests.get(
693-
"http://[::1]:5000/api/history/USD/liquid?start=2022-01-01T00:00&end=2022-02-01T00:00"
694-
)
695-
696-
response = requests.post('http://[::1]:5000/history/USDT/binance',
697-
json = ['2022-01-01T00:00', '2022-02-01T00:00', '2021-12-01T00:00']
698-
)
699-
700-
response = requests.post(
701-
"http://[::1]:5000/api/history/JPY/liquid",
702-
json=["2022-01-01T00:00", "2022-02-01T00:00", "2021-12-01T00:00"],
703-
)
704683

705684
if __name__ == '__main__':
706685
import uvicorn

0 commit comments

Comments
 (0)