88from typing import Optional , List , Dict , Callable , Any
99from socket import gaierror
1010from aiohttp import ClientConnectorError
11+ from asyncio import sleep
1112
1213import websockets as ws
14+ from websockets .exceptions import ConnectionClosedError
1315
1416from .client import AsyncClient
1517from .exceptions import BinanceWebsocketUnableToConnect
@@ -124,8 +126,8 @@ async def _read_loop(self):
124126 'e' : 'error' ,
125127 'm' : 'Max reconnect retries reached'
126128 }
127- else :
128- break
129+ # else:
130+ # break
129131 try :
130132 res = await asyncio .wait_for (self .ws .recv (), timeout = self .TIMEOUT )
131133 except asyncio .TimeoutError :
@@ -134,19 +136,35 @@ async def _read_loop(self):
134136 self ._log .debug (f"cancelled error { e } " )
135137 break
136138 except asyncio .IncompleteReadError as e :
137- self ._log .debug (f"incomplete read error { e } " )
139+ self ._log .debug (f"incomplete read error ({ e } )" )
140+ except ConnectionClosedError as e :
141+ self ._log .debug (f"connection close error ({ e } )" )
142+ try :
143+ await self ._reconnect ()
144+ except BinanceWebsocketUnableToConnect :
145+ await self ._queue .put ({
146+ 'e' : 'error' ,
147+ 'm' : 'Max reconnect retries reached'
148+ })
149+ return
138150 except Exception as e :
139- self ._log .debug (f"exception { e } " )
140- break
151+ self ._log .debug (f"Unknown exception ({ e } )" )
152+ if self .ws :
153+ self .ws .state = ws .protocol .State .CLOSED
141154 else :
142155 if self .ws_state in (WSListenerState .EXITING , WSListenerState .RECONNECTING ):
143156 break
144157 res = self ._handle_message (res )
145158 if self .ws_state in (WSListenerState .EXITING , WSListenerState .RECONNECTING ):
146159 break
147160
148- if res and self ._queue .qsize () < 100 :
149- await self ._queue .put (res )
161+ if res :
162+ if self ._queue .qsize () < 100 :
163+ await self ._queue .put (res )
164+ else :
165+ self ._log .debug ("Queue overflow. Message not filled" )
166+ else :
167+ self ._log .debug ("No messages" )
150168
151169 async def recv (self ):
152170 res = None
@@ -160,6 +178,7 @@ async def recv(self):
160178 async def _wait_for_reconnect (self ):
161179 while self .ws_state == WSListenerState .RECONNECTING :
162180 self ._log .debug ("reconnecting waiting for connect" )
181+ await sleep (10 ) # FIXME
163182 if not self .ws :
164183 self ._log .debug ("ignore message no ws" )
165184 else :
@@ -183,18 +202,21 @@ async def _reconnect(self):
183202 if self .ws_state == WSListenerState .RECONNECTING :
184203 return
185204 self .ws_state = WSListenerState .RECONNECTING
186- await self .before_reconnect ()
187- if self ._reconnects < self .MAX_RECONNECTS :
188- reconnect_wait = self ._get_reconnect_wait (self ._reconnects )
189- self ._log .debug (
190- f"websocket reconnecting { self .MAX_RECONNECTS - self ._reconnects } reconnects left - "
191- f"waiting { reconnect_wait } "
192- )
193- await asyncio .sleep (reconnect_wait )
194- await self .connect ()
195- else :
196- self ._log .error (f'Max reconnections { self .MAX_RECONNECTS } reached:' )
197- raise BinanceWebsocketUnableToConnect
205+ while True :
206+ await self .before_reconnect ()
207+ if self ._reconnects < self .MAX_RECONNECTS :
208+ reconnect_wait = self ._get_reconnect_wait (self ._reconnects )
209+ self ._log .debug (
210+ f"websocket reconnecting { self .MAX_RECONNECTS - self ._reconnects } reconnects left - "
211+ f"waiting { reconnect_wait } "
212+ )
213+ await asyncio .sleep (reconnect_wait )
214+ await self .connect ()
215+ if self .ws_state == WSListenerState .STREAMING :
216+ break
217+ else :
218+ self ._log .error (f'Max reconnections { self .MAX_RECONNECTS } reached:' )
219+ raise BinanceWebsocketUnableToConnect
198220
199221
200222class KeepAliveWebsocket (ReconnectingWebsocket ):
0 commit comments