Skip to content

Commit 4e5c6ca

Browse files
author
Sam McHardy
committed
Tidy up after merge
1 parent b6141c3 commit 4e5c6ca

File tree

2 files changed

+23
-24
lines changed

2 files changed

+23
-24
lines changed

binance/streams.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import Optional, List, Dict, Callable, Any
1111

1212
import websockets as ws
13-
from aiohttp import ClientConnectorError
1413
from websockets.exceptions import ConnectionClosedError
1514

1615
from .client import AsyncClient
@@ -46,8 +45,7 @@ class ReconnectingWebsocket:
4645
MAX_QUEUE_SIZE = 100
4746

4847
def __init__(
49-
self, loop, url: str, path: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False,
50-
exit_coro=None
48+
self, loop, url: str, path: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False, exit_coro=None
5149
):
5250
self._loop = loop or asyncio.get_event_loop()
5351
self._log = logging.getLogger(__name__)
@@ -141,13 +139,12 @@ async def _read_loop(self):
141139

142140
elif self.ws_state == WSListenerState.STREAMING:
143141
res = await asyncio.wait_for(self.ws.recv(), timeout=self.TIMEOUT)
144-
print(".",flush=True,end="");
145142
res = self._handle_message(res)
146143
if res:
147144
if self._queue.qsize() < self.MAX_QUEUE_SIZE:
148145
await self._queue.put(res)
149146
else:
150-
self._log.debug("Queue overflow. Message not filled")
147+
self._log.debug(f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled")
151148
await self._queue.put({
152149
'e': 'error',
153150
'm': 'Queue overflow. Message not filled'
@@ -166,6 +163,7 @@ async def _read_loop(self):
166163
except gaierror as e:
167164
self._log.debug(f"DNS Error ({e})")
168165
except BinanceWebsocketUnableToConnect as e:
166+
self._log.debug(f"BinanceWebsocketUnableToConnect ({e})")
169167
break
170168
except Exception as e:
171169
self._log.debug(f"Unknown exception ({e})")
@@ -223,11 +221,12 @@ def _no_message_received_reconnect(self):
223221
async def _reconnect(self):
224222
self._ws_state = WSListenerState.RECONNECTING
225223

224+
226225
class KeepAliveWebsocket(ReconnectingWebsocket):
227226

228227
def __init__(
229-
self, client: AsyncClient, loop, url, keepalive_type, prefix='ws/', is_binary=False, exit_coro=None,
230-
user_timeout=None
228+
self, client: AsyncClient, loop, url, keepalive_type, prefix='ws/', is_binary=False, exit_coro=None,
229+
user_timeout=None
231230
):
232231
super().__init__(loop=loop, path=None, url=url, prefix=prefix, is_binary=is_binary, exit_coro=exit_coro)
233232
self._keepalive_type = keepalive_type
@@ -292,7 +291,7 @@ async def _keepalive_socket(self):
292291
# Passing symbol for isolated margin
293292
await self._client.isolated_margin_stream_keepalive(self._keepalive_type, self._path)
294293
except Exception:
295-
pass # Ignore
294+
pass # Ignore
296295
finally:
297296
self._start_socket_timer()
298297

@@ -340,8 +339,8 @@ def _get_stream_url(self, stream_url: Optional[str] = None):
340339
return stream_url
341340

342341
def _get_socket(
343-
self, path: str, stream_url: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False,
344-
socket_type: BinanceSocketType = BinanceSocketType.SPOT
342+
self, path: str, stream_url: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False,
343+
socket_type: BinanceSocketType = BinanceSocketType.SPOT
345344
) -> str:
346345
conn_id = f'{socket_type}_{path}'
347346
if conn_id not in self._conns:
@@ -357,7 +356,7 @@ def _get_socket(
357356
return self._conns[conn_id]
358357

359358
def _get_account_socket(
360-
self, path: str, stream_url: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False
359+
self, path: str, stream_url: Optional[str] = None, prefix: str = 'ws/', is_binary: bool = False
361360
):
362361
conn_id = f'{BinanceSocketType.ACCOUNT}_{path}'
363362
if conn_id not in self._conns:
@@ -1178,9 +1177,9 @@ async def _stop_socket(self, conn_key):
11781177
class ThreadedWebsocketManager(ThreadedApiManager):
11791178

11801179
def __init__(
1181-
self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
1182-
requests_params: Dict[str, str] = None, tld: str = 'com',
1183-
testnet: bool = False
1180+
self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
1181+
requests_params: Dict[str, str] = None, tld: str = 'com',
1182+
testnet: bool = False
11841183
):
11851184
super().__init__(api_key, api_secret, requests_params, tld, testnet)
11861185
self._bsm: Optional[BinanceSocketManager] = None
@@ -1190,7 +1189,7 @@ async def _before_socket_listener_start(self):
11901189
self._bsm = BinanceSocketManager(client=self._client, loop=self._loop)
11911190

11921191
def _start_async_socket(
1193-
self, callback: Callable, socket_name: str, params: Dict[str, Any], path: Optional[str] = None
1192+
self, callback: Callable, socket_name: str, params: Dict[str, Any], path: Optional[str] = None
11941193
) -> str:
11951194
while not self._bsm:
11961195
time.sleep(0.1)
@@ -1201,7 +1200,7 @@ def _start_async_socket(
12011200
return path
12021201

12031202
def start_depth_socket(
1204-
self, callback: Callable, symbol: str, depth: Optional[str] = None, interval: Optional[int] = None
1203+
self, callback: Callable, symbol: str, depth: Optional[str] = None, interval: Optional[int] = None
12051204
) -> str:
12061205
return self._start_async_socket(
12071206
callback=callback,
@@ -1266,7 +1265,7 @@ def start_aggtrade_socket(self, callback: Callable, symbol: str) -> str:
12661265
)
12671266

12681267
def start_aggtrade_futures_socket(
1269-
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
1268+
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
12701269
) -> str:
12711270
return self._start_async_socket(
12721271
callback=callback,
@@ -1313,7 +1312,7 @@ def start_index_price_socket(self, callback: Callable, symbol: str, fast: bool =
13131312
)
13141313

13151314
def start_symbol_mark_price_socket(
1316-
self, callback: Callable, symbol: str, fast: bool = True, futures_type: FuturesType = FuturesType.USD_M
1315+
self, callback: Callable, symbol: str, fast: bool = True, futures_type: FuturesType = FuturesType.USD_M
13171316
) -> str:
13181317
return self._start_async_socket(
13191318
callback=callback,
@@ -1326,7 +1325,7 @@ def start_symbol_mark_price_socket(
13261325
)
13271326

13281327
def start_all_mark_price_socket(
1329-
self, callback: Callable, fast: bool = True, futures_type: FuturesType = FuturesType.USD_M
1328+
self, callback: Callable, fast: bool = True, futures_type: FuturesType = FuturesType.USD_M
13301329
) -> str:
13311330
return self._start_async_socket(
13321331
callback=callback,
@@ -1338,7 +1337,7 @@ def start_all_mark_price_socket(
13381337
)
13391338

13401339
def start_symbol_ticker_futures_socket(
1341-
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
1340+
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
13421341
) -> str:
13431342
return self._start_async_socket(
13441343
callback=callback,
@@ -1350,7 +1349,7 @@ def start_symbol_ticker_futures_socket(
13501349
)
13511350

13521351
def start_individual_symbol_ticker_futures_socket(
1353-
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
1352+
self, callback: Callable, symbol: str, futures_type: FuturesType = FuturesType.USD_M
13541353
) -> str:
13551354
return self._start_async_socket(
13561355
callback=callback,
@@ -1405,7 +1404,7 @@ def start_options_multiplex_socket(self, callback: Callable, streams: List[str])
14051404
)
14061405

14071406
def start_futures_multiplex_socket(
1408-
self, callback: Callable, streams: List[str], futures_type: FuturesType = FuturesType.USD_M
1407+
self, callback: Callable, streams: List[str], futures_type: FuturesType = FuturesType.USD_M
14091408
) -> str:
14101409
return self._start_async_socket(
14111410
callback=callback,
@@ -1472,7 +1471,7 @@ def start_options_recent_trades_socket(self, callback: Callable, symbol: str) ->
14721471
)
14731472

14741473
def start_options_kline_socket(
1475-
self, callback: Callable, symbol: str, interval=AsyncClient.KLINE_INTERVAL_1MINUTE
1474+
self, callback: Callable, symbol: str, interval=AsyncClient.KLINE_INTERVAL_1MINUTE
14761475
) -> str:
14771476
return self._start_async_socket(
14781477
callback=callback,

docs/websockets.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ If the websocket is disconnected and is unable to reconnect, a message is sent t
207207
# check for it like so
208208
def process_message(msg):
209209
if msg['e'] == 'error':
210-
# close and _restart the socket
210+
# close and restart the socket
211211
else:
212212
# process message normally
213213

0 commit comments

Comments
 (0)